diff options
Diffstat (limited to 'vendor/gopkg.in/olivere/elastic.v5/bulk_processor.go')
-rw-r--r-- | vendor/gopkg.in/olivere/elastic.v5/bulk_processor.go | 104 |
1 files changed, 53 insertions, 51 deletions
diff --git a/vendor/gopkg.in/olivere/elastic.v5/bulk_processor.go b/vendor/gopkg.in/olivere/elastic.v5/bulk_processor.go index 9566c9e9e..b2709a880 100644 --- a/vendor/gopkg.in/olivere/elastic.v5/bulk_processor.go +++ b/vendor/gopkg.in/olivere/elastic.v5/bulk_processor.go @@ -29,28 +29,29 @@ import ( // Elasticsearch Java API as documented in // https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-bulk-processor.html. type BulkProcessorService struct { - c *Client - beforeFn BulkBeforeFunc - afterFn BulkAfterFunc - name string // name of processor - numWorkers int // # of workers (>= 1) - bulkActions int // # of requests after which to commit - bulkSize int // # of bytes after which to commit - flushInterval time.Duration // periodic flush interval - wantStats bool // indicates whether to gather statistics - initialTimeout time.Duration // initial wait time before retry on errors - maxTimeout time.Duration // max time to wait for retry on errors + c *Client + beforeFn BulkBeforeFunc + afterFn BulkAfterFunc + name string // name of processor + numWorkers int // # of workers (>= 1) + bulkActions int // # of requests after which to commit + bulkSize int // # of bytes after which to commit + flushInterval time.Duration // periodic flush interval + wantStats bool // indicates whether to gather statistics + backoff Backoff // a custom Backoff to use for errors } // NewBulkProcessorService creates a new BulkProcessorService. func NewBulkProcessorService(client *Client) *BulkProcessorService { return &BulkProcessorService{ - c: client, - numWorkers: 1, - bulkActions: 1000, - bulkSize: 5 << 20, // 5 MB - initialTimeout: time.Duration(200) * time.Millisecond, - maxTimeout: time.Duration(10000) * time.Millisecond, + c: client, + numWorkers: 1, + bulkActions: 1000, + bulkSize: 5 << 20, // 5 MB + backoff: NewExponentialBackoff( + time.Duration(200)*time.Millisecond, + time.Duration(10000)*time.Millisecond, + ), } } @@ -120,6 +121,12 @@ func (s *BulkProcessorService) Stats(wantStats bool) *BulkProcessorService { return s } +// Set the backoff strategy to use for errors +func (s *BulkProcessorService) Backoff(backoff Backoff) *BulkProcessorService { + s.backoff = backoff + return s +} + // Do creates a new BulkProcessor and starts it. // Consider the BulkProcessor as a running instance that accepts bulk requests // and commits them to Elasticsearch, spreading the work across one or more @@ -146,8 +153,7 @@ func (s *BulkProcessorService) Do(ctx context.Context) (*BulkProcessor, error) { s.bulkSize, s.flushInterval, s.wantStats, - s.initialTimeout, - s.maxTimeout) + s.backoff) err := p.Start(ctx) if err != nil { @@ -221,22 +227,21 @@ func (st *BulkProcessorWorkerStats) dup() *BulkProcessorWorkerStats { // BulkProcessor is returned by setting up a BulkProcessorService and // calling the Do method. type BulkProcessor struct { - c *Client - beforeFn BulkBeforeFunc - afterFn BulkAfterFunc - name string - bulkActions int - bulkSize int - numWorkers int - executionId int64 - requestsC chan BulkableRequest - workerWg sync.WaitGroup - workers []*bulkWorker - flushInterval time.Duration - flusherStopC chan struct{} - wantStats bool - initialTimeout time.Duration // initial wait time before retry on errors - maxTimeout time.Duration // max time to wait for retry on errors + c *Client + beforeFn BulkBeforeFunc + afterFn BulkAfterFunc + name string + bulkActions int + bulkSize int + numWorkers int + executionId int64 + requestsC chan BulkableRequest + workerWg sync.WaitGroup + workers []*bulkWorker + flushInterval time.Duration + flusherStopC chan struct{} + wantStats bool + backoff Backoff startedMu sync.Mutex // guards the following block started bool @@ -255,20 +260,18 @@ func newBulkProcessor( bulkSize int, flushInterval time.Duration, wantStats bool, - initialTimeout time.Duration, - maxTimeout time.Duration) *BulkProcessor { + backoff Backoff) *BulkProcessor { return &BulkProcessor{ - c: client, - beforeFn: beforeFn, - afterFn: afterFn, - name: name, - numWorkers: numWorkers, - bulkActions: bulkActions, - bulkSize: bulkSize, - flushInterval: flushInterval, - wantStats: wantStats, - initialTimeout: initialTimeout, - maxTimeout: maxTimeout, + c: client, + beforeFn: beforeFn, + afterFn: afterFn, + name: name, + numWorkers: numWorkers, + bulkActions: bulkActions, + bulkSize: bulkSize, + flushInterval: flushInterval, + wantStats: wantStats, + backoff: backoff, } } @@ -473,7 +476,7 @@ func (w *bulkWorker) commit(ctx context.Context) error { } // notifyFunc will be called if retry fails notifyFunc := func(err error) { - w.p.c.errorf("elastic: bulk processor %q failed but will retry: %v", w.p.name, err) + w.p.c.errorf("elastic: bulk processor %q failed but may retry: %v", w.p.name, err) } id := atomic.AddInt64(&w.p.executionId, 1) @@ -494,8 +497,7 @@ func (w *bulkWorker) commit(ctx context.Context) error { } // Commit bulk requests - policy := NewExponentialBackoff(w.p.initialTimeout, w.p.maxTimeout) - err := RetryNotify(commitFunc, policy, notifyFunc) + err := RetryNotify(commitFunc, w.p.backoff, notifyFunc) w.updateStats(res) if err != nil { w.p.c.errorf("elastic: bulk processor %q failed: %v", w.p.name, err) |