summaryrefslogtreecommitdiffstats
path: root/vendor/github.com/olivere/elastic/recipes/bulk_processor/main.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/olivere/elastic/recipes/bulk_processor/main.go')
-rw-r--r--vendor/github.com/olivere/elastic/recipes/bulk_processor/main.go149
1 files changed, 0 insertions, 149 deletions
diff --git a/vendor/github.com/olivere/elastic/recipes/bulk_processor/main.go b/vendor/github.com/olivere/elastic/recipes/bulk_processor/main.go
deleted file mode 100644
index f13243297..000000000
--- a/vendor/github.com/olivere/elastic/recipes/bulk_processor/main.go
+++ /dev/null
@@ -1,149 +0,0 @@
-// Copyright 2012-present Oliver Eilhard. All rights reserved.
-// Use of this source code is governed by a MIT-license.
-// See http://olivere.mit-license.org/license.txt for details.
-
-// BulkProcessor runs a bulk processing job that fills an index
-// given certain criteria like flush interval etc.
-//
-// Example
-//
-// bulk_processor -url=http://127.0.0.1:9200/bulk-processor-test?sniff=false -n=100000 -flush-interval=1s
-//
-package main
-
-import (
- "context"
- "flag"
- "fmt"
- "log"
- "math/rand"
- "os"
- "os/signal"
- "sync/atomic"
- "syscall"
- "time"
-
- "github.com/google/uuid"
-
- "github.com/olivere/elastic"
- "github.com/olivere/elastic/config"
-)
-
-func main() {
- var (
- url = flag.String("url", "http://localhost:9200/bulk-processor-test", "Elasticsearch URL")
- numWorkers = flag.Int("num-workers", 4, "Number of workers")
- n = flag.Int64("n", -1, "Number of documents to process (-1 for unlimited)")
- flushInterval = flag.Duration("flush-interval", 1*time.Second, "Flush interval")
- bulkActions = flag.Int("bulk-actions", 0, "Number of bulk actions before committing")
- bulkSize = flag.Int("bulk-size", 0, "Size of bulk requests before committing")
- )
- flag.Parse()
- log.SetFlags(0)
-
- rand.Seed(time.Now().UnixNano())
-
- // Parse configuration from URL
- cfg, err := config.Parse(*url)
- if err != nil {
- log.Fatal(err)
- }
-
- // Create an Elasticsearch client from the parsed config
- client, err := elastic.NewClientFromConfig(cfg)
- if err != nil {
- log.Fatal(err)
- }
-
- // Drop old index
- exists, err := client.IndexExists(cfg.Index).Do(context.Background())
- if err != nil {
- log.Fatal(err)
- }
- if exists {
- _, err = client.DeleteIndex(cfg.Index).Do(context.Background())
- if err != nil {
- log.Fatal(err)
- }
- }
-
- // Create processor
- bulkp := elastic.NewBulkProcessorService(client).
- Name("bulk-test-processor").
- Stats(true).
- Backoff(elastic.StopBackoff{}).
- FlushInterval(*flushInterval).
- Workers(*numWorkers)
- if *bulkActions > 0 {
- bulkp = bulkp.BulkActions(*bulkActions)
- }
- if *bulkSize > 0 {
- bulkp = bulkp.BulkSize(*bulkSize)
- }
- p, err := bulkp.Do(context.Background())
- if err != nil {
- log.Fatal(err)
- }
-
- var created int64
- errc := make(chan error, 1)
- go func() {
- c := make(chan os.Signal, 1)
- signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
- <-c
- errc <- nil
- }()
-
- go func() {
- defer func() {
- if err := p.Close(); err != nil {
- errc <- err
- }
- }()
-
- type Doc struct {
- Timestamp time.Time `json:"@timestamp"`
- }
-
- for {
- current := atomic.AddInt64(&created, 1)
- if *n > 0 && current >= *n {
- errc <- nil
- return
- }
- r := elastic.NewBulkIndexRequest().
- Index(cfg.Index).
- Type("doc").
- Id(uuid.New().String()).
- Doc(Doc{Timestamp: time.Now()})
- p.Add(r)
-
- time.Sleep(time.Duration(rand.Intn(1000)) * time.Microsecond)
- }
- }()
-
- go func() {
- t := time.NewTicker(1 * time.Second)
- defer t.Stop()
- for range t.C {
- stats := p.Stats()
- written := atomic.LoadInt64(&created)
- var queued int64
- for _, w := range stats.Workers {
- queued += w.Queued
- }
- fmt.Printf("Queued=%5d Written=%8d Succeeded=%8d Failed=%8d Comitted=%6d Flushed=%6d\n",
- queued,
- written,
- stats.Succeeded,
- stats.Failed,
- stats.Committed,
- stats.Flushed,
- )
- }
- }()
-
- if err := <-errc; err != nil {
- log.Fatal(err)
- }
-}