Diffstat (limited to 'vendor/gopkg.in/olivere/elastic.v5/recipes/bulk_insert/bulk_insert.go')
-rw-r--r--  vendor/gopkg.in/olivere/elastic.v5/recipes/bulk_insert/bulk_insert.go  173
1 file changed, 173 insertions, 0 deletions
diff --git a/vendor/gopkg.in/olivere/elastic.v5/recipes/bulk_insert/bulk_insert.go b/vendor/gopkg.in/olivere/elastic.v5/recipes/bulk_insert/bulk_insert.go
new file mode 100644
index 000000000..5a7909095
--- /dev/null
+++ b/vendor/gopkg.in/olivere/elastic.v5/recipes/bulk_insert/bulk_insert.go
@@ -0,0 +1,173 @@
+// Copyright 2012-present Oliver Eilhard. All rights reserved.
+// Use of this source code is governed by an MIT license.
+// See http://olivere.mit-license.org/license.txt for details.
+
+// BulkInsert illustrates how to bulk insert documents into Elasticsearch.
+//
+// It uses two goroutines to do so. The first generates simple documents
+// and sends them to the second via a channel. The second goroutine collects
+// those documents into a bulk request, which is committed to Elasticsearch
+// once it holds a certain number of documents. The number of documents
+// after which a commit happens can be specified via the "bulk-size" flag.
+//
+// See https://www.elastic.co/guide/en/elasticsearch/reference/5.2/docs-bulk.html
+// for details on the Bulk API in Elasticsearch.
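+//
+// A bulk request body is newline-delimited JSON: each action line names the
+// target index, type and document ID and is followed by the document itself.
+// For this program, a single entry would look roughly like this (the ID is
+// random, so placeholders are shown):
+//
+//	{"index":{"_index":"warehouse","_type":"product","_id":"..."}}
+//	{"id":"...","@timestamp":"2017-02-01T12:00:00Z"}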
+//
+// Example
+//
+// Bulk index 100,000 documents into the index "warehouse", type "product",
+// committing every set of 1,000 documents.
+//
+//	bulk_insert -index=warehouse -type=product -n=100000 -bulk-size=1000
+//
+package main
+
+import (
+	"context"
+	"encoding/base64"
+	"errors"
+	"flag"
+	"fmt"
+	"log"
+	"math/rand"
+	"sync/atomic"
+	"time"
+
+	"golang.org/x/sync/errgroup"
+	"gopkg.in/olivere/elastic.v5"
+)
+
+func main() {
+	var (
+		url      = flag.String("url", "http://localhost:9200", "Elasticsearch URL")
+		index    = flag.String("index", "", "Elasticsearch index name")
+		typ      = flag.String("type", "", "Elasticsearch type name")
+		sniff    = flag.Bool("sniff", true, "Enable or disable sniffing")
+		n        = flag.Int("n", 0, "Number of documents to bulk insert")
+		bulkSize = flag.Int("bulk-size", 0, "Number of documents to collect before committing")
+	)
+	flag.Parse()
+	log.SetFlags(0)
+	rand.Seed(time.Now().UnixNano())
+
+	if *url == "" {
+		log.Fatal("missing url parameter")
+	}
+	if *index == "" {
+		log.Fatal("missing index parameter")
+	}
+	if *typ == "" {
+		log.Fatal("missing type parameter")
+	}
+	if *n <= 0 {
+		log.Fatal("n must be a positive number")
+	}
+	if *bulkSize <= 0 {
+		log.Fatal("bulk-size must be a positive number")
+	}
+
+	// Create an Elasticsearch client
+	client, err := elastic.NewClient(elastic.SetURL(*url), elastic.SetSniff(*sniff))
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	// Set up a group of goroutines from the excellent errgroup package
+	g, ctx := errgroup.WithContext(context.TODO())
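+	// ctx is canceled as soon as either goroutine below returns an error,
+	// letting the other one shut down early instead of blocking forever.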
+
+	// The first goroutine will emit documents and send them to the second
+	// goroutine via the docsc channel.
+	// The second goroutine will simply bulk insert the documents.
+	type doc struct {
+		ID        string    `json:"id"`
+		Timestamp time.Time `json:"@timestamp"`
+	}
+	docsc := make(chan doc)
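+	// docsc is unbuffered: the producer blocks until the consumer is ready
+	// to receive, so the channel itself provides the backpressure.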
+
+	begin := time.Now()
+
+	// Goroutine to create documents
+	g.Go(func() error {
+		defer close(docsc)
+
+		buf := make([]byte, 32)
+		for i := 0; i < *n; i++ {
+			// Generate a random ID
+			if _, err := rand.Read(buf); err != nil {
+				return err
+			}
+			id := base64.URLEncoding.EncodeToString(buf)
+
+			// Construct the document
+			d := doc{
+				ID:        id,
+				Timestamp: time.Now(),
+			}
+
+			// Send it to the 2nd goroutine, or abort if the context is canceled
+			select {
+			case docsc <- d:
+			case <-ctx.Done():
+				return ctx.Err()
+			}
+		}
+		return nil
+	})
+
+	// Second goroutine: consume the documents sent from the first and bulk insert them into ES
+	var total uint64
+	g.Go(func() error {
+		bulk := client.Bulk().Index(*index).Type(*typ)
+		for d := range docsc {
+			// Simple progress display; the trailing \r makes each Printf
+			// overwrite the previous line on the terminal
+			current := atomic.AddUint64(&total, 1)
+			dur := time.Since(begin).Seconds()
+			sec := int(dur)
+			pps := int64(float64(current) / dur)
+			fmt.Printf("%10d | %6d req/s | %02d:%02d\r", current, pps, sec/60, sec%60)
+
+			// Enqueue the document
+			bulk.Add(elastic.NewBulkIndexRequest().Id(d.ID).Doc(d))
+			if bulk.NumberOfActions() >= *bulkSize {
+				// Commit
+				res, err := bulk.Do(ctx)
+				if err != nil {
+					return err
+				}
+				if res.Errors {
+					// Look up the failed documents with res.Failed(), and e.g. recommit
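+					// A minimal sketch of that: report each failure before
+					// giving up (a production version might retry instead)
+					for _, item := range res.Failed() {
+						log.Printf("failed to index %s: %v", item.Id, item.Error)
+					}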
+					return errors.New("bulk commit failed")
+				}
+				// "bulk" is reset after Do, so you can reuse it
+			}
+
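+			// Non-blocking check: bail out if the context was canceled,
+			// e.g. because the producer goroutine failed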
+			select {
+			default:
+			case <-ctx.Done():
+				return ctx.Err()
+			}
+		}
+
+		// Commit the final batch before exiting
+		if bulk.NumberOfActions() > 0 {
+			if _, err := bulk.Do(ctx); err != nil {
+				return err
+			}
+		}
+		return nil
+	})
+
+	// Wait until all goroutines are finished
+	if err := g.Wait(); err != nil {
+		log.Fatal(err)
+	}
+
+	// Final results
+	dur := time.Since(begin).Seconds()
+	sec := int(dur)
+	pps := int64(float64(total) / dur)
+	fmt.Printf("%10d | %6d req/s | %02d:%02d\n", total, pps, sec/60, sec%60)
+}