Diffstat (limited to 'vendor/github.com/olivere/elastic/recipes')
-rw-r--r--  vendor/github.com/olivere/elastic/recipes/bulk_insert/bulk_insert.go      173
-rw-r--r--  vendor/github.com/olivere/elastic/recipes/bulk_processor/main.go          149
-rw-r--r--  vendor/github.com/olivere/elastic/recipes/connect/connect.go               43
-rw-r--r--  vendor/github.com/olivere/elastic/recipes/sliced_scroll/sliced_scroll.go  161
4 files changed, 0 insertions, 526 deletions
diff --git a/vendor/github.com/olivere/elastic/recipes/bulk_insert/bulk_insert.go b/vendor/github.com/olivere/elastic/recipes/bulk_insert/bulk_insert.go
deleted file mode 100644
index 5a8ab39d0..000000000
--- a/vendor/github.com/olivere/elastic/recipes/bulk_insert/bulk_insert.go
+++ /dev/null
@@ -1,173 +0,0 @@
-// Copyright 2012-present Oliver Eilhard. All rights reserved.
-// Use of this source code is governed by a MIT-license.
-// See http://olivere.mit-license.org/license.txt for details.
-
-// BulkInsert illustrates how to bulk insert documents into Elasticsearch.
-//
-// It uses two goroutines to do so. The first creates a simple document
-// and sends it to the second via a channel. The second goroutine collects
-// those documents, adds them to a bulk request on a Bulk service, and
-// commits them to Elasticsearch after reaching a number of documents.
-// The number of documents after which a commit happens can be specified
-// via the "bulk-size" flag.
-//
-// See https://www.elastic.co/guide/en/elasticsearch/reference/6.0/docs-bulk.html
-// for details on the Bulk API in Elasticsearch.
-//
-// Example
-//
-// Bulk index 100,000 documents into the index "warehouse", type "product",
-// committing every set of 1,000 documents.
-//
-// bulk_insert -index=warehouse -type=product -n=100000 -bulk-size=1000
-//
-package main
-
-import (
- "context"
- "encoding/base64"
- "errors"
- "flag"
- "fmt"
- "log"
- "math/rand"
- "sync/atomic"
- "time"
-
- "golang.org/x/sync/errgroup"
- "github.com/olivere/elastic"
-)
-
-func main() {
- var (
- url = flag.String("url", "http://localhost:9200", "Elasticsearch URL")
- index = flag.String("index", "", "Elasticsearch index name")
- typ = flag.String("type", "", "Elasticsearch type name")
- sniff = flag.Bool("sniff", true, "Enable or disable sniffing")
- n = flag.Int("n", 0, "Number of documents to bulk insert")
- bulkSize = flag.Int("bulk-size", 0, "Number of documents to collect before committing")
- )
- flag.Parse()
- log.SetFlags(0)
- rand.Seed(time.Now().UnixNano())
-
- if *url == "" {
- log.Fatal("missing url parameter")
- }
- if *index == "" {
- log.Fatal("missing index parameter")
- }
- if *typ == "" {
- log.Fatal("missing type parameter")
- }
- if *n <= 0 {
- log.Fatal("n must be a positive number")
- }
- if *bulkSize <= 0 {
- log.Fatal("bulk-size must be a positive number")
- }
-
- // Create an Elasticsearch client
- client, err := elastic.NewClient(elastic.SetURL(*url), elastic.SetSniff(*sniff))
- if err != nil {
- log.Fatal(err)
- }
-
- // Setup a group of goroutines from the excellent errgroup package
- g, ctx := errgroup.WithContext(context.TODO())
-
- // The first goroutine will emit documents and send them to the second
- // goroutine via the docsc channel.
- // The second goroutine will simply bulk insert the documents.
- type doc struct {
- ID string `json:"id"`
- Timestamp time.Time `json:"@timestamp"`
- }
- docsc := make(chan doc)
-
- begin := time.Now()
-
- // Goroutine to create documents
- g.Go(func() error {
- defer close(docsc)
-
- buf := make([]byte, 32)
- for i := 0; i < *n; i++ {
- // Generate a random ID
- _, err := rand.Read(buf)
- if err != nil {
- return err
- }
- id := base64.URLEncoding.EncodeToString(buf)
-
- // Construct the document
- d := doc{
- ID: id,
- Timestamp: time.Now(),
- }
-
- // Send over to 2nd goroutine, or cancel
- select {
- case docsc <- d:
- case <-ctx.Done():
- return ctx.Err()
- }
- }
- return nil
- })
-
- // The second goroutine consumes the documents sent from the first and bulk-inserts them into Elasticsearch
- var total uint64
- g.Go(func() error {
- bulk := client.Bulk().Index(*index).Type(*typ)
- for d := range docsc {
- // Simple progress
- current := atomic.AddUint64(&total, 1)
- dur := time.Since(begin).Seconds()
- sec := int(dur)
- pps := int64(float64(current) / dur)
- fmt.Printf("%10d | %6d req/s | %02d:%02d\r", current, pps, sec/60, sec%60)
-
- // Enqueue the document
- bulk.Add(elastic.NewBulkIndexRequest().Id(d.ID).Doc(d))
- if bulk.NumberOfActions() >= *bulkSize {
- // Commit
- res, err := bulk.Do(ctx)
- if err != nil {
- return err
- }
- if res.Errors {
- // Look up the failed documents with res.Failed(), and e.g. recommit
- return errors.New("bulk commit failed")
- }
- // "bulk" is reset after Do, so you can reuse it
- }
-
- select {
- default:
- case <-ctx.Done():
- return ctx.Err()
- }
- }
-
- // Commit the final batch before exiting
- if bulk.NumberOfActions() > 0 {
- _, err = bulk.Do(ctx)
- if err != nil {
- return err
- }
- }
- return nil
- })
-
- // Wait until all goroutines are finished
- if err := g.Wait(); err != nil {
- log.Fatal(err)
- }
-
- // Final results
- dur := time.Since(begin).Seconds()
- sec := int(dur)
- pps := int64(float64(total) / dur)
- fmt.Printf("%10d | %6d req/s | %02d:%02d\n", total, pps, sec/60, sec%60)
-}
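
The pattern the deleted bulk_insert recipe demonstrated reduces to the sketch
below: queue index requests on a Bulk service and commit every fixed number of
actions. This is a minimal sketch, not the recipe itself; the URL, the
"warehouse"/"product" index and type, the document count, and the batch size
of 1,000 are assumptions.

// bulk_sketch.go — minimal bulk-insert sketch distilled from the deleted
// recipe above. URL, index/type names, and sizes are assumptions.
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/olivere/elastic"
)

func main() {
	ctx := context.Background()
	client, err := elastic.NewClient(elastic.SetURL("http://localhost:9200"))
	if err != nil {
		log.Fatal(err)
	}

	type doc struct {
		ID        string    `json:"id"`
		Timestamp time.Time `json:"@timestamp"`
	}

	bulk := client.Bulk().Index("warehouse").Type("product")
	for i := 0; i < 10000; i++ {
		d := doc{ID: fmt.Sprintf("doc-%d", i), Timestamp: time.Now()}
		bulk.Add(elastic.NewBulkIndexRequest().Id(d.ID).Doc(d))
		// Commit every 1,000 queued actions; Do resets the service,
		// so the same Bulk value is reused for the next batch.
		if bulk.NumberOfActions() >= 1000 {
			res, err := bulk.Do(ctx)
			if err != nil {
				log.Fatal(err)
			}
			if res.Errors {
				// res.Failed() lists the individual failures.
				log.Fatal("bulk commit reported failed items")
			}
		}
	}
	// Flush the final, smaller batch.
	if bulk.NumberOfActions() > 0 {
		if _, err := bulk.Do(ctx); err != nil {
			log.Fatal(err)
		}
	}
}
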
diff --git a/vendor/github.com/olivere/elastic/recipes/bulk_processor/main.go b/vendor/github.com/olivere/elastic/recipes/bulk_processor/main.go
deleted file mode 100644
index f13243297..000000000
--- a/vendor/github.com/olivere/elastic/recipes/bulk_processor/main.go
+++ /dev/null
@@ -1,149 +0,0 @@
-// Copyright 2012-present Oliver Eilhard. All rights reserved.
-// Use of this source code is governed by a MIT-license.
-// See http://olivere.mit-license.org/license.txt for details.
-
-// BulkProcessor runs a bulk processing job that fills an index
-// given certain criteria like flush interval etc.
-//
-// Example
-//
-// bulk_processor -url=http://127.0.0.1:9200/bulk-processor-test?sniff=false -n=100000 -flush-interval=1s
-//
-package main
-
-import (
- "context"
- "flag"
- "fmt"
- "log"
- "math/rand"
- "os"
- "os/signal"
- "sync/atomic"
- "syscall"
- "time"
-
- "github.com/google/uuid"
-
- "github.com/olivere/elastic"
- "github.com/olivere/elastic/config"
-)
-
-func main() {
- var (
- url = flag.String("url", "http://localhost:9200/bulk-processor-test", "Elasticsearch URL")
- numWorkers = flag.Int("num-workers", 4, "Number of workers")
- n = flag.Int64("n", -1, "Number of documents to process (-1 for unlimited)")
- flushInterval = flag.Duration("flush-interval", 1*time.Second, "Flush interval")
- bulkActions = flag.Int("bulk-actions", 0, "Number of bulk actions before committing")
- bulkSize = flag.Int("bulk-size", 0, "Size of bulk requests before committing")
- )
- flag.Parse()
- log.SetFlags(0)
-
- rand.Seed(time.Now().UnixNano())
-
- // Parse configuration from URL
- cfg, err := config.Parse(*url)
- if err != nil {
- log.Fatal(err)
- }
-
- // Create an Elasticsearch client from the parsed config
- client, err := elastic.NewClientFromConfig(cfg)
- if err != nil {
- log.Fatal(err)
- }
-
- // Drop old index
- exists, err := client.IndexExists(cfg.Index).Do(context.Background())
- if err != nil {
- log.Fatal(err)
- }
- if exists {
- _, err = client.DeleteIndex(cfg.Index).Do(context.Background())
- if err != nil {
- log.Fatal(err)
- }
- }
-
- // Create processor
- bulkp := elastic.NewBulkProcessorService(client).
- Name("bulk-test-processor").
- Stats(true).
- Backoff(elastic.StopBackoff{}).
- FlushInterval(*flushInterval).
- Workers(*numWorkers)
- if *bulkActions > 0 {
- bulkp = bulkp.BulkActions(*bulkActions)
- }
- if *bulkSize > 0 {
- bulkp = bulkp.BulkSize(*bulkSize)
- }
- p, err := bulkp.Do(context.Background())
- if err != nil {
- log.Fatal(err)
- }
-
- var created int64
- errc := make(chan error, 1)
- go func() {
- c := make(chan os.Signal, 1)
- signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
- <-c
- errc <- nil
- }()
-
- go func() {
- defer func() {
- if err := p.Close(); err != nil {
- errc <- err
- }
- }()
-
- type Doc struct {
- Timestamp time.Time `json:"@timestamp"`
- }
-
- for {
- // Stop once n documents have been handed to the processor (-1 = unlimited)
- if *n > 0 && atomic.LoadInt64(&created) >= *n {
- errc <- nil
- return
- }
- atomic.AddInt64(&created, 1)
- r := elastic.NewBulkIndexRequest().
- Index(cfg.Index).
- Type("doc").
- Id(uuid.New().String()).
- Doc(Doc{Timestamp: time.Now()})
- p.Add(r)
-
- time.Sleep(time.Duration(rand.Intn(1000)) * time.Microsecond)
- }
- }()
-
- go func() {
- t := time.NewTicker(1 * time.Second)
- defer t.Stop()
- for range t.C {
- stats := p.Stats()
- written := atomic.LoadInt64(&created)
- var queued int64
- for _, w := range stats.Workers {
- queued += w.Queued
- }
- fmt.Printf("Queued=%5d Written=%8d Succeeded=%8d Failed=%8d Comitted=%6d Flushed=%6d\n",
- queued,
- written,
- stats.Succeeded,
- stats.Failed,
- stats.Committed,
- stats.Flushed,
- )
- }
- }()
-
- if err := <-errc; err != nil {
- log.Fatal(err)
- }
-}
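
A condensed sketch of the BulkProcessor setup from the deleted recipe is shown
below. The calls (NewBulkProcessorService, Add, Close, Stats) are the ones the
recipe itself used; the index name, worker count, and tuning values are
assumptions.

// processor_sketch.go — minimal BulkProcessor sketch distilled from the
// deleted recipe above. Index name and tuning values are assumptions.
package main

import (
	"context"
	"log"
	"time"

	"github.com/olivere/elastic"
)

func main() {
	ctx := context.Background()
	client, err := elastic.NewClient(elastic.SetURL("http://localhost:9200"))
	if err != nil {
		log.Fatal(err)
	}

	// Commit in the background whenever 1,000 actions are queued or one
	// second has passed, whichever comes first, using 4 workers.
	p, err := elastic.NewBulkProcessorService(client).
		Name("sketch-processor").
		Workers(4).
		BulkActions(1000).
		FlushInterval(time.Second).
		Stats(true).
		Do(ctx)
	if err != nil {
		log.Fatal(err)
	}

	type doc struct {
		Timestamp time.Time `json:"@timestamp"`
	}
	for i := 0; i < 10000; i++ {
		// Add only queues the request; a worker commits it later.
		p.Add(elastic.NewBulkIndexRequest().
			Index("bulk-processor-test").
			Type("doc").
			Doc(doc{Timestamp: time.Now()}))
	}

	// Close flushes anything still queued and stops the workers.
	if err := p.Close(); err != nil {
		log.Fatal(err)
	}
	stats := p.Stats()
	log.Printf("committed=%d succeeded=%d failed=%d",
		stats.Committed, stats.Succeeded, stats.Failed)
}
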
diff --git a/vendor/github.com/olivere/elastic/recipes/connect/connect.go b/vendor/github.com/olivere/elastic/recipes/connect/connect.go
deleted file mode 100644
index baff6c114..000000000
--- a/vendor/github.com/olivere/elastic/recipes/connect/connect.go
+++ /dev/null
@@ -1,43 +0,0 @@
-// Copyright 2012-present Oliver Eilhard. All rights reserved.
-// Use of this source code is governed by a MIT-license.
-// See http://olivere.mit-license.org/license.txt for details.
-
-// Connect simply connects to Elasticsearch.
-//
-// Example
-//
-// connect -url=http://127.0.0.1:9200 -sniff=false
-//
-package main
-
-import (
- "flag"
- "fmt"
- "log"
-
- "github.com/olivere/elastic"
-)
-
-func main() {
- var (
- url = flag.String("url", "http://localhost:9200", "Elasticsearch URL")
- sniff = flag.Bool("sniff", true, "Enable or disable sniffing")
- )
- flag.Parse()
- log.SetFlags(0)
-
- if *url == "" {
- *url = "http://127.0.0.1:9200"
- }
-
- // Create an Elasticsearch client
- client, err := elastic.NewClient(elastic.SetURL(*url), elastic.SetSniff(*sniff))
- if err != nil {
- log.Fatal(err)
- }
- _ = client
-
- // Just a status message
- fmt.Println("Connection succeeded")
-}
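
Because the deleted connect recipe only constructs a client, a sketch that
also verifies the connection end-to-end may be more useful. It uses the Ping
service the same client library exposes; the URL is an assumption.

// connect_sketch.go — connect-and-ping sketch. The URL is an assumption;
// sniffing is disabled so a single node (e.g. in Docker) also works.
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/olivere/elastic"
)

func main() {
	const url = "http://localhost:9200"

	client, err := elastic.NewClient(elastic.SetURL(url), elastic.SetSniff(false))
	if err != nil {
		log.Fatal(err)
	}

	// Ping the node and report the server version it advertises.
	info, code, err := client.Ping(url).Do(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("Elasticsearch %s returned HTTP %d\n", info.Version.Number, code)
}
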
diff --git a/vendor/github.com/olivere/elastic/recipes/sliced_scroll/sliced_scroll.go b/vendor/github.com/olivere/elastic/recipes/sliced_scroll/sliced_scroll.go
deleted file mode 100644
index d753a61cb..000000000
--- a/vendor/github.com/olivere/elastic/recipes/sliced_scroll/sliced_scroll.go
+++ /dev/null
@@ -1,161 +0,0 @@
-// Copyright 2012-present Oliver Eilhard. All rights reserved.
-// Use of this source code is governed by a MIT-license.
-// See http://olivere.mit-license.org/license.txt for details.
-
-// SlicedScroll illustrates scrolling through a set of documents
-// in parallel. It uses the sliced scrolling feature introduced
-// in Elasticsearch 5.0 to create a number of goroutines, each
-// scrolling through a slice of the total results. A second goroutine
-// receives the hits from the set of goroutines scrolling through
-// the slices and simply counts the total number and the number of
-// documents received per slice.
-//
-// The speedup of sliced scrolling can be significant but is very
-// dependent on the specific use case.
-//
-// See https://www.elastic.co/guide/en/elasticsearch/reference/6.0/search-request-scroll.html#sliced-scroll
-// for details on sliced scrolling in Elasticsearch.
-//
-// Example
-//
-// Scroll with 4 parallel slices through an index called "products".
-// Use "_uid" as the default field:
-//
-// sliced_scroll -index=products -n=4
-//
-package main
-
-import (
- "context"
- "flag"
- "fmt"
- "io"
- "log"
- "sync"
- "sync/atomic"
- "time"
-
- "golang.org/x/sync/errgroup"
- "github.com/olivere/elastic"
-)
-
-func main() {
- var (
- url = flag.String("url", "http://localhost:9200", "Elasticsearch URL")
- index = flag.String("index", "", "Elasticsearch index name")
- typ = flag.String("type", "", "Elasticsearch type name")
- field = flag.String("field", "", "Slice field (must be numeric)")
- numSlices = flag.Int("n", 2, "Number of slices to use in parallel")
- sniff = flag.Bool("sniff", true, "Enable or disable sniffing")
- )
- flag.Parse()
- log.SetFlags(0)
-
- if *url == "" {
- log.Fatal("missing url parameter")
- }
- if *index == "" {
- log.Fatal("missing index parameter")
- }
- if *numSlices <= 0 {
- log.Fatal("n must be greater than zero")
- }
-
- // Create an Elasticsearch client
- client, err := elastic.NewClient(elastic.SetURL(*url), elastic.SetSniff(*sniff))
- if err != nil {
- log.Fatal(err)
- }
-
- // Setup a group of goroutines from the excellent errgroup package
- g, ctx := errgroup.WithContext(context.TODO())
-
- // The hits channel receives hits from the first set of goroutines and is consumed by the second
- type hit struct {
- Slice int
- Hit elastic.SearchHit
- }
- hitsc := make(chan hit)
-
- begin := time.Now()
-
- // Start a number of goroutines to parallelize scrolling
- var wg sync.WaitGroup
- for i := 0; i < *numSlices; i++ {
- wg.Add(1)
-
- slice := i
-
- // Prepare the query
- var query elastic.Query
- if *typ == "" {
- query = elastic.NewMatchAllQuery()
- } else {
- query = elastic.NewTypeQuery(*typ)
- }
-
- // Prepare the slice
- sliceQuery := elastic.NewSliceQuery().Id(i).Max(*numSlices)
- if *field != "" {
- sliceQuery = sliceQuery.Field(*field)
- }
-
- // Start goroutine for this sliced scroll
- g.Go(func() error {
- defer wg.Done()
- svc := client.Scroll(*index).Query(query).Slice(sliceQuery)
- for {
- res, err := svc.Do(ctx)
- if err == io.EOF {
- break
- }
- if err != nil {
- return err
- }
- for _, searchHit := range res.Hits.Hits {
- // Pass the hit to the hits channel, which will be consumed below
- select {
- case hitsc <- hit{Slice: slice, Hit: *searchHit}:
- case <-ctx.Done():
- return ctx.Err()
- }
- }
- }
- return nil
- })
- }
- go func() {
- // Wait until all scrolling is done
- wg.Wait()
- close(hitsc)
- }()
-
- // The second goroutine consumes the hits sent from the workers in the first set of goroutines
- var total uint64
- totals := make([]uint64, *numSlices)
- g.Go(func() error {
- for hit := range hitsc {
- // We simply count the hits here.
- atomic.AddUint64(&totals[hit.Slice], 1)
- current := atomic.AddUint64(&total, 1)
- sec := int(time.Since(begin).Seconds())
- fmt.Printf("%8d | %02d:%02d\r", current, sec/60, sec%60)
- select {
- default:
- case <-ctx.Done():
- return ctx.Err()
- }
- }
- return nil
- })
-
- // Wait until all goroutines are finished
- if err := g.Wait(); err != nil {
- log.Fatal(err)
- }
-
- fmt.Printf("Scrolled through a total of %d documents in %v\n", total, time.Since(begin))
- for i := 0; i < *numSlices; i++ {
- fmt.Printf("Slice %2d received %d documents\n", i, totals[i])
- }
-}
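
The moving parts of the sliced_scroll recipe reduce to the sketch below: one
goroutine per slice, each driving its own ScrollService until io.EOF. The
"products" index, the slice count, and the match-all query are assumptions.

// scroll_sketch.go — minimal sliced-scroll sketch distilled from the deleted
// recipe above. Index name, slice count, and query are assumptions.
package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"sync/atomic"

	"github.com/olivere/elastic"
	"golang.org/x/sync/errgroup"
)

func main() {
	client, err := elastic.NewClient(elastic.SetURL("http://localhost:9200"))
	if err != nil {
		log.Fatal(err)
	}

	const numSlices = 4
	var total uint64
	g, ctx := errgroup.WithContext(context.Background())

	for i := 0; i < numSlices; i++ {
		slice := i // capture the loop variable for the closure
		g.Go(func() error {
			// Each worker scrolls through its own slice of the index.
			svc := client.Scroll("products").
				Query(elastic.NewMatchAllQuery()).
				Slice(elastic.NewSliceQuery().Id(slice).Max(numSlices))
			for {
				res, err := svc.Do(ctx)
				if err == io.EOF {
					return nil // this slice is exhausted
				}
				if err != nil {
					return err
				}
				atomic.AddUint64(&total, uint64(len(res.Hits.Hits)))
			}
		})
	}
	if err := g.Wait(); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("scrolled through %d documents\n", total)
}
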