From 6d8f122a5160f6d9e4c51579f2429dfaa62c7271 Mon Sep 17 00:00:00 2001 From: Christopher Speller Date: Fri, 16 Feb 2018 06:47:51 -0800 Subject: Upgrading server dependancies (#8308) --- .../github.com/olivere/elastic/bulk_processor.go | 61 ++++++++++++++++++++-- 1 file changed, 57 insertions(+), 4 deletions(-) (limited to 'vendor/github.com/olivere/elastic/bulk_processor.go') diff --git a/vendor/github.com/olivere/elastic/bulk_processor.go b/vendor/github.com/olivere/elastic/bulk_processor.go index b2709a880..6ee8a3dee 100644 --- a/vendor/github.com/olivere/elastic/bulk_processor.go +++ b/vendor/github.com/olivere/elastic/bulk_processor.go @@ -6,6 +6,7 @@ package elastic import ( "context" + "net" "sync" "sync/atomic" "time" @@ -121,7 +122,7 @@ func (s *BulkProcessorService) Stats(wantStats bool) *BulkProcessorService { return s } -// Set the backoff strategy to use for errors +// Backoff sets the backoff strategy to use for errors. func (s *BulkProcessorService) Backoff(backoff Backoff) *BulkProcessorService { s.backoff = backoff return s @@ -248,6 +249,8 @@ type BulkProcessor struct { statsMu sync.Mutex // guards the following block stats *BulkProcessorStats + + stopReconnC chan struct{} // channel to signal stop reconnection attempts } func newBulkProcessor( @@ -293,6 +296,7 @@ func (p *BulkProcessor) Start(ctx context.Context) error { p.requestsC = make(chan BulkableRequest) p.executionId = 0 p.stats = newBulkProcessorStats(p.numWorkers) + p.stopReconnC = make(chan struct{}) // Create and start up workers. p.workers = make([]*bulkWorker, p.numWorkers) @@ -331,6 +335,12 @@ func (p *BulkProcessor) Close() error { return nil } + // Tell connection checkers to stop + if p.stopReconnC != nil { + close(p.stopReconnC) + p.stopReconnC = nil + } + // Stop flusher (if enabled) if p.flusherStopC != nil { p.flusherStopC <- struct{}{} @@ -436,29 +446,43 @@ func (w *bulkWorker) work(ctx context.Context) { var stop bool for !stop { + var err error select { case req, open := <-w.p.requestsC: if open { // Received a new request w.service.Add(req) if w.commitRequired() { - w.commit(ctx) // TODO swallow errors here? + err = w.commit(ctx) } } else { // Channel closed: Stop. stop = true if w.service.NumberOfActions() > 0 { - w.commit(ctx) // TODO swallow errors here? + err = w.commit(ctx) } } case <-w.flushC: // Commit outstanding requests if w.service.NumberOfActions() > 0 { - w.commit(ctx) // TODO swallow errors here? + err = w.commit(ctx) } w.flushAckC <- struct{}{} } + if !stop && err != nil { + waitForActive := func() { + // Add back pressure to prevent Add calls from filling up the request queue + ready := make(chan struct{}) + go w.waitForActiveConnection(ready) + <-ready + } + if _, ok := err.(net.Error); ok { + waitForActive() + } else if IsConnErr(err) { + waitForActive() + } + } } } @@ -511,6 +535,35 @@ func (w *bulkWorker) commit(ctx context.Context) error { return err } +func (w *bulkWorker) waitForActiveConnection(ready chan<- struct{}) { + defer close(ready) + + t := time.NewTicker(5 * time.Second) + defer t.Stop() + + client := w.p.c + stopReconnC := w.p.stopReconnC + w.p.c.errorf("elastic: bulk processor %q is waiting for an active connection", w.p.name) + + // loop until a health check finds at least 1 active connection or the reconnection channel is closed + for { + select { + case _, ok := <-stopReconnC: + if !ok { + w.p.c.errorf("elastic: bulk processor %q active connection check interrupted", w.p.name) + return + } + case <-t.C: + client.healthcheck(time.Duration(3)*time.Second, true) + if client.mustActiveConn() == nil { + // found an active connection + // exit and signal done to the WaitGroup + return + } + } + } +} + func (w *bulkWorker) updateStats(res *BulkResponse) { // Update stats if res != nil { -- cgit v1.2.3-1-g7c22