diff --git a/models/issue_indexer.go b/models/issue_indexer.go index 1e14268a0..b58c9dc2d 100644 --- a/models/issue_indexer.go +++ b/models/issue_indexer.go @@ -25,6 +25,7 @@ func InitIssueIndexer() { // populateIssueIndexer populate the issue indexer with issue data func populateIssueIndexer() error { + batch := indexer.IssueIndexerBatch() for page := 1; ; page++ { repos, _, err := Repositories(&SearchRepoOptions{ Page: page, @@ -34,7 +35,7 @@ func populateIssueIndexer() error { return fmt.Errorf("Repositories: %v", err) } if len(repos) == 0 { - return nil + return batch.Flush() } for _, repo := range repos { issues, err := Issues(&IssuesOptions{ @@ -42,29 +43,37 @@ func populateIssueIndexer() error { IsClosed: util.OptionalBoolNone, IsPull: util.OptionalBoolNone, }) - updates := make([]indexer.IssueIndexerUpdate, len(issues)) - for i, issue := range issues { - updates[i] = issue.update() + if err != nil { + return err } - if err = indexer.BatchUpdateIssues(updates...); err != nil { - return fmt.Errorf("BatchUpdate: %v", err) + for _, issue := range issues { + if err := batch.Add(issue.update()); err != nil { + return err + } } } } } func processIssueIndexerUpdateQueue() { + batch := indexer.IssueIndexerBatch() for { + var issueID int64 select { - case issueID := <-issueIndexerUpdateQueue: - issue, err := GetIssueByID(issueID) - if err != nil { - log.Error(4, "issuesIndexer.Index: %v", err) - continue - } - if err = indexer.UpdateIssue(issue.update()); err != nil { - log.Error(4, "issuesIndexer.Index: %v", err) + case issueID = <-issueIndexerUpdateQueue: + default: + // flush whatever updates we currently have, since we + // might have to wait a while + if err := batch.Flush(); err != nil { + log.Error(4, "IssueIndexer: %v", err) } + issueID = <-issueIndexerUpdateQueue + } + issue, err := GetIssueByID(issueID) + if err != nil { + log.Error(4, "GetIssueByID: %v", err) + } else if err = batch.Add(issue.update()); err != nil { + log.Error(4, "IssueIndexer: %v", err) } } } diff --git a/modules/indexer/indexer.go b/modules/indexer/indexer.go index 5ee813412..d5bdd51f9 100644 --- a/modules/indexer/indexer.go +++ b/modules/indexer/indexer.go @@ -9,6 +9,8 @@ import ( "strconv" "github.com/blevesearch/bleve" + "github.com/blevesearch/bleve/analysis/token/unicodenorm" + "github.com/blevesearch/bleve/mapping" "github.com/blevesearch/bleve/search/query" ) @@ -41,3 +43,50 @@ func newMatchPhraseQuery(matchPhrase, field, analyzer string) *query.MatchPhrase q.Analyzer = analyzer return q } + +const unicodeNormalizeName = "unicodeNormalize" + +func addUnicodeNormalizeTokenFilter(m *mapping.IndexMappingImpl) error { + return m.AddCustomTokenFilter(unicodeNormalizeName, map[string]interface{}{ + "type": unicodenorm.Name, + "form": unicodenorm.NFC, + }) +} + +// Update represents an update to an indexer +type Update interface { + addToBatch(batch *bleve.Batch) error +} + +const maxBatchSize = 16 + +// Batch batch of indexer updates that automatically flushes once it +// reaches a certain size +type Batch struct { + batch *bleve.Batch + index bleve.Index +} + +// Add add update to batch, possibly flushing +func (batch *Batch) Add(update Update) error { + if err := update.addToBatch(batch.batch); err != nil { + return err + } + return batch.flushIfFull() +} + +func (batch *Batch) flushIfFull() error { + if batch.batch.Size() >= maxBatchSize { + return batch.Flush() + } + return nil +} + +// Flush manually flush the batch, regardless of its size +func (batch *Batch) Flush() error { + if err := batch.index.Batch(batch.batch); err != nil { + return err + } + batch.batch.Reset() + return nil +} diff --git a/modules/indexer/issue.go b/modules/indexer/issue.go index 050a623ce..62a18e2b3 100644 --- a/modules/indexer/issue.go +++ b/modules/indexer/issue.go @@ -13,7 +13,6 @@ import ( "github.com/blevesearch/bleve" "github.com/blevesearch/bleve/analysis/analyzer/custom" "github.com/blevesearch/bleve/analysis/token/lowercase" - "github.com/blevesearch/bleve/analysis/token/unicodenorm" "github.com/blevesearch/bleve/analysis/tokenizer/unicode" "github.com/blevesearch/bleve/index/upsidedown" ) @@ -35,6 +34,10 @@ type IssueIndexerUpdate struct { Data *IssueIndexerData } +func (update IssueIndexerUpdate) addToBatch(batch *bleve.Batch) error { + return batch.Index(indexerID(update.IssueID), update.Data) +} + const issueIndexerAnalyzer = "issueIndexer" // InitIssueIndexer initialize issue indexer @@ -74,17 +77,13 @@ func createIssueIndexer() error { docMapping.AddFieldMappingsAt("Content", textFieldMapping) docMapping.AddFieldMappingsAt("Comments", textFieldMapping) - const unicodeNormNFC = "unicodeNormNFC" - if err := mapping.AddCustomTokenFilter(unicodeNormNFC, map[string]interface{}{ - "type": unicodenorm.Name, - "form": unicodenorm.NFC, - }); err != nil { + if err := addUnicodeNormalizeTokenFilter(mapping); err != nil { return err } else if err = mapping.AddCustomAnalyzer(issueIndexerAnalyzer, map[string]interface{}{ "type": custom.Name, "char_filters": []string{}, "tokenizer": unicode.Name, - "token_filters": []string{unicodeNormNFC, lowercase.Name}, + "token_filters": []string{unicodeNormalizeName, lowercase.Name}, }); err != nil { return err } @@ -97,21 +96,12 @@ func createIssueIndexer() error { return err } -// UpdateIssue update the issue indexer -func UpdateIssue(update IssueIndexerUpdate) error { - return issueIndexer.Index(indexerID(update.IssueID), update.Data) -} - -// BatchUpdateIssues perform a batch update of the issue indexer -func BatchUpdateIssues(updates ...IssueIndexerUpdate) error { - batch := issueIndexer.NewBatch() - for _, update := range updates { - err := batch.Index(indexerID(update.IssueID), update.Data) - if err != nil { - return err - } +// IssueIndexerBatch batch to add updates to +func IssueIndexerBatch() *Batch { + return &Batch{ + batch: issueIndexer.NewBatch(), + index: issueIndexer, } - return issueIndexer.Batch(batch) } // SearchIssuesByKeyword searches for issues by given conditions.