Diffstat (limited to 'vendor/gopkg.in/olivere/elastic.v5/bulk_processor.go')
-rw-r--r--  vendor/gopkg.in/olivere/elastic.v5/bulk_processor.go  104
1 file changed, 53 insertions(+), 51 deletions(-)
diff --git a/vendor/gopkg.in/olivere/elastic.v5/bulk_processor.go b/vendor/gopkg.in/olivere/elastic.v5/bulk_processor.go
index 9566c9e9e..b2709a880 100644
--- a/vendor/gopkg.in/olivere/elastic.v5/bulk_processor.go
+++ b/vendor/gopkg.in/olivere/elastic.v5/bulk_processor.go
@@ -29,28 +29,29 @@ import (
// Elasticsearch Java API as documented in
// https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-bulk-processor.html.
type BulkProcessorService struct {
- c *Client
- beforeFn BulkBeforeFunc
- afterFn BulkAfterFunc
- name string // name of processor
- numWorkers int // # of workers (>= 1)
- bulkActions int // # of requests after which to commit
- bulkSize int // # of bytes after which to commit
- flushInterval time.Duration // periodic flush interval
- wantStats bool // indicates whether to gather statistics
- initialTimeout time.Duration // initial wait time before retry on errors
- maxTimeout time.Duration // max time to wait for retry on errors
+ c *Client
+ beforeFn BulkBeforeFunc
+ afterFn BulkAfterFunc
+ name string // name of processor
+ numWorkers int // # of workers (>= 1)
+ bulkActions int // # of requests after which to commit
+ bulkSize int // # of bytes after which to commit
+ flushInterval time.Duration // periodic flush interval
+ wantStats bool // indicates whether to gather statistics
+ backoff Backoff // a custom Backoff to use for errors
}
// NewBulkProcessorService creates a new BulkProcessorService.
func NewBulkProcessorService(client *Client) *BulkProcessorService {
return &BulkProcessorService{
- c: client,
- numWorkers: 1,
- bulkActions: 1000,
- bulkSize: 5 << 20, // 5 MB
- initialTimeout: time.Duration(200) * time.Millisecond,
- maxTimeout: time.Duration(10000) * time.Millisecond,
+ c: client,
+ numWorkers: 1,
+ bulkActions: 1000,
+ bulkSize: 5 << 20, // 5 MB
+ backoff: NewExponentialBackoff(
+ time.Duration(200)*time.Millisecond,
+ time.Duration(10000)*time.Millisecond,
+ ),
}
}
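
For context (not part of the diff): with this change, leaving the backoff unset is equivalent to configuring the exponential strategy shown above explicitly. A minimal sketch, using only the constructor that already appears in this diff and assuming the package is imported as elastic:

	// Same policy as the new default in NewBulkProcessorService:
	// start retrying after 200ms and back off exponentially up to 10s.
	backoff := elastic.NewExponentialBackoff(200*time.Millisecond, 10*time.Second)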
@@ -120,6 +121,12 @@ func (s *BulkProcessorService) Stats(wantStats bool) *BulkProcessorService {
return s
}
+// Backoff sets the backoff strategy to use for errors.
+func (s *BulkProcessorService) Backoff(backoff Backoff) *BulkProcessorService {
+ s.backoff = backoff
+ return s
+}
+
// Do creates a new BulkProcessor and starts it.
// Consider the BulkProcessor as a running instance that accepts bulk requests
// and commits them to Elasticsearch, spreading the work across one or more
@@ -146,8 +153,7 @@ func (s *BulkProcessorService) Do(ctx context.Context) (*BulkProcessor, error) {
s.bulkSize,
s.flushInterval,
s.wantStats,
- s.initialTimeout,
- s.maxTimeout)
+ s.backoff)
err := p.Start(ctx)
if err != nil {
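
A minimal usage sketch of the new setter (again not part of the diff; it assumes the standard v5 client API — NewClient, BulkProcessor, Name, Workers, BulkActions, Close — and a made-up processor name):

	package main

	import (
		"context"
		"time"

		elastic "gopkg.in/olivere/elastic.v5"
	)

	func main() {
		// Assumes an Elasticsearch node reachable at the client's default URL.
		client, err := elastic.NewClient()
		if err != nil {
			panic(err)
		}

		// Plug a custom retry policy into the bulk processor; this replaces
		// the former initialTimeout/maxTimeout pair removed in this diff.
		p, err := client.BulkProcessor().
			Name("example-worker"). // hypothetical name
			Workers(2).
			BulkActions(1000).
			Backoff(elastic.NewExponentialBackoff(time.Second, time.Minute)).
			Do(context.Background())
		if err != nil {
			panic(err)
		}
		defer p.Close()
	}

Any Backoff implementation can be passed to the setter, so the retry policy is no longer hard-wired to an exponential strategy derived from two timeouts.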
@@ -221,22 +227,21 @@ func (st *BulkProcessorWorkerStats) dup() *BulkProcessorWorkerStats {
// BulkProcessor is returned by setting up a BulkProcessorService and
// calling the Do method.
type BulkProcessor struct {
- c *Client
- beforeFn BulkBeforeFunc
- afterFn BulkAfterFunc
- name string
- bulkActions int
- bulkSize int
- numWorkers int
- executionId int64
- requestsC chan BulkableRequest
- workerWg sync.WaitGroup
- workers []*bulkWorker
- flushInterval time.Duration
- flusherStopC chan struct{}
- wantStats bool
- initialTimeout time.Duration // initial wait time before retry on errors
- maxTimeout time.Duration // max time to wait for retry on errors
+ c *Client
+ beforeFn BulkBeforeFunc
+ afterFn BulkAfterFunc
+ name string
+ bulkActions int
+ bulkSize int
+ numWorkers int
+ executionId int64
+ requestsC chan BulkableRequest
+ workerWg sync.WaitGroup
+ workers []*bulkWorker
+ flushInterval time.Duration
+ flusherStopC chan struct{}
+ wantStats bool
+ backoff Backoff
startedMu sync.Mutex // guards the following block
started bool
@@ -255,20 +260,18 @@ func newBulkProcessor(
bulkSize int,
flushInterval time.Duration,
wantStats bool,
- initialTimeout time.Duration,
- maxTimeout time.Duration) *BulkProcessor {
+ backoff Backoff) *BulkProcessor {
return &BulkProcessor{
- c: client,
- beforeFn: beforeFn,
- afterFn: afterFn,
- name: name,
- numWorkers: numWorkers,
- bulkActions: bulkActions,
- bulkSize: bulkSize,
- flushInterval: flushInterval,
- wantStats: wantStats,
- initialTimeout: initialTimeout,
- maxTimeout: maxTimeout,
+ c: client,
+ beforeFn: beforeFn,
+ afterFn: afterFn,
+ name: name,
+ numWorkers: numWorkers,
+ bulkActions: bulkActions,
+ bulkSize: bulkSize,
+ flushInterval: flushInterval,
+ wantStats: wantStats,
+ backoff: backoff,
}
}
@@ -473,7 +476,7 @@ func (w *bulkWorker) commit(ctx context.Context) error {
}
// notifyFunc will be called if retry fails
notifyFunc := func(err error) {
- w.p.c.errorf("elastic: bulk processor %q failed but will retry: %v", w.p.name, err)
+ w.p.c.errorf("elastic: bulk processor %q failed but may retry: %v", w.p.name, err)
}
id := atomic.AddInt64(&w.p.executionId, 1)
@@ -494,8 +497,7 @@ func (w *bulkWorker) commit(ctx context.Context) error {
}
// Commit bulk requests
- policy := NewExponentialBackoff(w.p.initialTimeout, w.p.maxTimeout)
- err := RetryNotify(commitFunc, policy, notifyFunc)
+ err := RetryNotify(commitFunc, w.p.backoff, notifyFunc)
w.updateStats(res)
if err != nil {
w.p.c.errorf("elastic: bulk processor %q failed: %v", w.p.name, err)