From 6d8f122a5160f6d9e4c51579f2429dfaa62c7271 Mon Sep 17 00:00:00 2001 From: Christopher Speller Date: Fri, 16 Feb 2018 06:47:51 -0800 Subject: Upgrading server dependancies (#8308) --- .../elastic.v5/recipes/bulk_processor/main.go | 149 +++++++++++++++++++++ 1 file changed, 149 insertions(+) create mode 100644 vendor/gopkg.in/olivere/elastic.v5/recipes/bulk_processor/main.go (limited to 'vendor/gopkg.in/olivere/elastic.v5/recipes/bulk_processor') diff --git a/vendor/gopkg.in/olivere/elastic.v5/recipes/bulk_processor/main.go b/vendor/gopkg.in/olivere/elastic.v5/recipes/bulk_processor/main.go new file mode 100644 index 000000000..f13243297 --- /dev/null +++ b/vendor/gopkg.in/olivere/elastic.v5/recipes/bulk_processor/main.go @@ -0,0 +1,149 @@ +// Copyright 2012-present Oliver Eilhard. All rights reserved. +// Use of this source code is governed by a MIT-license. +// See http://olivere.mit-license.org/license.txt for details. + +// BulkProcessor runs a bulk processing job that fills an index +// given certain criteria like flush interval etc. +// +// Example +// +// bulk_processor -url=http://127.0.0.1:9200/bulk-processor-test?sniff=false -n=100000 -flush-interval=1s +// +package main + +import ( + "context" + "flag" + "fmt" + "log" + "math/rand" + "os" + "os/signal" + "sync/atomic" + "syscall" + "time" + + "github.com/google/uuid" + + "github.com/olivere/elastic" + "github.com/olivere/elastic/config" +) + +func main() { + var ( + url = flag.String("url", "http://localhost:9200/bulk-processor-test", "Elasticsearch URL") + numWorkers = flag.Int("num-workers", 4, "Number of workers") + n = flag.Int64("n", -1, "Number of documents to process (-1 for unlimited)") + flushInterval = flag.Duration("flush-interval", 1*time.Second, "Flush interval") + bulkActions = flag.Int("bulk-actions", 0, "Number of bulk actions before committing") + bulkSize = flag.Int("bulk-size", 0, "Size of bulk requests before committing") + ) + flag.Parse() + log.SetFlags(0) + + rand.Seed(time.Now().UnixNano()) + + // Parse configuration from URL + cfg, err := config.Parse(*url) + if err != nil { + log.Fatal(err) + } + + // Create an Elasticsearch client from the parsed config + client, err := elastic.NewClientFromConfig(cfg) + if err != nil { + log.Fatal(err) + } + + // Drop old index + exists, err := client.IndexExists(cfg.Index).Do(context.Background()) + if err != nil { + log.Fatal(err) + } + if exists { + _, err = client.DeleteIndex(cfg.Index).Do(context.Background()) + if err != nil { + log.Fatal(err) + } + } + + // Create processor + bulkp := elastic.NewBulkProcessorService(client). + Name("bulk-test-processor"). + Stats(true). + Backoff(elastic.StopBackoff{}). + FlushInterval(*flushInterval). + Workers(*numWorkers) + if *bulkActions > 0 { + bulkp = bulkp.BulkActions(*bulkActions) + } + if *bulkSize > 0 { + bulkp = bulkp.BulkSize(*bulkSize) + } + p, err := bulkp.Do(context.Background()) + if err != nil { + log.Fatal(err) + } + + var created int64 + errc := make(chan error, 1) + go func() { + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) + <-c + errc <- nil + }() + + go func() { + defer func() { + if err := p.Close(); err != nil { + errc <- err + } + }() + + type Doc struct { + Timestamp time.Time `json:"@timestamp"` + } + + for { + current := atomic.AddInt64(&created, 1) + if *n > 0 && current >= *n { + errc <- nil + return + } + r := elastic.NewBulkIndexRequest(). + Index(cfg.Index). + Type("doc"). + Id(uuid.New().String()). + Doc(Doc{Timestamp: time.Now()}) + p.Add(r) + + time.Sleep(time.Duration(rand.Intn(1000)) * time.Microsecond) + } + }() + + go func() { + t := time.NewTicker(1 * time.Second) + defer t.Stop() + for range t.C { + stats := p.Stats() + written := atomic.LoadInt64(&created) + var queued int64 + for _, w := range stats.Workers { + queued += w.Queued + } + fmt.Printf("Queued=%5d Written=%8d Succeeded=%8d Failed=%8d Comitted=%6d Flushed=%6d\n", + queued, + written, + stats.Succeeded, + stats.Failed, + stats.Committed, + stats.Flushed, + ) + } + }() + + if err := <-errc; err != nil { + log.Fatal(err) + } +} -- cgit v1.2.3-1-g7c22