mirror of
https://codeberg.org/forgejo/forgejo.git
synced 2025-01-01 11:12:13 +01:00
7f8e3192cd
* Allow common redis and leveldb connections Prevents multiple reopening of redis and leveldb connections to the same place by sharing connections. Further allows for more configurable redis connection type using the redisURI and a leveldbURI scheme. Signed-off-by: Andrew Thornton <art27@cantab.net> * add unit-test Signed-off-by: Andrew Thornton <art27@cantab.net> * as per @lunny Signed-off-by: Andrew Thornton <art27@cantab.net> * add test Signed-off-by: Andrew Thornton <art27@cantab.net> * Update modules/cache/cache_redis.go * Update modules/queue/queue_disk.go * Update modules/cache/cache_redis.go * Update modules/cache/cache_redis.go * Update modules/queue/unique_queue_disk.go * Update modules/queue/queue_disk.go * Update modules/queue/unique_queue_disk.go * Update modules/session/redis.go Co-authored-by: techknowlogick <techknowlogick@gitea.io> Co-authored-by: Lauris BH <lauris@nix.lv>
117 lines
2.8 KiB
Go
117 lines
2.8 KiB
Go
// Copyright 2019 The Gitea Authors. All rights reserved.
|
|
// Use of this source code is governed by a MIT-style
|
|
// license that can be found in the LICENSE file.
|
|
|
|
package queue
|
|
|
|
import (
|
|
"code.gitea.io/gitea/modules/nosql"
|
|
|
|
"gitea.com/lunny/levelqueue"
|
|
)
|
|
|
|
// LevelQueueType is the type for level queue
|
|
const LevelQueueType Type = "level"
|
|
|
|
// LevelQueueConfiguration is the configuration for a LevelQueue
|
|
type LevelQueueConfiguration struct {
|
|
ByteFIFOQueueConfiguration
|
|
DataDir string
|
|
ConnectionString string
|
|
QueueName string
|
|
}
|
|
|
|
// LevelQueue implements a disk library queue
|
|
type LevelQueue struct {
|
|
*ByteFIFOQueue
|
|
}
|
|
|
|
// NewLevelQueue creates a ledis local queue
|
|
func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
|
|
configInterface, err := toConfig(LevelQueueConfiguration{}, cfg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
config := configInterface.(LevelQueueConfiguration)
|
|
|
|
if len(config.ConnectionString) == 0 {
|
|
config.ConnectionString = config.DataDir
|
|
}
|
|
|
|
byteFIFO, err := NewLevelQueueByteFIFO(config.ConnectionString, config.QueueName)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
byteFIFOQueue, err := NewByteFIFOQueue(LevelQueueType, byteFIFO, handle, config.ByteFIFOQueueConfiguration, exemplar)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
queue := &LevelQueue{
|
|
ByteFIFOQueue: byteFIFOQueue,
|
|
}
|
|
queue.qid = GetManager().Add(queue, LevelQueueType, config, exemplar)
|
|
return queue, nil
|
|
}
|
|
|
|
// Compile-time check that *LevelQueueByteFIFO satisfies ByteFIFO.
var _ ByteFIFO = (*LevelQueueByteFIFO)(nil)
|
|
|
|
// LevelQueueByteFIFO represents a ByteFIFO formed from a LevelQueue
|
|
type LevelQueueByteFIFO struct {
|
|
internal *levelqueue.Queue
|
|
connection string
|
|
}
|
|
|
|
// NewLevelQueueByteFIFO creates a ByteFIFO formed from a LevelQueue
|
|
func NewLevelQueueByteFIFO(connection, prefix string) (*LevelQueueByteFIFO, error) {
|
|
db, err := nosql.GetManager().GetLevelDB(connection)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
internal, err := levelqueue.NewQueue(db, []byte(prefix), false)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &LevelQueueByteFIFO{
|
|
connection: connection,
|
|
internal: internal,
|
|
}, nil
|
|
}
|
|
|
|
// PushFunc will push data into the fifo
|
|
func (fifo *LevelQueueByteFIFO) PushFunc(data []byte, fn func() error) error {
|
|
if fn != nil {
|
|
if err := fn(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return fifo.internal.LPush(data)
|
|
}
|
|
|
|
// Pop pops data from the start of the fifo
|
|
func (fifo *LevelQueueByteFIFO) Pop() ([]byte, error) {
|
|
data, err := fifo.internal.RPop()
|
|
if err != nil && err != levelqueue.ErrNotFound {
|
|
return nil, err
|
|
}
|
|
return data, nil
|
|
}
|
|
|
|
// Close this fifo
|
|
func (fifo *LevelQueueByteFIFO) Close() error {
|
|
err := fifo.internal.Close()
|
|
_ = nosql.GetManager().CloseLevelDB(fifo.connection)
|
|
return err
|
|
}
|
|
|
|
// Len returns the length of the fifo
|
|
func (fifo *LevelQueueByteFIFO) Len() int64 {
|
|
return fifo.internal.Len()
|
|
}
|
|
|
|
func init() {
|
|
queuesMap[LevelQueueType] = NewLevelQueue
|
|
}
|