From 622998add12734a6c2b5d79918338a4d6dca7ce6 Mon Sep 17 00:00:00 2001
From: George Goldberg
Date: Tue, 9 May 2017 14:13:24 +0100
Subject: PLT-6398: Add dependency on go elastic search library. (#6340)

---
 .../elastic.v5/recipes/bulk_insert/bulk_insert.go | 173 +++++++++++++++++++++
 .../olivere/elastic.v5/recipes/connect/connect.go |  43 +++++
 .../recipes/sliced_scroll/sliced_scroll.go        | 161 +++++++++++++++++++
 3 files changed, 377 insertions(+)
 create mode 100644 vendor/gopkg.in/olivere/elastic.v5/recipes/bulk_insert/bulk_insert.go
 create mode 100644 vendor/gopkg.in/olivere/elastic.v5/recipes/connect/connect.go
 create mode 100644 vendor/gopkg.in/olivere/elastic.v5/recipes/sliced_scroll/sliced_scroll.go

(limited to 'vendor/gopkg.in/olivere/elastic.v5/recipes')

diff --git a/vendor/gopkg.in/olivere/elastic.v5/recipes/bulk_insert/bulk_insert.go b/vendor/gopkg.in/olivere/elastic.v5/recipes/bulk_insert/bulk_insert.go
new file mode 100644
index 000000000..5a7909095
--- /dev/null
+++ b/vendor/gopkg.in/olivere/elastic.v5/recipes/bulk_insert/bulk_insert.go
@@ -0,0 +1,173 @@
+// Copyright 2012-present Oliver Eilhard. All rights reserved.
+// Use of this source code is governed by a MIT-license.
+// See http://olivere.mit-license.org/license.txt for details.
+
+// BulkInsert illustrates how to bulk insert documents into Elasticsearch.
+//
+// It uses two goroutines to do so. The first creates a simple document
+// and sends it to the second via a channel. The second goroutine collects
+// those documents, creates a bulk request that is added to a Bulk service
+// and committed to Elasticsearch after reaching a number of documents.
+// The number of documents after which a commit happens can be specified
+// via the "bulk-size" flag.
+//
+// See https://www.elastic.co/guide/en/elasticsearch/reference/5.2/docs-bulk.html
+// for details on the Bulk API in Elasticsearch.
+//
+// Example
+//
+// Bulk index 100.000 documents into the index "warehouse", type "product",
+// committing every set of 1.000 documents.
+//
+//     bulk_insert -index=warehouse -type=product -n=100000 -bulk-size=1000
+//
+package main
+
+import (
+    "context"
+    "encoding/base64"
+    "errors"
+    "flag"
+    "fmt"
+    "log"
+    "math/rand"
+    "sync/atomic"
+    "time"
+
+    "golang.org/x/sync/errgroup"
+    "gopkg.in/olivere/elastic.v5"
+)
+
+func main() {
+    var (
+        url      = flag.String("url", "http://localhost:9200", "Elasticsearch URL")
+        index    = flag.String("index", "", "Elasticsearch index name")
+        typ      = flag.String("type", "", "Elasticsearch type name")
+        sniff    = flag.Bool("sniff", true, "Enable or disable sniffing")
+        n        = flag.Int("n", 0, "Number of documents to bulk insert")
+        bulkSize = flag.Int("bulk-size", 0, "Number of documents to collect before committing")
+    )
+    flag.Parse()
+    log.SetFlags(0)
+    rand.Seed(time.Now().UnixNano())
+
+    if *url == "" {
+        log.Fatal("missing url parameter")
+    }
+    if *index == "" {
+        log.Fatal("missing index parameter")
+    }
+    if *typ == "" {
+        log.Fatal("missing type parameter")
+    }
+    if *n <= 0 {
+        log.Fatal("n must be a positive number")
+    }
+    if *bulkSize <= 0 {
+        log.Fatal("bulk-size must be a positive number")
+    }
+
+    // Create an Elasticsearch client
+    client, err := elastic.NewClient(elastic.SetURL(*url), elastic.SetSniff(*sniff))
+    if err != nil {
+        log.Fatal(err)
+    }
+
+    // Setup a group of goroutines from the excellent errgroup package
+    g, ctx := errgroup.WithContext(context.TODO())
+
+    // The first goroutine will emit documents and send it to the second goroutine
+    // via the docsc channel.
+    // The second Goroutine will simply bulk insert the documents.
+    type doc struct {
+        ID        string    `json:"id"`
+        Timestamp time.Time `json:"@timestamp"`
+    }
+    docsc := make(chan doc)
+
+    begin := time.Now()
+
+    // Goroutine to create documents
+    g.Go(func() error {
+        defer close(docsc)
+
+        buf := make([]byte, 32)
+        for i := 0; i < *n; i++ {
+            // Generate a random ID
+            _, err := rand.Read(buf)
+            if err != nil {
+                return err
+            }
+            id := base64.URLEncoding.EncodeToString(buf)
+
+            // Construct the document
+            d := doc{
+                ID:        id,
+                Timestamp: time.Now(),
+            }
+
+            // Send over to 2nd goroutine, or cancel
+            select {
+            case docsc <- d:
+            case <-ctx.Done():
+                return ctx.Err()
+            }
+        }
+        return nil
+    })
+
+    // Second goroutine will consume the documents sent from the first and bulk insert into ES
+    var total uint64
+    g.Go(func() error {
+        bulk := client.Bulk().Index(*index).Type(*typ)
+        for d := range docsc {
+            // Simple progress
+            current := atomic.AddUint64(&total, 1)
+            dur := time.Since(begin).Seconds()
+            sec := int(dur)
+            pps := int64(float64(current) / dur)
+            fmt.Printf("%10d | %6d req/s | %02d:%02d\r", current, pps, sec/60, sec%60)
+
+            // Enqueue the document
+            bulk.Add(elastic.NewBulkIndexRequest().Id(d.ID).Doc(d))
+            if bulk.NumberOfActions() >= *bulkSize {
+                // Commit
+                res, err := bulk.Do(ctx)
+                if err != nil {
+                    return err
+                }
+                if res.Errors {
+                    // Look up the failed documents with res.Failed(), and e.g. recommit
+                    return errors.New("bulk commit failed")
+                }
+                // "bulk" is reset after Do, so you can reuse it
+            }
+
+            select {
+            default:
+            case <-ctx.Done():
+                return ctx.Err()
+            }
+        }
+
+        // Commit the final batch before exiting
+        if bulk.NumberOfActions() > 0 {
+            _, err = bulk.Do(ctx)
+            if err != nil {
+                return err
+            }
+        }
+        return nil
+    })
+
+    // Wait until all goroutines are finished
+    if err := g.Wait(); err != nil {
+        log.Fatal(err)
+    }
+
+    // Final results
+    dur := time.Since(begin).Seconds()
+    sec := int(dur)
+    pps := int64(float64(total) / dur)
+    fmt.Printf("%10d | %6d req/s | %02d:%02d\n", total, pps, sec/60, sec%60)
+}
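The recipe above aborts the whole run as soon as a bulk response reports item-level errors; its own comment points at res.Failed() as the way to inspect or recommit the rejected documents instead. Below is a minimal sketch of that variant, built from the same BulkService calls as the recipe. It is not part of this commit: the index name "warehouse", the type "product", the two sample documents and the single local node are placeholder assumptions.

package main

import (
    "context"
    "log"

    "gopkg.in/olivere/elastic.v5"
)

func main() {
    ctx := context.Background()

    // Placeholder connection: a single local node, sniffing disabled.
    client, err := elastic.NewClient(
        elastic.SetURL("http://localhost:9200"),
        elastic.SetSniff(false),
    )
    if err != nil {
        log.Fatal(err)
    }

    // Queue a couple of documents on one BulkService.
    bulk := client.Bulk().Index("warehouse").Type("product")
    bulk.Add(elastic.NewBulkIndexRequest().Id("1").Doc(map[string]interface{}{"name": "shelf"}))
    bulk.Add(elastic.NewBulkIndexRequest().Id("2").Doc(map[string]interface{}{"name": "crate"}))

    res, err := bulk.Do(ctx)
    if err != nil {
        // Transport-level failure: nothing was committed.
        log.Fatal(err)
    }
    if res.Errors {
        // Inspect the individual rejections instead of aborting the run.
        for _, item := range res.Failed() {
            log.Printf("rejected id=%s status=%d", item.Id, item.Status)
        }
    }
}

Retrying only the reported IDs keeps one rejected document from discarding an otherwise successful batch.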
diff --git a/vendor/gopkg.in/olivere/elastic.v5/recipes/connect/connect.go b/vendor/gopkg.in/olivere/elastic.v5/recipes/connect/connect.go
new file mode 100644
index 000000000..156658d36
--- /dev/null
+++ b/vendor/gopkg.in/olivere/elastic.v5/recipes/connect/connect.go
@@ -0,0 +1,43 @@
+// Copyright 2012-present Oliver Eilhard. All rights reserved.
+// Use of this source code is governed by a MIT-license.
+// See http://olivere.mit-license.org/license.txt for details.
+
+// Connect simply connects to Elasticsearch.
+//
+// Example
+//
+//
+//     connect -url=http://127.0.0.1:9200 -sniff=false
+//
+package main
+
+import (
+    "flag"
+    "fmt"
+    "log"
+
+    "gopkg.in/olivere/elastic.v5"
+)
+
+func main() {
+    var (
+        url   = flag.String("url", "http://localhost:9200", "Elasticsearch URL")
+        sniff = flag.Bool("sniff", true, "Enable or disable sniffing")
+    )
+    flag.Parse()
+    log.SetFlags(0)
+
+    if *url == "" {
+        *url = "http://127.0.0.1:9200"
+    }
+
+    // Create an Elasticsearch client
+    client, err := elastic.NewClient(elastic.SetURL(*url), elastic.SetSniff(*sniff))
+    if err != nil {
+        log.Fatal(err)
+    }
+    _ = client
+
+    // Just a status message
+    fmt.Println("Connection succeeded")
+}
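elastic.NewClient accepts more options than the SetURL/SetSniff pair used in the connect recipe above, and a real deployment usually wants a quick end-to-end check after connecting. The sketch below is one plausible combination, not part of this commit: it adds basic auth and an error logger, then asks the node for its version and the cluster for its health. The URL and the elastic/changeme credentials are placeholders.

package main

import (
    "context"
    "log"
    "os"

    "gopkg.in/olivere/elastic.v5"
)

func main() {
    // Placeholder URL and credentials; sniffing disabled for a single node.
    client, err := elastic.NewClient(
        elastic.SetURL("http://127.0.0.1:9200"),
        elastic.SetSniff(false),
        elastic.SetBasicAuth("elastic", "changeme"),
        elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC ", log.LstdFlags)),
    )
    if err != nil {
        log.Fatal(err)
    }

    // Report the server version as a simple end-to-end check.
    version, err := client.ElasticsearchVersion("http://127.0.0.1:9200")
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("connected, Elasticsearch version %s", version)

    // Cluster health gives a coarse readiness signal (green/yellow/red).
    res, err := client.ClusterHealth().Do(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("cluster status: %s", res.Status)
}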
diff --git a/vendor/gopkg.in/olivere/elastic.v5/recipes/sliced_scroll/sliced_scroll.go b/vendor/gopkg.in/olivere/elastic.v5/recipes/sliced_scroll/sliced_scroll.go
new file mode 100644
index 000000000..e59ca562d
--- /dev/null
+++ b/vendor/gopkg.in/olivere/elastic.v5/recipes/sliced_scroll/sliced_scroll.go
@@ -0,0 +1,161 @@
+// Copyright 2012-present Oliver Eilhard. All rights reserved.
+// Use of this source code is governed by a MIT-license.
+// See http://olivere.mit-license.org/license.txt for details.
+
+// SlicedScroll illustrates scrolling through a set of documents
+// in parallel. It uses the sliced scrolling feature introduced
+// in Elasticsearch 5.0 to create a number of Goroutines, each
+// scrolling through a slice of the total results. A second goroutine
+// receives the hits from the set of goroutines scrolling through
+// the slices and simply counts the total number and the number of
+// documents received per slice.
+//
+// The speedup of sliced scrolling can be significant but is very
+// dependent on the specific use case.
+//
+// See https://www.elastic.co/guide/en/elasticsearch/reference/5.2/search-request-scroll.html#sliced-scroll
+// for details on sliced scrolling in Elasticsearch.
+//
+// Example
+//
+// Scroll with 4 parallel slices through an index called "products".
+// Use "_uid" as the default field:
+//
+//     sliced_scroll -index=products -n=4
+//
+package main
+
+import (
+    "context"
+    "flag"
+    "fmt"
+    "io"
+    "log"
+    "sync"
+    "sync/atomic"
+    "time"
+
+    "golang.org/x/sync/errgroup"
+    "gopkg.in/olivere/elastic.v5"
+)
+
+func main() {
+    var (
+        url       = flag.String("url", "http://localhost:9200", "Elasticsearch URL")
+        index     = flag.String("index", "", "Elasticsearch index name")
+        typ       = flag.String("type", "", "Elasticsearch type name")
+        field     = flag.String("field", "", "Slice field (must be numeric)")
+        numSlices = flag.Int("n", 2, "Number of slices to use in parallel")
+        sniff     = flag.Bool("sniff", true, "Enable or disable sniffing")
+    )
+    flag.Parse()
+    log.SetFlags(0)
+
+    if *url == "" {
+        log.Fatal("missing url parameter")
+    }
+    if *index == "" {
+        log.Fatal("missing index parameter")
+    }
+    if *numSlices <= 0 {
+        log.Fatal("n must be greater than zero")
+    }
+
+    // Create an Elasticsearch client
+    client, err := elastic.NewClient(elastic.SetURL(*url), elastic.SetSniff(*sniff))
+    if err != nil {
+        log.Fatal(err)
+    }
+
+    // Setup a group of goroutines from the excellent errgroup package
+    g, ctx := errgroup.WithContext(context.TODO())
+
+    // Hits channel will be sent to from the first set of goroutines and consumed by the second
+    type hit struct {
+        Slice int
+        Hit   elastic.SearchHit
+    }
+    hitsc := make(chan hit)
+
+    begin := time.Now()
+
+    // Start a number of goroutines to parallelize scrolling
+    var wg sync.WaitGroup
+    for i := 0; i < *numSlices; i++ {
+        wg.Add(1)
+
+        slice := i
+
+        // Prepare the query
+        var query elastic.Query
+        if *typ == "" {
+            query = elastic.NewMatchAllQuery()
+        } else {
+            query = elastic.NewTypeQuery(*typ)
+        }
+
+        // Prepare the slice
+        sliceQuery := elastic.NewSliceQuery().Id(i).Max(*numSlices)
+        if *field != "" {
+            sliceQuery = sliceQuery.Field(*field)
+        }
+
+        // Start goroutine for this sliced scroll
+        g.Go(func() error {
+            defer wg.Done()
+            svc := client.Scroll(*index).Query(query).Slice(sliceQuery)
+            for {
+                res, err := svc.Do(ctx)
+                if err == io.EOF {
+                    break
+                }
+                if err != nil {
+                    return err
+                }
+                for _, searchHit := range res.Hits.Hits {
+                    // Pass the hit to the hits channel, which will be consumed below
+                    select {
+                    case hitsc <- hit{Slice: slice, Hit: *searchHit}:
+                    case <-ctx.Done():
+                        return ctx.Err()
+                    }
+                }
+            }
+            return nil
+        })
+    }
+    go func() {
+        // Wait until all scrolling is done
+        wg.Wait()
+        close(hitsc)
+    }()
+
+    // Second goroutine will consume the hits sent from the workers in first set of goroutines
+    var total uint64
+    totals := make([]uint64, *numSlices)
+    g.Go(func() error {
+        for hit := range hitsc {
+            // We simply count the hits here.
+            atomic.AddUint64(&totals[hit.Slice], 1)
+            current := atomic.AddUint64(&total, 1)
+            sec := int(time.Since(begin).Seconds())
+            fmt.Printf("%8d | %02d:%02d\r", current, sec/60, sec%60)
+            select {
+            default:
+            case <-ctx.Done():
+                return ctx.Err()
+            }
+        }
+        return nil
+    })
+
+    // Wait until all goroutines are finished
+    if err := g.Wait(); err != nil {
+        log.Fatal(err)
+    }
+
+    fmt.Printf("Scrolled through a total of %d documents in %v\n", total, time.Since(begin))
+    for i := 0; i < *numSlices; i++ {
+        fmt.Printf("Slice %2d received %d documents\n", i, totals[i])
+    }
+}
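The sliced recipe above parallelizes what a single scroll cursor does sequentially. For reference, that baseline looks like the sketch below: one ScrollService, the io.EOF return value marking the end of the result set, and each hit decoded from its Source field. This is not part of the commit; the "products" index, the page size of 500 and the product struct are placeholder assumptions.

package main

import (
    "context"
    "encoding/json"
    "io"
    "log"

    "gopkg.in/olivere/elastic.v5"
)

// product is a placeholder document shape for this sketch.
type product struct {
    Name string `json:"name"`
}

func main() {
    ctx := context.Background()
    client, err := elastic.NewClient(elastic.SetURL("http://localhost:9200"), elastic.SetSniff(false))
    if err != nil {
        log.Fatal(err)
    }

    // One scroll cursor over the whole index, 500 hits per page.
    svc := client.Scroll("products").Query(elastic.NewMatchAllQuery()).Size(500)
    var count int
    for {
        res, err := svc.Do(ctx)
        if err == io.EOF {
            break // no more pages
        }
        if err != nil {
            log.Fatal(err)
        }
        for _, h := range res.Hits.Hits {
            var p product
            if err := json.Unmarshal(*h.Source, &p); err != nil {
                log.Fatal(err)
            }
            count++
        }
    }
    log.Printf("scrolled through %d documents", count)
}

Timing this against the sliced version on a real index is the most direct way to see whether slicing pays off for a given workload, which is the caveat the recipe's own comment makes.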
--
cgit v1.2.3-1-g7c22
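This commit only vendors the client library; the usual first steps for the code that will consume it are creating an index with an explicit mapping and indexing a document. The sketch below shows those calls (IndexExists, CreateIndex, Index) against elastic.v5 under the same placeholder assumptions as above: the "posts" index, the "post" type and the mapping are illustrative only and say nothing about the schema the Elasticsearch feature will actually use.

package main

import (
    "context"
    "log"

    "gopkg.in/olivere/elastic.v5"
)

// mapping is a placeholder index definition for this sketch.
const mapping = `{
    "settings": { "number_of_shards": 1, "number_of_replicas": 0 },
    "mappings": {
        "post": {
            "properties": {
                "message":   { "type": "text" },
                "create_at": { "type": "date" }
            }
        }
    }
}`

func main() {
    ctx := context.Background()
    client, err := elastic.NewClient(elastic.SetURL("http://localhost:9200"), elastic.SetSniff(false))
    if err != nil {
        log.Fatal(err)
    }

    // Create the index with an explicit mapping if it does not exist yet.
    exists, err := client.IndexExists("posts").Do(ctx)
    if err != nil {
        log.Fatal(err)
    }
    if !exists {
        if _, err := client.CreateIndex("posts").BodyString(mapping).Do(ctx); err != nil {
            log.Fatal(err)
        }
    }

    // Index a single placeholder document.
    doc := map[string]interface{}{"message": "hello", "create_at": 1494334404000}
    if _, err := client.Index().Index("posts").Type("post").Id("1").BodyJson(doc).Do(ctx); err != nil {
        log.Fatal(err)
    }
    log.Println("document indexed")
}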