path: root/vendor/
diff options
authorGeorge Goldberg <>2017-05-09 14:13:24 +0100
committerJoram Wilander <>2017-05-09 08:13:24 -0500
commit622998add12734a6c2b5d79918338a4d6dca7ce6 (patch)
tree198d507eb04c5684d5b191b17ed957f02f2df6ed /vendor/
parentb25021b9129820147bf596b834d438ef218acf28 (diff)
PLT-6398: Add dependency on go elastic search library. (#6340)
Diffstat (limited to 'vendor/')
3 files changed, 377 insertions, 0 deletions
diff --git a/vendor/ b/vendor/
new file mode 100644
index 000000000..5a7909095
--- /dev/null
+++ b/vendor/
@@ -0,0 +1,173 @@
+// Copyright 2012-present Oliver Eilhard. All rights reserved.
+// Use of this source code is governed by a MIT-license.
+// See for details.
+// BulkInsert illustrates how to bulk insert documents into Elasticsearch.
+// It uses two goroutines to do so. The first creates a simple document
+// and sends it to the second via a channel. The second goroutine collects
+// those documents, creates a bulk request that is added to a Bulk service
+// and committed to Elasticsearch after reaching a number of documents.
+// The number of documents after which a commit happens can be specified
+// via the "bulk-size" flag.
+// See
+// for details on the Bulk API in Elasticsearch.
+// Example
+// Bulk index 100.000 documents into the index "warehouse", type "product",
+// committing every set of 1.000 documents.
+// bulk_insert -index=warehouse -type=product -n=100000 -bulk-size=1000
+package main
+import (
+ "context"
+ "encoding/base64"
+ "errors"
+ "flag"
+ "fmt"
+ "log"
+ "math/rand"
+ "sync/atomic"
+ "time"
+ ""
+ ""
+func main() {
+ var (
+ url = flag.String("url", "http://localhost:9200", "Elasticsearch URL")
+ index = flag.String("index", "", "Elasticsearch index name")
+ typ = flag.String("type", "", "Elasticsearch type name")
+ sniff = flag.Bool("sniff", true, "Enable or disable sniffing")
+ n = flag.Int("n", 0, "Number of documents to bulk insert")
+ bulkSize = flag.Int("bulk-size", 0, "Number of documents to collect before committing")
+ )
+ flag.Parse()
+ log.SetFlags(0)
+ rand.Seed(time.Now().UnixNano())
+ if *url == "" {
+ log.Fatal("missing url parameter")
+ }
+ if *index == "" {
+ log.Fatal("missing index parameter")
+ }
+ if *typ == "" {
+ log.Fatal("missing type parameter")
+ }
+ if *n <= 0 {
+ log.Fatal("n must be a positive number")
+ }
+ if *bulkSize <= 0 {
+ log.Fatal("bulk-size must be a positive number")
+ }
+ // Create an Elasticsearch client
+ client, err := elastic.NewClient(elastic.SetURL(*url), elastic.SetSniff(*sniff))
+ if err != nil {
+ log.Fatal(err)
+ }
+ // Setup a group of goroutines from the excellent errgroup package
+ g, ctx := errgroup.WithContext(context.TODO())
+ // The first goroutine will emit documents and send it to the second goroutine
+ // via the docsc channel.
+ // The second Goroutine will simply bulk insert the documents.
+ type doc struct {
+ ID string `json:"id"`
+ Timestamp time.Time `json:"@timestamp"`
+ }
+ docsc := make(chan doc)
+ begin := time.Now()
+ // Goroutine to create documents
+ g.Go(func() error {
+ defer close(docsc)
+ buf := make([]byte, 32)
+ for i := 0; i < *n; i++ {
+ // Generate a random ID
+ _, err := rand.Read(buf)
+ if err != nil {
+ return err
+ }
+ id := base64.URLEncoding.EncodeToString(buf)
+ // Construct the document
+ d := doc{
+ ID: id,
+ Timestamp: time.Now(),
+ }
+ // Send over to 2nd goroutine, or cancel
+ select {
+ case docsc <- d:
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+ }
+ return nil
+ })
+ // Second goroutine will consume the documents sent from the first and bulk insert into ES
+ var total uint64
+ g.Go(func() error {
+ bulk := client.Bulk().Index(*index).Type(*typ)
+ for d := range docsc {
+ // Simple progress
+ current := atomic.AddUint64(&total, 1)
+ dur := time.Since(begin).Seconds()
+ sec := int(dur)
+ pps := int64(float64(current) / dur)
+ fmt.Printf("%10d | %6d req/s | %02d:%02d\r", current, pps, sec/60, sec%60)
+ // Enqueue the document
+ bulk.Add(elastic.NewBulkIndexRequest().Id(d.ID).Doc(d))
+ if bulk.NumberOfActions() >= *bulkSize {
+ // Commit
+ res, err := bulk.Do(ctx)
+ if err != nil {
+ return err
+ }
+ if res.Errors {
+ // Look up the failed documents with res.Failed(), and e.g. recommit
+ return errors.New("bulk commit failed")
+ }
+ // "bulk" is reset after Do, so you can reuse it
+ }
+ select {
+ default:
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+ }
+ // Commit the final batch before exiting
+ if bulk.NumberOfActions() > 0 {
+ _, err = bulk.Do(ctx)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+ })
+ // Wait until all goroutines are finished
+ if err := g.Wait(); err != nil {
+ log.Fatal(err)
+ }
+ // Final results
+ dur := time.Since(begin).Seconds()
+ sec := int(dur)
+ pps := int64(float64(total) / dur)
+ fmt.Printf("%10d | %6d req/s | %02d:%02d\n", total, pps, sec/60, sec%60)
diff --git a/vendor/ b/vendor/
new file mode 100644
index 000000000..156658d36
--- /dev/null
+++ b/vendor/
@@ -0,0 +1,43 @@
+// Copyright 2012-present Oliver Eilhard. All rights reserved.
+// Use of this source code is governed by a MIT-license.
+// See for details.
+// Connect simply connects to Elasticsearch.
+// Example
+// connect -url= -sniff=false
+package main
+import (
+ "flag"
+ "fmt"
+ "log"
+ ""
+func main() {
+ var (
+ url = flag.String("url", "http://localhost:9200", "Elasticsearch URL")
+ sniff = flag.Bool("sniff", true, "Enable or disable sniffing")
+ )
+ flag.Parse()
+ log.SetFlags(0)
+ if *url == "" {
+ *url = ""
+ }
+ // Create an Elasticsearch client
+ client, err := elastic.NewClient(elastic.SetURL(*url), elastic.SetSniff(*sniff))
+ if err != nil {
+ log.Fatal(err)
+ }
+ _ = client
+ // Just a status message
+ fmt.Println("Connection succeeded")
diff --git a/vendor/ b/vendor/
new file mode 100644
index 000000000..e59ca562d
--- /dev/null
+++ b/vendor/
@@ -0,0 +1,161 @@
+// Copyright 2012-present Oliver Eilhard. All rights reserved.
+// Use of this source code is governed by a MIT-license.
+// See for details.
+// SlicedScroll illustrates scrolling through a set of documents
+// in parallel. It uses the sliced scrolling feature introduced
+// in Elasticsearch 5.0 to create a number of Goroutines, each
+// scrolling through a slice of the total results. A second goroutine
+// receives the hits from the set of goroutines scrolling through
+// the slices and simply counts the total number and the number of
+// documents received per slice.
+// The speedup of sliced scrolling can be significant but is very
+// dependent on the specific use case.
+// See
+// for details on sliced scrolling in Elasticsearch.
+// Example
+// Scroll with 4 parallel slices through an index called "products".
+// Use "_uid" as the default field:
+// sliced_scroll -index=products -n=4
+package main
+import (
+ "context"
+ "flag"
+ "fmt"
+ "io"
+ "log"
+ "sync"
+ "sync/atomic"
+ "time"
+ ""
+ ""
+func main() {
+ var (
+ url = flag.String("url", "http://localhost:9200", "Elasticsearch URL")
+ index = flag.String("index", "", "Elasticsearch index name")
+ typ = flag.String("type", "", "Elasticsearch type name")
+ field = flag.String("field", "", "Slice field (must be numeric)")
+ numSlices = flag.Int("n", 2, "Number of slices to use in parallel")
+ sniff = flag.Bool("sniff", true, "Enable or disable sniffing")
+ )
+ flag.Parse()
+ log.SetFlags(0)
+ if *url == "" {
+ log.Fatal("missing url parameter")
+ }
+ if *index == "" {
+ log.Fatal("missing index parameter")
+ }
+ if *numSlices <= 0 {
+ log.Fatal("n must be greater than zero")
+ }
+ // Create an Elasticsearch client
+ client, err := elastic.NewClient(elastic.SetURL(*url), elastic.SetSniff(*sniff))
+ if err != nil {
+ log.Fatal(err)
+ }
+ // Setup a group of goroutines from the excellent errgroup package
+ g, ctx := errgroup.WithContext(context.TODO())
+ // Hits channel will be sent to from the first set of goroutines and consumed by the second
+ type hit struct {
+ Slice int
+ Hit elastic.SearchHit
+ }
+ hitsc := make(chan hit)
+ begin := time.Now()
+ // Start a number of goroutines to parallelize scrolling
+ var wg sync.WaitGroup
+ for i := 0; i < *numSlices; i++ {
+ wg.Add(1)
+ slice := i
+ // Prepare the query
+ var query elastic.Query
+ if *typ == "" {
+ query = elastic.NewMatchAllQuery()
+ } else {
+ query = elastic.NewTypeQuery(*typ)
+ }
+ // Prepare the slice
+ sliceQuery := elastic.NewSliceQuery().Id(i).Max(*numSlices)
+ if *field != "" {
+ sliceQuery = sliceQuery.Field(*field)
+ }
+ // Start goroutine for this sliced scroll
+ g.Go(func() error {
+ defer wg.Done()
+ svc := client.Scroll(*index).Query(query).Slice(sliceQuery)
+ for {
+ res, err := svc.Do(ctx)
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ return err
+ }
+ for _, searchHit := range res.Hits.Hits {
+ // Pass the hit to the hits channel, which will be consumed below
+ select {
+ case hitsc <- hit{Slice: slice, Hit: *searchHit}:
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+ }
+ }
+ return nil
+ })
+ }
+ go func() {
+ // Wait until all scrolling is done
+ wg.Wait()
+ close(hitsc)
+ }()
+ // Second goroutine will consume the hits sent from the workers in first set of goroutines
+ var total uint64
+ totals := make([]uint64, *numSlices)
+ g.Go(func() error {
+ for hit := range hitsc {
+ // We simply count the hits here.
+ atomic.AddUint64(&totals[hit.Slice], 1)
+ current := atomic.AddUint64(&total, 1)
+ sec := int(time.Since(begin).Seconds())
+ fmt.Printf("%8d | %02d:%02d\r", current, sec/60, sec%60)
+ select {
+ default:
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+ }
+ return nil
+ })
+ // Wait until all goroutines are finished
+ if err := g.Wait(); err != nil {
+ log.Fatal(err)
+ }
+ fmt.Printf("Scrolled through a total of %d documents in %v\n", total, time.Since(begin))
+ for i := 0; i < *numSlices; i++ {
+ fmt.Printf("Slice %2d received %d documents\n", i, totals[i])
+ }