Issues: Gracefulise the issues indexer
Remove the old now unused specific queues
This commit is contained in:
parent
9ad9070555
commit
d6b540475f
8 changed files with 166 additions and 406 deletions
|
@ -11,8 +11,10 @@ import (
|
|||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"code.gitea.io/gitea/models"
|
||||
"code.gitea.io/gitea/modules/indexer/issues"
|
||||
"code.gitea.io/gitea/modules/references"
|
||||
"code.gitea.io/gitea/modules/setting"
|
||||
"code.gitea.io/gitea/modules/test"
|
||||
|
@ -87,7 +89,12 @@ func TestViewIssuesKeyword(t *testing.T) {
|
|||
defer prepareTestEnv(t)()
|
||||
|
||||
repo := models.AssertExistsAndLoadBean(t, &models.Repository{ID: 1}).(*models.Repository)
|
||||
|
||||
issue := models.AssertExistsAndLoadBean(t, &models.Issue{
|
||||
RepoID: repo.ID,
|
||||
Index: 1,
|
||||
}).(*models.Issue)
|
||||
issues.UpdateIssueIndexer(issue)
|
||||
time.Sleep(time.Second * 1)
|
||||
const keyword = "first"
|
||||
req := NewRequestf(t, "GET", "%s/issues?q=%s", repo.RelLink(), keyword)
|
||||
resp := MakeRequest(t, req, http.StatusOK)
|
||||
|
|
|
@ -266,3 +266,8 @@ func (b *BleveIndexer) Search(keyword string, repoIDs []int64, limit, start int)
|
|||
}
|
||||
return &ret, nil
|
||||
}
|
||||
|
||||
// Close the Index
|
||||
func (b *BleveIndexer) Close() error {
|
||||
return b.indexer.Close()
|
||||
}
|
||||
|
|
|
@ -25,6 +25,11 @@ func (db *DBIndexer) Delete(ids ...int64) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Close dummy function
|
||||
func (db *DBIndexer) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Search dummy function
|
||||
func (db *DBIndexer) Search(kw string, repoIDs []int64, limit, start int) (*SearchResult, error) {
|
||||
total, ids, err := models.SearchIssueIDsByKeyword(kw, repoIDs, limit, start)
|
||||
|
|
|
@ -5,12 +5,16 @@
|
|||
package issues
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"code.gitea.io/gitea/models"
|
||||
"code.gitea.io/gitea/modules/graceful"
|
||||
"code.gitea.io/gitea/modules/log"
|
||||
"code.gitea.io/gitea/modules/queue"
|
||||
"code.gitea.io/gitea/modules/setting"
|
||||
"code.gitea.io/gitea/modules/util"
|
||||
)
|
||||
|
@ -44,6 +48,7 @@ type Indexer interface {
|
|||
Index(issue []*IndexerData) error
|
||||
Delete(ids ...int64) error
|
||||
Search(kw string, repoIDs []int64, limit, start int) (*SearchResult, error)
|
||||
Close() error
|
||||
}
|
||||
|
||||
type indexerHolder struct {
|
||||
|
@ -75,9 +80,8 @@ func (h *indexerHolder) get() Indexer {
|
|||
}
|
||||
|
||||
var (
|
||||
issueIndexerChannel = make(chan *IndexerData, setting.Indexer.UpdateQueueLength)
|
||||
// issueIndexerQueue queue of issue ids to be updated
|
||||
issueIndexerQueue Queue
|
||||
issueIndexerQueue queue.Queue
|
||||
holder = newIndexerHolder()
|
||||
)
|
||||
|
||||
|
@ -85,88 +89,142 @@ var (
|
|||
// all issue index done.
|
||||
func InitIssueIndexer(syncReindex bool) {
|
||||
waitChannel := make(chan time.Duration)
|
||||
|
||||
// Create the Queue
|
||||
switch setting.Indexer.IssueType {
|
||||
case "bleve":
|
||||
handler := func(data ...queue.Data) {
|
||||
iData := make([]*IndexerData, 0, setting.Indexer.IssueQueueBatchNumber)
|
||||
for _, datum := range data {
|
||||
indexerData, ok := datum.(*IndexerData)
|
||||
if !ok {
|
||||
log.Error("Unable to process provided datum: %v - not possible to cast to IndexerData", datum)
|
||||
continue
|
||||
}
|
||||
log.Trace("IndexerData Process: %d %v %t", indexerData.ID, indexerData.IDs, indexerData.IsDelete)
|
||||
if indexerData.IsDelete {
|
||||
_ = holder.get().Delete(indexerData.IDs...)
|
||||
continue
|
||||
}
|
||||
iData = append(iData, indexerData)
|
||||
}
|
||||
if err := holder.get().Index(iData); err != nil {
|
||||
log.Error("Error whilst indexing: %v Error: %v", iData, err)
|
||||
}
|
||||
}
|
||||
|
||||
queueType := queue.PersistableChannelQueueType
|
||||
switch setting.Indexer.IssueQueueType {
|
||||
case setting.LevelQueueType:
|
||||
queueType = queue.LevelQueueType
|
||||
case setting.ChannelQueueType:
|
||||
queueType = queue.PersistableChannelQueueType
|
||||
case setting.RedisQueueType:
|
||||
queueType = queue.RedisQueueType
|
||||
default:
|
||||
log.Fatal("Unsupported indexer queue type: %v",
|
||||
setting.Indexer.IssueQueueType)
|
||||
}
|
||||
|
||||
name := "issue_indexer_queue"
|
||||
opts := make(map[string]interface{})
|
||||
opts["QueueLength"] = setting.Indexer.UpdateQueueLength
|
||||
opts["BatchLength"] = setting.Indexer.IssueQueueBatchNumber
|
||||
opts["DataDir"] = setting.Indexer.IssueQueueDir
|
||||
|
||||
addrs, password, dbIdx, err := setting.ParseQueueConnStr(setting.Indexer.IssueQueueConnStr)
|
||||
if queueType == queue.RedisQueueType && err != nil {
|
||||
log.Fatal("Unable to parse connection string for RedisQueueType: %s : %v",
|
||||
setting.Indexer.IssueQueueConnStr,
|
||||
err)
|
||||
}
|
||||
opts["Addresses"] = addrs
|
||||
opts["Password"] = password
|
||||
opts["DBIndex"] = dbIdx
|
||||
opts["QueueName"] = name
|
||||
opts["Name"] = name
|
||||
opts["Workers"] = 1
|
||||
opts["BlockTimeout"] = 1 * time.Second
|
||||
opts["BoostTimeout"] = 5 * time.Minute
|
||||
opts["BoostWorkers"] = 5
|
||||
cfg, err := json.Marshal(opts)
|
||||
if err != nil {
|
||||
log.Error("Unable to marshall generic options: %v Error: %v", opts, err)
|
||||
log.Fatal("Unable to create issue indexer queue with type %s: %v",
|
||||
queueType,
|
||||
err)
|
||||
}
|
||||
log.Debug("Creating issue indexer queue with type %s: configuration: %s", queueType, string(cfg))
|
||||
issueIndexerQueue, err = queue.CreateQueue(queueType, handler, cfg, &IndexerData{})
|
||||
if err != nil {
|
||||
issueIndexerQueue, err = queue.CreateQueue(queue.WrappedQueueType, handler, queue.WrappedQueueConfiguration{
|
||||
Underlying: queueType,
|
||||
Timeout: setting.GracefulHammerTime + 30*time.Second,
|
||||
MaxAttempts: 10,
|
||||
Config: cfg,
|
||||
QueueLength: setting.Indexer.UpdateQueueLength,
|
||||
Name: name,
|
||||
}, &IndexerData{})
|
||||
}
|
||||
if err != nil {
|
||||
log.Fatal("Unable to create issue indexer queue with type %s: %v : %v",
|
||||
queueType,
|
||||
string(cfg),
|
||||
err)
|
||||
}
|
||||
default:
|
||||
issueIndexerQueue = &queue.DummyQueue{}
|
||||
}
|
||||
|
||||
// Create the Indexer
|
||||
go func() {
|
||||
start := time.Now()
|
||||
log.Info("Initializing Issue Indexer")
|
||||
log.Info("PID %d: Initializing Issue Indexer: %s", os.Getpid(), setting.Indexer.IssueType)
|
||||
var populate bool
|
||||
var dummyQueue bool
|
||||
switch setting.Indexer.IssueType {
|
||||
case "bleve":
|
||||
issueIndexer := NewBleveIndexer(setting.Indexer.IssuePath)
|
||||
exist, err := issueIndexer.Init()
|
||||
if err != nil {
|
||||
log.Fatal("Unable to initialize Bleve Issue Indexer: %v", err)
|
||||
}
|
||||
populate = !exist
|
||||
holder.set(issueIndexer)
|
||||
graceful.GetManager().RunWithShutdownFns(func(_, atTerminate func(context.Context, func())) {
|
||||
issueIndexer := NewBleveIndexer(setting.Indexer.IssuePath)
|
||||
exist, err := issueIndexer.Init()
|
||||
if err != nil {
|
||||
log.Fatal("Unable to initialize Bleve Issue Indexer: %v", err)
|
||||
}
|
||||
populate = !exist
|
||||
holder.set(issueIndexer)
|
||||
atTerminate(context.Background(), func() {
|
||||
log.Debug("Closing issue indexer")
|
||||
issueIndexer := holder.get()
|
||||
if issueIndexer != nil {
|
||||
err := issueIndexer.Close()
|
||||
if err != nil {
|
||||
log.Error("Error whilst closing the issue indexer: %v", err)
|
||||
}
|
||||
}
|
||||
log.Info("PID: %d Issue Indexer closed", os.Getpid())
|
||||
})
|
||||
log.Debug("Created Bleve Indexer")
|
||||
})
|
||||
case "db":
|
||||
issueIndexer := &DBIndexer{}
|
||||
holder.set(issueIndexer)
|
||||
dummyQueue = true
|
||||
default:
|
||||
log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType)
|
||||
}
|
||||
|
||||
if dummyQueue {
|
||||
issueIndexerQueue = &DummyQueue{}
|
||||
} else {
|
||||
var err error
|
||||
switch setting.Indexer.IssueQueueType {
|
||||
case setting.LevelQueueType:
|
||||
issueIndexerQueue, err = NewLevelQueue(
|
||||
holder.get(),
|
||||
setting.Indexer.IssueQueueDir,
|
||||
setting.Indexer.IssueQueueBatchNumber)
|
||||
if err != nil {
|
||||
log.Fatal(
|
||||
"Unable create level queue for issue queue dir: %s batch number: %d : %v",
|
||||
setting.Indexer.IssueQueueDir,
|
||||
setting.Indexer.IssueQueueBatchNumber,
|
||||
err)
|
||||
}
|
||||
case setting.ChannelQueueType:
|
||||
issueIndexerQueue = NewChannelQueue(holder.get(), setting.Indexer.IssueQueueBatchNumber)
|
||||
case setting.RedisQueueType:
|
||||
addrs, pass, idx, err := parseConnStr(setting.Indexer.IssueQueueConnStr)
|
||||
if err != nil {
|
||||
log.Fatal("Unable to parse connection string for RedisQueueType: %s : %v",
|
||||
setting.Indexer.IssueQueueConnStr,
|
||||
err)
|
||||
}
|
||||
issueIndexerQueue, err = NewRedisQueue(addrs, pass, idx, holder.get(), setting.Indexer.IssueQueueBatchNumber)
|
||||
if err != nil {
|
||||
log.Fatal("Unable to create RedisQueue: %s : %v",
|
||||
setting.Indexer.IssueQueueConnStr,
|
||||
err)
|
||||
}
|
||||
default:
|
||||
log.Fatal("Unsupported indexer queue type: %v",
|
||||
setting.Indexer.IssueQueueType)
|
||||
}
|
||||
|
||||
go func() {
|
||||
err = issueIndexerQueue.Run()
|
||||
if err != nil {
|
||||
log.Error("issueIndexerQueue.Run: %v", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
go func() {
|
||||
for data := range issueIndexerChannel {
|
||||
_ = issueIndexerQueue.Push(data)
|
||||
}
|
||||
}()
|
||||
// Start processing the queue
|
||||
go graceful.GetManager().RunWithShutdownFns(issueIndexerQueue.Run)
|
||||
|
||||
// Populate the index
|
||||
if populate {
|
||||
if syncReindex {
|
||||
populateIssueIndexer()
|
||||
graceful.GetManager().RunWithShutdownContext(populateIssueIndexer)
|
||||
} else {
|
||||
go populateIssueIndexer()
|
||||
go graceful.GetManager().RunWithShutdownContext(populateIssueIndexer)
|
||||
}
|
||||
}
|
||||
waitChannel <- time.Since(start)
|
||||
}()
|
||||
|
||||
if syncReindex {
|
||||
<-waitChannel
|
||||
} else if setting.Indexer.StartupTimeout > 0 {
|
||||
|
@ -179,6 +237,9 @@ func InitIssueIndexer(syncReindex bool) {
|
|||
case duration := <-waitChannel:
|
||||
log.Info("Issue Indexer Initialization took %v", duration)
|
||||
case <-time.After(timeout):
|
||||
if shutdownable, ok := issueIndexerQueue.(queue.Shutdownable); ok {
|
||||
shutdownable.Terminate()
|
||||
}
|
||||
log.Fatal("Issue Indexer Initialization timed-out after: %v", timeout)
|
||||
}
|
||||
}()
|
||||
|
@ -186,8 +247,14 @@ func InitIssueIndexer(syncReindex bool) {
|
|||
}
|
||||
|
||||
// populateIssueIndexer populate the issue indexer with issue data
|
||||
func populateIssueIndexer() {
|
||||
func populateIssueIndexer(ctx context.Context) {
|
||||
for page := 1; ; page++ {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Warn("Issue Indexer population shutdown before completion")
|
||||
return
|
||||
default:
|
||||
}
|
||||
repos, _, err := models.SearchRepositoryByName(&models.SearchRepoOptions{
|
||||
Page: page,
|
||||
PageSize: models.RepositoryListDefaultPageSize,
|
||||
|
@ -200,10 +267,17 @@ func populateIssueIndexer() {
|
|||
continue
|
||||
}
|
||||
if len(repos) == 0 {
|
||||
log.Debug("Issue Indexer population complete")
|
||||
return
|
||||
}
|
||||
|
||||
for _, repo := range repos {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Info("Issue Indexer population shutdown before completion")
|
||||
return
|
||||
default:
|
||||
}
|
||||
UpdateRepoIndexer(repo)
|
||||
}
|
||||
}
|
||||
|
@ -237,13 +311,17 @@ func UpdateIssueIndexer(issue *models.Issue) {
|
|||
comments = append(comments, comment.Content)
|
||||
}
|
||||
}
|
||||
issueIndexerChannel <- &IndexerData{
|
||||
indexerData := &IndexerData{
|
||||
ID: issue.ID,
|
||||
RepoID: issue.RepoID,
|
||||
Title: issue.Title,
|
||||
Content: issue.Content,
|
||||
Comments: comments,
|
||||
}
|
||||
log.Debug("Adding to channel: %v", indexerData)
|
||||
if err := issueIndexerQueue.Push(indexerData); err != nil {
|
||||
log.Error("Unable to push to issue indexer: %v: Error: %v", indexerData, err)
|
||||
}
|
||||
}
|
||||
|
||||
// DeleteRepoIssueIndexer deletes repo's all issues indexes
|
||||
|
@ -258,11 +336,13 @@ func DeleteRepoIssueIndexer(repo *models.Repository) {
|
|||
if len(ids) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
issueIndexerChannel <- &IndexerData{
|
||||
indexerData := &IndexerData{
|
||||
IDs: ids,
|
||||
IsDelete: true,
|
||||
}
|
||||
if err := issueIndexerQueue.Push(indexerData); err != nil {
|
||||
log.Error("Unable to push to issue indexer: %v: Error: %v", indexerData, err)
|
||||
}
|
||||
}
|
||||
|
||||
// SearchIssuesByKeyword search issue ids by keywords and repo id
|
||||
|
|
|
@ -1,25 +0,0 @@
|
|||
// Copyright 2018 The Gitea Authors. All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package issues
|
||||
|
||||
// Queue defines an interface to save an issue indexer queue
|
||||
type Queue interface {
|
||||
Run() error
|
||||
Push(*IndexerData) error
|
||||
}
|
||||
|
||||
// DummyQueue represents an empty queue
|
||||
type DummyQueue struct {
|
||||
}
|
||||
|
||||
// Run starts to run the queue
|
||||
func (b *DummyQueue) Run() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Push pushes data to indexer
|
||||
func (b *DummyQueue) Push(*IndexerData) error {
|
||||
return nil
|
||||
}
|
|
@ -1,62 +0,0 @@
|
|||
// Copyright 2018 The Gitea Authors. All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package issues
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"code.gitea.io/gitea/modules/setting"
|
||||
)
|
||||
|
||||
// ChannelQueue implements
|
||||
type ChannelQueue struct {
|
||||
queue chan *IndexerData
|
||||
indexer Indexer
|
||||
batchNumber int
|
||||
}
|
||||
|
||||
// NewChannelQueue create a memory channel queue
|
||||
func NewChannelQueue(indexer Indexer, batchNumber int) *ChannelQueue {
|
||||
return &ChannelQueue{
|
||||
queue: make(chan *IndexerData, setting.Indexer.UpdateQueueLength),
|
||||
indexer: indexer,
|
||||
batchNumber: batchNumber,
|
||||
}
|
||||
}
|
||||
|
||||
// Run starts to run the queue
|
||||
func (c *ChannelQueue) Run() error {
|
||||
var i int
|
||||
var datas = make([]*IndexerData, 0, c.batchNumber)
|
||||
for {
|
||||
select {
|
||||
case data := <-c.queue:
|
||||
if data.IsDelete {
|
||||
_ = c.indexer.Delete(data.IDs...)
|
||||
continue
|
||||
}
|
||||
|
||||
datas = append(datas, data)
|
||||
if len(datas) >= c.batchNumber {
|
||||
_ = c.indexer.Index(datas)
|
||||
// TODO: save the point
|
||||
datas = make([]*IndexerData, 0, c.batchNumber)
|
||||
}
|
||||
case <-time.After(time.Millisecond * 100):
|
||||
i++
|
||||
if i >= 3 && len(datas) > 0 {
|
||||
_ = c.indexer.Index(datas)
|
||||
// TODO: save the point
|
||||
datas = make([]*IndexerData, 0, c.batchNumber)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Push will push the indexer data to queue
|
||||
func (c *ChannelQueue) Push(data *IndexerData) error {
|
||||
c.queue <- data
|
||||
return nil
|
||||
}
|
|
@ -1,104 +0,0 @@
|
|||
// Copyright 2019 The Gitea Authors. All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package issues
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"code.gitea.io/gitea/modules/log"
|
||||
|
||||
"gitea.com/lunny/levelqueue"
|
||||
)
|
||||
|
||||
var (
|
||||
_ Queue = &LevelQueue{}
|
||||
)
|
||||
|
||||
// LevelQueue implements a disk library queue
|
||||
type LevelQueue struct {
|
||||
indexer Indexer
|
||||
queue *levelqueue.Queue
|
||||
batchNumber int
|
||||
}
|
||||
|
||||
// NewLevelQueue creates a ledis local queue
|
||||
func NewLevelQueue(indexer Indexer, dataDir string, batchNumber int) (*LevelQueue, error) {
|
||||
queue, err := levelqueue.Open(dataDir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &LevelQueue{
|
||||
indexer: indexer,
|
||||
queue: queue,
|
||||
batchNumber: batchNumber,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Run starts to run the queue
|
||||
func (l *LevelQueue) Run() error {
|
||||
var i int
|
||||
var datas = make([]*IndexerData, 0, l.batchNumber)
|
||||
for {
|
||||
i++
|
||||
if len(datas) > l.batchNumber || (len(datas) > 0 && i > 3) {
|
||||
_ = l.indexer.Index(datas)
|
||||
datas = make([]*IndexerData, 0, l.batchNumber)
|
||||
i = 0
|
||||
continue
|
||||
}
|
||||
|
||||
bs, err := l.queue.RPop()
|
||||
if err != nil {
|
||||
if err != levelqueue.ErrNotFound {
|
||||
log.Error("RPop: %v", err)
|
||||
}
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
continue
|
||||
}
|
||||
|
||||
if len(bs) == 0 {
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
continue
|
||||
}
|
||||
|
||||
var data IndexerData
|
||||
err = json.Unmarshal(bs, &data)
|
||||
if err != nil {
|
||||
log.Error("Unmarshal: %v", err)
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
continue
|
||||
}
|
||||
|
||||
log.Trace("LevelQueue: task found: %#v", data)
|
||||
|
||||
if data.IsDelete {
|
||||
if data.ID > 0 {
|
||||
if err = l.indexer.Delete(data.ID); err != nil {
|
||||
log.Error("indexer.Delete: %v", err)
|
||||
}
|
||||
} else if len(data.IDs) > 0 {
|
||||
if err = l.indexer.Delete(data.IDs...); err != nil {
|
||||
log.Error("indexer.Delete: %v", err)
|
||||
}
|
||||
}
|
||||
time.Sleep(time.Millisecond * 10)
|
||||
continue
|
||||
}
|
||||
|
||||
datas = append(datas, &data)
|
||||
time.Sleep(time.Millisecond * 10)
|
||||
}
|
||||
}
|
||||
|
||||
// Push will push the indexer data to queue
|
||||
func (l *LevelQueue) Push(data *IndexerData) error {
|
||||
bs, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return l.queue.LPush(bs)
|
||||
}
|
|
@ -1,146 +0,0 @@
|
|||
// Copyright 2019 The Gitea Authors. All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package issues
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"code.gitea.io/gitea/modules/log"
|
||||
|
||||
"github.com/go-redis/redis"
|
||||
)
|
||||
|
||||
var (
|
||||
_ Queue = &RedisQueue{}
|
||||
)
|
||||
|
||||
type redisClient interface {
|
||||
RPush(key string, args ...interface{}) *redis.IntCmd
|
||||
LPop(key string) *redis.StringCmd
|
||||
Ping() *redis.StatusCmd
|
||||
}
|
||||
|
||||
// RedisQueue redis queue
|
||||
type RedisQueue struct {
|
||||
client redisClient
|
||||
queueName string
|
||||
indexer Indexer
|
||||
batchNumber int
|
||||
}
|
||||
|
||||
func parseConnStr(connStr string) (addrs, password string, dbIdx int, err error) {
|
||||
fields := strings.Fields(connStr)
|
||||
for _, f := range fields {
|
||||
items := strings.SplitN(f, "=", 2)
|
||||
if len(items) < 2 {
|
||||
continue
|
||||
}
|
||||
switch strings.ToLower(items[0]) {
|
||||
case "addrs":
|
||||
addrs = items[1]
|
||||
case "password":
|
||||
password = items[1]
|
||||
case "db":
|
||||
dbIdx, err = strconv.Atoi(items[1])
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// NewRedisQueue creates single redis or cluster redis queue
|
||||
func NewRedisQueue(addrs string, password string, dbIdx int, indexer Indexer, batchNumber int) (*RedisQueue, error) {
|
||||
dbs := strings.Split(addrs, ",")
|
||||
var queue = RedisQueue{
|
||||
queueName: "issue_indexer_queue",
|
||||
indexer: indexer,
|
||||
batchNumber: batchNumber,
|
||||
}
|
||||
if len(dbs) == 0 {
|
||||
return nil, errors.New("no redis host found")
|
||||
} else if len(dbs) == 1 {
|
||||
queue.client = redis.NewClient(&redis.Options{
|
||||
Addr: strings.TrimSpace(dbs[0]), // use default Addr
|
||||
Password: password, // no password set
|
||||
DB: dbIdx, // use default DB
|
||||
})
|
||||
} else {
|
||||
queue.client = redis.NewClusterClient(&redis.ClusterOptions{
|
||||
Addrs: dbs,
|
||||
})
|
||||
}
|
||||
if err := queue.client.Ping().Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &queue, nil
|
||||
}
|
||||
|
||||
// Run runs the redis queue
|
||||
func (r *RedisQueue) Run() error {
|
||||
var i int
|
||||
var datas = make([]*IndexerData, 0, r.batchNumber)
|
||||
for {
|
||||
bs, err := r.client.LPop(r.queueName).Bytes()
|
||||
if err != nil && err != redis.Nil {
|
||||
log.Error("LPop faile: %v", err)
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
continue
|
||||
}
|
||||
|
||||
i++
|
||||
if len(datas) > r.batchNumber || (len(datas) > 0 && i > 3) {
|
||||
_ = r.indexer.Index(datas)
|
||||
datas = make([]*IndexerData, 0, r.batchNumber)
|
||||
i = 0
|
||||
}
|
||||
|
||||
if len(bs) == 0 {
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
continue
|
||||
}
|
||||
|
||||
var data IndexerData
|
||||
err = json.Unmarshal(bs, &data)
|
||||
if err != nil {
|
||||
log.Error("Unmarshal: %v", err)
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
continue
|
||||
}
|
||||
|
||||
log.Trace("RedisQueue: task found: %#v", data)
|
||||
|
||||
if data.IsDelete {
|
||||
if data.ID > 0 {
|
||||
if err = r.indexer.Delete(data.ID); err != nil {
|
||||
log.Error("indexer.Delete: %v", err)
|
||||
}
|
||||
} else if len(data.IDs) > 0 {
|
||||
if err = r.indexer.Delete(data.IDs...); err != nil {
|
||||
log.Error("indexer.Delete: %v", err)
|
||||
}
|
||||
}
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
continue
|
||||
}
|
||||
|
||||
datas = append(datas, &data)
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
}
|
||||
}
|
||||
|
||||
// Push implements Queue
|
||||
func (r *RedisQueue) Push(data *IndexerData) error {
|
||||
bs, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return r.client.RPush(r.queueName, bs).Err()
|
||||
}
|
Loading…
Reference in a new issue