diff options
author | Christopher Speller <crspeller@gmail.com> | 2018-01-29 14:17:40 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-01-29 14:17:40 -0800 |
commit | 961c04cae992eadb42d286d2f85f8a675bdc68c8 (patch) | |
tree | 3408f2d06f847e966c53485e2d54c692cdd037c1 /vendor/github.com/olivere/elastic/recipes/bulk_insert | |
parent | 8d66523ba7d9a77129844be476732ebfd5272d64 (diff) | |
download | chat-961c04cae992eadb42d286d2f85f8a675bdc68c8.tar.gz chat-961c04cae992eadb42d286d2f85f8a675bdc68c8.tar.bz2 chat-961c04cae992eadb42d286d2f85f8a675bdc68c8.zip |
Upgrading server dependencies (#8154)
Diffstat (limited to 'vendor/github.com/olivere/elastic/recipes/bulk_insert')
-rw-r--r-- | vendor/github.com/olivere/elastic/recipes/bulk_insert/bulk_insert.go | 173 |
1 files changed, 173 insertions, 0 deletions
diff --git a/vendor/github.com/olivere/elastic/recipes/bulk_insert/bulk_insert.go b/vendor/github.com/olivere/elastic/recipes/bulk_insert/bulk_insert.go new file mode 100644 index 000000000..5a8ab39d0 --- /dev/null +++ b/vendor/github.com/olivere/elastic/recipes/bulk_insert/bulk_insert.go @@ -0,0 +1,173 @@ +// Copyright 2012-present Oliver Eilhard. All rights reserved. +// Use of this source code is governed by a MIT-license. +// See http://olivere.mit-license.org/license.txt for details. + +// BulkInsert illustrates how to bulk insert documents into Elasticsearch. +// +// It uses two goroutines to do so. The first creates a simple document +// and sends it to the second via a channel. The second goroutine collects +// those documents, creates a bulk request that is added to a Bulk service +// and committed to Elasticsearch after reaching a number of documents. +// The number of documents after which a commit happens can be specified +// via the "bulk-size" flag. +// +// See https://www.elastic.co/guide/en/elasticsearch/reference/6.0/docs-bulk.html +// for details on the Bulk API in Elasticsearch. +// +// Example +// +// Bulk index 100.000 documents into the index "warehouse", type "product", +// committing every set of 1.000 documents. 
+// +// bulk_insert -index=warehouse -type=product -n=100000 -bulk-size=1000 +// +package main + +import ( + "context" + "encoding/base64" + "errors" + "flag" + "fmt" + "log" + "math/rand" + "sync/atomic" + "time" + + "golang.org/x/sync/errgroup" + "github.com/olivere/elastic" +) + +func main() { + var ( + url = flag.String("url", "http://localhost:9200", "Elasticsearch URL") + index = flag.String("index", "", "Elasticsearch index name") + typ = flag.String("type", "", "Elasticsearch type name") + sniff = flag.Bool("sniff", true, "Enable or disable sniffing") + n = flag.Int("n", 0, "Number of documents to bulk insert") + bulkSize = flag.Int("bulk-size", 0, "Number of documents to collect before committing") + ) + flag.Parse() + log.SetFlags(0) + rand.Seed(time.Now().UnixNano()) + + if *url == "" { + log.Fatal("missing url parameter") + } + if *index == "" { + log.Fatal("missing index parameter") + } + if *typ == "" { + log.Fatal("missing type parameter") + } + if *n <= 0 { + log.Fatal("n must be a positive number") + } + if *bulkSize <= 0 { + log.Fatal("bulk-size must be a positive number") + } + + // Create an Elasticsearch client + client, err := elastic.NewClient(elastic.SetURL(*url), elastic.SetSniff(*sniff)) + if err != nil { + log.Fatal(err) + } + + // Setup a group of goroutines from the excellent errgroup package + g, ctx := errgroup.WithContext(context.TODO()) + + // The first goroutine will emit documents and send it to the second goroutine + // via the docsc channel. + // The second Goroutine will simply bulk insert the documents. 
+ type doc struct { + ID string `json:"id"` + Timestamp time.Time `json:"@timestamp"` + } + docsc := make(chan doc) + + begin := time.Now() + + // Goroutine to create documents + g.Go(func() error { + defer close(docsc) + + buf := make([]byte, 32) + for i := 0; i < *n; i++ { + // Generate a random ID + _, err := rand.Read(buf) + if err != nil { + return err + } + id := base64.URLEncoding.EncodeToString(buf) + + // Construct the document + d := doc{ + ID: id, + Timestamp: time.Now(), + } + + // Send over to 2nd goroutine, or cancel + select { + case docsc <- d: + case <-ctx.Done(): + return ctx.Err() + } + } + return nil + }) + + // Second goroutine will consume the documents sent from the first and bulk insert into ES + var total uint64 + g.Go(func() error { + bulk := client.Bulk().Index(*index).Type(*typ) + for d := range docsc { + // Simple progress + current := atomic.AddUint64(&total, 1) + dur := time.Since(begin).Seconds() + sec := int(dur) + pps := int64(float64(current) / dur) + fmt.Printf("%10d | %6d req/s | %02d:%02d\r", current, pps, sec/60, sec%60) + + // Enqueue the document + bulk.Add(elastic.NewBulkIndexRequest().Id(d.ID).Doc(d)) + if bulk.NumberOfActions() >= *bulkSize { + // Commit + res, err := bulk.Do(ctx) + if err != nil { + return err + } + if res.Errors { + // Look up the failed documents with res.Failed(), and e.g. recommit + return errors.New("bulk commit failed") + } + // "bulk" is reset after Do, so you can reuse it + } + + select { + default: + case <-ctx.Done(): + return ctx.Err() + } + } + + // Commit the final batch before exiting + if bulk.NumberOfActions() > 0 { + _, err = bulk.Do(ctx) + if err != nil { + return err + } + } + return nil + }) + + // Wait until all goroutines are finished + if err := g.Wait(); err != nil { + log.Fatal(err) + } + + // Final results + dur := time.Since(begin).Seconds() + sec := int(dur) + pps := int64(float64(total) / dur) + fmt.Printf("%10d | %6d req/s | %02d:%02d\n", total, pps, sec/60, sec%60) +} |