Diffstat (limited to 'vendor/gopkg.in/olivere/elastic.v5/bulk_processor.go')
-rw-r--r--  vendor/gopkg.in/olivere/elastic.v5/bulk_processor.go  61
1 file changed, 57 insertions(+), 4 deletions(-)
diff --git a/vendor/gopkg.in/olivere/elastic.v5/bulk_processor.go b/vendor/gopkg.in/olivere/elastic.v5/bulk_processor.go
index b2709a880..6ee8a3dee 100644
--- a/vendor/gopkg.in/olivere/elastic.v5/bulk_processor.go
+++ b/vendor/gopkg.in/olivere/elastic.v5/bulk_processor.go
@@ -6,6 +6,7 @@ package elastic
import (
"context"
+ "net"
"sync"
"sync/atomic"
"time"
@@ -121,7 +122,7 @@ func (s *BulkProcessorService) Stats(wantStats bool) *BulkProcessorService {
return s
}
-// Set the backoff strategy to use for errors
+// Backoff sets the backoff strategy to use for errors.
func (s *BulkProcessorService) Backoff(backoff Backoff) *BulkProcessorService {
s.backoff = backoff
return s
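
For context on how the Backoff option is consumed, the sketch below shows one plausible way a caller wires a backoff strategy into a bulk processor. It is illustrative only: the cluster URL is a placeholder, and the exponential-backoff constructor and the Do(ctx) signature are taken from the library's public v5 API, so verify them against the vendored sources before relying on the snippet.

package main

import (
	"context"
	"log"
	"time"

	elastic "gopkg.in/olivere/elastic.v5"
)

func main() {
	// Placeholder URL; point this at a real cluster.
	client, err := elastic.NewClient(elastic.SetURL("http://localhost:9200"))
	if err != nil {
		log.Fatal(err)
	}

	p, err := client.BulkProcessor().
		Name("example-worker").
		Workers(2).
		BulkActions(1000).
		FlushInterval(30 * time.Second).
		// Retry failed commits with exponentially growing waits.
		Backoff(elastic.NewExponentialBackoff(10*time.Millisecond, 8*time.Second)).
		Do(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	defer p.Close()
}
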
@@ -248,6 +249,8 @@ type BulkProcessor struct {
statsMu sync.Mutex // guards the following block
stats *BulkProcessorStats
+
+ stopReconnC chan struct{} // channel to signal stop reconnection attempts
}
func newBulkProcessor(
@@ -293,6 +296,7 @@ func (p *BulkProcessor) Start(ctx context.Context) error {
p.requestsC = make(chan BulkableRequest)
p.executionId = 0
p.stats = newBulkProcessorStats(p.numWorkers)
+ p.stopReconnC = make(chan struct{})
// Create and start up workers.
p.workers = make([]*bulkWorker, p.numWorkers)
@@ -331,6 +335,12 @@ func (p *BulkProcessor) Close() error {
return nil
}
+ // Tell connection checkers to stop
+ if p.stopReconnC != nil {
+ close(p.stopReconnC)
+ p.stopReconnC = nil
+ }
+
// Stop flusher (if enabled)
if p.flusherStopC != nil {
p.flusherStopC <- struct{}{}
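
The Close hunk above relies on the standard Go idiom of closing a channel to broadcast a stop signal: a receive on a closed channel returns immediately with ok == false, so a single close() wakes up every connection checker at once. A minimal, library-independent sketch of that pattern:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	stopC := make(chan struct{})
	var wg sync.WaitGroup

	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			t := time.NewTicker(50 * time.Millisecond)
			defer t.Stop()
			for {
				select {
				case _, ok := <-stopC:
					if !ok {
						fmt.Printf("worker %d: stop signal received\n", id)
						return
					}
				case <-t.C:
					// periodic work, e.g. a health check
				}
			}
		}(i)
	}

	time.Sleep(120 * time.Millisecond)
	close(stopC) // one close is observed by every listener
	wg.Wait()
}
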
@@ -436,29 +446,43 @@ func (w *bulkWorker) work(ctx context.Context) {
var stop bool
for !stop {
+ var err error
select {
case req, open := <-w.p.requestsC:
if open {
// Received a new request
w.service.Add(req)
if w.commitRequired() {
- w.commit(ctx) // TODO swallow errors here?
+ err = w.commit(ctx)
}
} else {
// Channel closed: Stop.
stop = true
if w.service.NumberOfActions() > 0 {
- w.commit(ctx) // TODO swallow errors here?
+ err = w.commit(ctx)
}
}
case <-w.flushC:
// Commit outstanding requests
if w.service.NumberOfActions() > 0 {
- w.commit(ctx) // TODO swallow errors here?
+ err = w.commit(ctx)
}
w.flushAckC <- struct{}{}
}
+ if !stop && err != nil {
+ waitForActive := func() {
+ // Add back pressure to prevent Add calls from filling up the request queue
+ ready := make(chan struct{})
+ go w.waitForActiveConnection(ready)
+ <-ready
+ }
+ if _, ok := err.(net.Error); ok {
+ waitForActive()
+ } else if IsConnErr(err) {
+ waitForActive()
+ }
+ }
}
}
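
The recovery path above only blocks on waitForActiveConnection when the commit error looks like a connectivity problem, i.e. it satisfies the standard net.Error interface or the package's IsConnErr helper. The stand-alone snippet below illustrates the net.Error type assertion; the dial target is a reserved documentation address chosen so the call reliably times out.

package main

import (
	"fmt"
	"net"
	"time"
)

func main() {
	// 203.0.113.1 is a TEST-NET address, so this dial should time out.
	_, err := net.DialTimeout("tcp", "203.0.113.1:9200", 200*time.Millisecond)
	if err != nil {
		if netErr, ok := err.(net.Error); ok {
			fmt.Printf("network error (timeout=%v): %v\n", netErr.Timeout(), netErr)
		} else {
			fmt.Println("non-network error:", err)
		}
	}
}
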
@@ -511,6 +535,35 @@ func (w *bulkWorker) commit(ctx context.Context) error {
return err
}
+func (w *bulkWorker) waitForActiveConnection(ready chan<- struct{}) {
+ defer close(ready)
+
+ t := time.NewTicker(5 * time.Second)
+ defer t.Stop()
+
+ client := w.p.c
+ stopReconnC := w.p.stopReconnC
+ w.p.c.errorf("elastic: bulk processor %q is waiting for an active connection", w.p.name)
+
+ // loop until a health check finds at least 1 active connection or the reconnection channel is closed
+ for {
+ select {
+ case _, ok := <-stopReconnC:
+ if !ok {
+ w.p.c.errorf("elastic: bulk processor %q active connection check interrupted", w.p.name)
+ return
+ }
+ case <-t.C:
+ client.healthcheck(time.Duration(3)*time.Second, true)
+ if client.mustActiveConn() == nil {
+ // found an active connection
+ // exit and signal done to the WaitGroup
+ return
+ }
+ }
+ }
+}
+
func (w *bulkWorker) updateStats(res *BulkResponse) {
// Update stats
if res != nil {
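
The updateStats code above maintains the counters that the processor exposes through its Stats accessor when stats collection is enabled. As a hedged sketch, reading them might look like the helper below; PrintStats is a hypothetical name, and the field names (Committed, Succeeded, Failed, Flushed, Workers, Queued, LastDuration) follow BulkProcessorStats in this package, so check them against the vendored sources.

package bulkstats

import (
	"fmt"

	elastic "gopkg.in/olivere/elastic.v5"
)

// PrintStats is a hypothetical helper that dumps the counters of a
// running processor that was built with Stats(true).
func PrintStats(p *elastic.BulkProcessor) {
	st := p.Stats()
	fmt.Printf("committed=%d succeeded=%d failed=%d flushed=%d\n",
		st.Committed, st.Succeeded, st.Failed, st.Flushed)
	for i, w := range st.Workers {
		fmt.Printf("worker %d: queued=%d last=%s\n", i, w.Queued, w.LastDuration)
	}
}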