mirror of
https://codeberg.org/forgejo/forgejo.git
synced 2025-02-05 12:10:03 +01:00
Although some features are mixed together in this PR, this PR is not that large, and these features are all related. Actually there are more than 70 lines are for a toy "test queue", so this PR is quite simple. Major features: 1. Allow site admin to clear a queue (remove all items in a queue) * Because there is no transaction, the "unique queue" could be corrupted in rare cases, that's unfixable. * eg: the item is in the "set" but not in the "list", so the item would never be able to be pushed into the queue. * Now site admin could simply clear the queue, then everything becomes correct, the lost items could be re-pushed into queue by future operations. 3. Split the "admin/monitor" to separate pages 4. Allow to download diagnosis report * In history, there were many users reporting that Gitea queue gets stuck, or Gitea's CPU is 100% * With diagnosis report, maintainers could know what happens clearly The diagnosis report sample: [gitea-diagnosis-20230510-192913.zip](https://github.com/go-gitea/gitea/files/11441346/gitea-diagnosis-20230510-192913.zip) , use "go tool pprof profile.dat" to view the report. Screenshots: ![image](https://github.com/go-gitea/gitea/assets/2114189/320659b4-2eda-4def-8dc0-5ea08d578063) ![image](https://github.com/go-gitea/gitea/assets/2114189/c5c46fae-9dc0-44ca-8cd3-57beedc5035e) ![image](https://github.com/go-gitea/gitea/assets/2114189/6168a811-42a1-4e64-a263-0617a6c8c4fe) --------- Co-authored-by: Jason Song <i@wolfogre.com> Co-authored-by: Giteabot <teabot@gitea.io>
113 lines
3.2 KiB
Go
113 lines
3.2 KiB
Go
// Copyright 2019 The Gitea Authors. All rights reserved.
|
|
// SPDX-License-Identifier: MIT
|
|
|
|
package queue
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"code.gitea.io/gitea/modules/log"
|
|
"code.gitea.io/gitea/modules/setting"
|
|
)
|
|
|
|
// Manager is a manager for the queues created by "CreateXxxQueue" functions, these queues are called "managed queues".
|
|
type Manager struct {
|
|
mu sync.Mutex
|
|
|
|
qidCounter int64
|
|
Queues map[int64]ManagedWorkerPoolQueue
|
|
}
|
|
|
|
type ManagedWorkerPoolQueue interface {
|
|
GetName() string
|
|
GetType() string
|
|
GetItemTypeName() string
|
|
GetWorkerNumber() int
|
|
GetWorkerActiveNumber() int
|
|
GetWorkerMaxNumber() int
|
|
SetWorkerMaxNumber(num int)
|
|
GetQueueItemNumber() int
|
|
|
|
// FlushWithContext tries to make the handler process all items in the queue synchronously.
|
|
// It is for testing purpose only. It's not designed to be used in a cluster.
|
|
FlushWithContext(ctx context.Context, timeout time.Duration) error
|
|
|
|
// RemoveAllItems removes all items in the base queue (on-the-fly items are not affected)
|
|
RemoveAllItems(ctx context.Context) error
|
|
}
|
|
|
|
var manager *Manager
|
|
|
|
func init() {
|
|
manager = &Manager{
|
|
Queues: make(map[int64]ManagedWorkerPoolQueue),
|
|
}
|
|
}
|
|
|
|
func GetManager() *Manager {
|
|
return manager
|
|
}
|
|
|
|
func (m *Manager) AddManagedQueue(managed ManagedWorkerPoolQueue) {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
m.qidCounter++
|
|
m.Queues[m.qidCounter] = managed
|
|
}
|
|
|
|
func (m *Manager) GetManagedQueue(qid int64) ManagedWorkerPoolQueue {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
return m.Queues[qid]
|
|
}
|
|
|
|
func (m *Manager) ManagedQueues() map[int64]ManagedWorkerPoolQueue {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
queues := make(map[int64]ManagedWorkerPoolQueue, len(m.Queues))
|
|
for k, v := range m.Queues {
|
|
queues[k] = v
|
|
}
|
|
return queues
|
|
}
|
|
|
|
// FlushAll tries to make all managed queues process all items synchronously, until timeout or the queue is empty.
|
|
// It is for testing purpose only. It's not designed to be used in a cluster.
|
|
func (m *Manager) FlushAll(ctx context.Context, timeout time.Duration) error {
|
|
var finalErr error
|
|
qs := m.ManagedQueues()
|
|
for _, q := range qs {
|
|
if err := q.FlushWithContext(ctx, timeout); err != nil {
|
|
finalErr = err // TODO: in Go 1.20: errors.Join
|
|
}
|
|
}
|
|
return finalErr
|
|
}
|
|
|
|
// CreateSimpleQueue creates a simple queue from global setting config provider by name
|
|
func CreateSimpleQueue[T any](name string, handler HandlerFuncT[T]) *WorkerPoolQueue[T] {
|
|
return createWorkerPoolQueue(name, setting.CfgProvider, handler, false)
|
|
}
|
|
|
|
// CreateUniqueQueue creates a unique queue from global setting config provider by name
|
|
func CreateUniqueQueue[T any](name string, handler HandlerFuncT[T]) *WorkerPoolQueue[T] {
|
|
return createWorkerPoolQueue(name, setting.CfgProvider, handler, true)
|
|
}
|
|
|
|
func createWorkerPoolQueue[T any](name string, cfgProvider setting.ConfigProvider, handler HandlerFuncT[T], unique bool) *WorkerPoolQueue[T] {
|
|
queueSetting, err := setting.GetQueueSettings(cfgProvider, name)
|
|
if err != nil {
|
|
log.Error("Failed to get queue settings for %q: %v", name, err)
|
|
return nil
|
|
}
|
|
w, err := NewWorkerPoolQueueBySetting(name, queueSetting, handler, unique)
|
|
if err != nil {
|
|
log.Error("Failed to create queue %q: %v", name, err)
|
|
return nil
|
|
}
|
|
GetManager().AddManagedQueue(w)
|
|
return w
|
|
}
|