From 6e2cb00008cbf09e556b00f87603797fcaa47e09 Mon Sep 17 00:00:00 2001 From: Christopher Speller Date: Mon, 16 Apr 2018 05:37:14 -0700 Subject: Depenancy upgrades and movign to dep. (#8630) --- .../elastic/recipes/bulk_insert/bulk_insert.go | 173 --------------------- 1 file changed, 173 deletions(-) delete mode 100644 vendor/github.com/olivere/elastic/recipes/bulk_insert/bulk_insert.go (limited to 'vendor/github.com/olivere/elastic/recipes/bulk_insert') diff --git a/vendor/github.com/olivere/elastic/recipes/bulk_insert/bulk_insert.go b/vendor/github.com/olivere/elastic/recipes/bulk_insert/bulk_insert.go deleted file mode 100644 index 5a8ab39d0..000000000 --- a/vendor/github.com/olivere/elastic/recipes/bulk_insert/bulk_insert.go +++ /dev/null @@ -1,173 +0,0 @@ -// Copyright 2012-present Oliver Eilhard. All rights reserved. -// Use of this source code is governed by a MIT-license. -// See http://olivere.mit-license.org/license.txt for details. - -// BulkInsert illustrates how to bulk insert documents into Elasticsearch. -// -// It uses two goroutines to do so. The first creates a simple document -// and sends it to the second via a channel. The second goroutine collects -// those documents, creates a bulk request that is added to a Bulk service -// and committed to Elasticsearch after reaching a number of documents. -// The number of documents after which a commit happens can be specified -// via the "bulk-size" flag. -// -// See https://www.elastic.co/guide/en/elasticsearch/reference/6.0/docs-bulk.html -// for details on the Bulk API in Elasticsearch. -// -// Example -// -// Bulk index 100.000 documents into the index "warehouse", type "product", -// committing every set of 1.000 documents. -// -// bulk_insert -index=warehouse -type=product -n=100000 -bulk-size=1000 -// -package main - -import ( - "context" - "encoding/base64" - "errors" - "flag" - "fmt" - "log" - "math/rand" - "sync/atomic" - "time" - - "golang.org/x/sync/errgroup" - "github.com/olivere/elastic" -) - -func main() { - var ( - url = flag.String("url", "http://localhost:9200", "Elasticsearch URL") - index = flag.String("index", "", "Elasticsearch index name") - typ = flag.String("type", "", "Elasticsearch type name") - sniff = flag.Bool("sniff", true, "Enable or disable sniffing") - n = flag.Int("n", 0, "Number of documents to bulk insert") - bulkSize = flag.Int("bulk-size", 0, "Number of documents to collect before committing") - ) - flag.Parse() - log.SetFlags(0) - rand.Seed(time.Now().UnixNano()) - - if *url == "" { - log.Fatal("missing url parameter") - } - if *index == "" { - log.Fatal("missing index parameter") - } - if *typ == "" { - log.Fatal("missing type parameter") - } - if *n <= 0 { - log.Fatal("n must be a positive number") - } - if *bulkSize <= 0 { - log.Fatal("bulk-size must be a positive number") - } - - // Create an Elasticsearch client - client, err := elastic.NewClient(elastic.SetURL(*url), elastic.SetSniff(*sniff)) - if err != nil { - log.Fatal(err) - } - - // Setup a group of goroutines from the excellent errgroup package - g, ctx := errgroup.WithContext(context.TODO()) - - // The first goroutine will emit documents and send it to the second goroutine - // via the docsc channel. - // The second Goroutine will simply bulk insert the documents. - type doc struct { - ID string `json:"id"` - Timestamp time.Time `json:"@timestamp"` - } - docsc := make(chan doc) - - begin := time.Now() - - // Goroutine to create documents - g.Go(func() error { - defer close(docsc) - - buf := make([]byte, 32) - for i := 0; i < *n; i++ { - // Generate a random ID - _, err := rand.Read(buf) - if err != nil { - return err - } - id := base64.URLEncoding.EncodeToString(buf) - - // Construct the document - d := doc{ - ID: id, - Timestamp: time.Now(), - } - - // Send over to 2nd goroutine, or cancel - select { - case docsc <- d: - case <-ctx.Done(): - return ctx.Err() - } - } - return nil - }) - - // Second goroutine will consume the documents sent from the first and bulk insert into ES - var total uint64 - g.Go(func() error { - bulk := client.Bulk().Index(*index).Type(*typ) - for d := range docsc { - // Simple progress - current := atomic.AddUint64(&total, 1) - dur := time.Since(begin).Seconds() - sec := int(dur) - pps := int64(float64(current) / dur) - fmt.Printf("%10d | %6d req/s | %02d:%02d\r", current, pps, sec/60, sec%60) - - // Enqueue the document - bulk.Add(elastic.NewBulkIndexRequest().Id(d.ID).Doc(d)) - if bulk.NumberOfActions() >= *bulkSize { - // Commit - res, err := bulk.Do(ctx) - if err != nil { - return err - } - if res.Errors { - // Look up the failed documents with res.Failed(), and e.g. recommit - return errors.New("bulk commit failed") - } - // "bulk" is reset after Do, so you can reuse it - } - - select { - default: - case <-ctx.Done(): - return ctx.Err() - } - } - - // Commit the final batch before exiting - if bulk.NumberOfActions() > 0 { - _, err = bulk.Do(ctx) - if err != nil { - return err - } - } - return nil - }) - - // Wait until all goroutines are finished - if err := g.Wait(); err != nil { - log.Fatal(err) - } - - // Final results - dur := time.Since(begin).Seconds() - sec := int(dur) - pps := int64(float64(total) / dur) - fmt.Printf("%10d | %6d req/s | %02d:%02d\n", total, pps, sec/60, sec%60) -} -- cgit v1.2.3-1-g7c22