Diffstat (limited to 'vendor/gopkg.in/olivere/elastic.v5/recipes/bulk_insert/bulk_insert.go')
-rw-r--r--  vendor/gopkg.in/olivere/elastic.v5/recipes/bulk_insert/bulk_insert.go | 173
1 file changed, 0 insertions(+), 173 deletions(-)
diff --git a/vendor/gopkg.in/olivere/elastic.v5/recipes/bulk_insert/bulk_insert.go b/vendor/gopkg.in/olivere/elastic.v5/recipes/bulk_insert/bulk_insert.go
deleted file mode 100644
index 5a8ab39d0..000000000
--- a/vendor/gopkg.in/olivere/elastic.v5/recipes/bulk_insert/bulk_insert.go
+++ /dev/null
@@ -1,173 +0,0 @@
-// Copyright 2012-present Oliver Eilhard. All rights reserved.
-// Use of this source code is governed by a MIT-license.
-// See http://olivere.mit-license.org/license.txt for details.
-
-// BulkInsert illustrates how to bulk insert documents into Elasticsearch.
-//
-// It uses two goroutines to do so. The first creates simple documents
-// and sends them to the second via a channel. The second goroutine adds
-// each document to a Bulk service and commits the batch to Elasticsearch
-// once it reaches a given number of documents. That number can be
-// specified via the "bulk-size" flag.
-//
-// See https://www.elastic.co/guide/en/elasticsearch/reference/6.0/docs-bulk.html
-// for details on the Bulk API in Elasticsearch.
-//
-// Example
-//
-// Bulk index 100,000 documents into the index "warehouse", type "product",
-// committing after every set of 1,000 documents.
-//
-// bulk_insert -index=warehouse -type=product -n=100000 -bulk-size=1000
-//
-package main
-
-import (
- "context"
- "encoding/base64"
- "errors"
- "flag"
- "fmt"
- "log"
- "math/rand"
- "sync/atomic"
- "time"
-
- "golang.org/x/sync/errgroup"
- "github.com/olivere/elastic"
-)
-
-func main() {
- var (
- url = flag.String("url", "http://localhost:9200", "Elasticsearch URL")
- index = flag.String("index", "", "Elasticsearch index name")
- typ = flag.String("type", "", "Elasticsearch type name")
- sniff = flag.Bool("sniff", true, "Enable or disable sniffing")
- n = flag.Int("n", 0, "Number of documents to bulk insert")
- bulkSize = flag.Int("bulk-size", 0, "Number of documents to collect before committing")
- )
- flag.Parse()
- log.SetFlags(0)
- rand.Seed(time.Now().UnixNano())
-
- if *url == "" {
- log.Fatal("missing url parameter")
- }
- if *index == "" {
- log.Fatal("missing index parameter")
- }
- if *typ == "" {
- log.Fatal("missing type parameter")
- }
- if *n <= 0 {
- log.Fatal("n must be a positive number")
- }
- if *bulkSize <= 0 {
- log.Fatal("bulk-size must be a positive number")
- }
-
- // Create an Elasticsearch client
- client, err := elastic.NewClient(elastic.SetURL(*url), elastic.SetSniff(*sniff))
- if err != nil {
- log.Fatal(err)
- }
-
- // Setup a group of goroutines from the excellent errgroup package
- g, ctx := errgroup.WithContext(context.TODO())
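- // The returned ctx is canceled as soon as any goroutine in the group
- // returns a non-nil error, which stops the other goroutine below.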
-
- // The first goroutine will emit documents and send them to the second
- // goroutine via the docsc channel.
- // The second goroutine will simply bulk insert the documents.
- type doc struct {
- ID string `json:"id"`
- Timestamp time.Time `json:"@timestamp"`
- }
- docsc := make(chan doc)
-
- begin := time.Now()
-
- // Goroutine to create documents
- g.Go(func() error {
- defer close(docsc)
-
- buf := make([]byte, 32)
- for i := 0; i < *n; i++ {
- // Generate a random ID
- _, err := rand.Read(buf)
- if err != nil {
- return err
- }
- id := base64.URLEncoding.EncodeToString(buf)
-
- // Construct the document
- d := doc{
- ID: id,
- Timestamp: time.Now(),
- }
-
- // Send over to 2nd goroutine, or cancel
- select {
- case docsc <- d:
- case <-ctx.Done():
- return ctx.Err()
- }
- }
- return nil
- })
-
- // Second goroutine will consume the documents sent from the first and bulk insert into ES
- var total uint64
- g.Go(func() error {
- bulk := client.Bulk().Index(*index).Type(*typ)
- for d := range docsc {
- // Simple progress
- current := atomic.AddUint64(&total, 1)
- dur := time.Since(begin).Seconds()
- sec := int(dur)
- pps := int64(float64(current) / dur)
- fmt.Printf("%10d | %6d req/s | %02d:%02d\r", current, pps, sec/60, sec%60)
-
- // Enqueue the document
- bulk.Add(elastic.NewBulkIndexRequest().Id(d.ID).Doc(d))
- if bulk.NumberOfActions() >= *bulkSize {
- // Commit
- res, err := bulk.Do(ctx)
- if err != nil {
- return err
- }
- if res.Errors {
- // Look up the failed documents with res.Failed(), and e.g. recommit
- return errors.New("bulk commit failed")
- }
- // "bulk" is reset after Do, so you can reuse it
- }
-
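- // Check for cancellation without blocking; the default case lets the
- // loop continue as long as the context is still alive.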
- select {
- default:
- case <-ctx.Done():
- return ctx.Err()
- }
- }
-
- // Commit the final batch before exiting
- if bulk.NumberOfActions() > 0 {
- _, err := bulk.Do(ctx)
- if err != nil {
- return err
- }
- }
- return nil
- })
-
- // Wait until all goroutines are finished
- if err := g.Wait(); err != nil {
- log.Fatal(err)
- }
-
- // Final results
- dur := time.Since(begin).Seconds()
- sec := int(dur)
- pps := int64(float64(total) / dur)
- fmt.Printf("%10d | %6d req/s | %02d:%02d\n", total, pps, sec/60, sec%60)
-}
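
For reference, the Bulk-service pattern the deleted recipe was built around can be exercised in a few lines. The following is a minimal sketch, not part of the original file: it assumes a local node at http://localhost:9200 and reuses the hypothetical index/type names "warehouse" and "product" from the example above; per-item failures are inspected via res.Failed().

// Minimal sketch of the Bulk commit pattern (assumed setup, see note above).
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/olivere/elastic"
)

func main() {
	ctx := context.Background()

	// Assumption: a single local node; sniffing disabled for simplicity.
	client, err := elastic.NewClient(
		elastic.SetURL("http://localhost:9200"),
		elastic.SetSniff(false),
	)
	if err != nil {
		log.Fatal(err)
	}

	type doc struct {
		ID        string    `json:"id"`
		Timestamp time.Time `json:"@timestamp"`
	}

	// Hypothetical index/type names taken from the recipe's example.
	bulk := client.Bulk().Index("warehouse").Type("product")
	for i := 0; i < 10; i++ {
		d := doc{ID: fmt.Sprintf("doc-%d", i), Timestamp: time.Now()}
		bulk.Add(elastic.NewBulkIndexRequest().Id(d.ID).Doc(d))
	}

	// Do sends all queued actions in one request and resets the service,
	// so the same Bulk value can be reused for the next batch.
	res, err := bulk.Do(ctx)
	if err != nil {
		log.Fatal(err)
	}
	if res.Errors {
		// res.Failed() returns the individual items that were rejected.
		for _, item := range res.Failed() {
			log.Printf("failed to index %s: %v", item.Id, item.Error)
		}
	}
}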