summary | refs | log | tree | commit | diff | stats
path: root/vendor/gopkg.in/olivere/elastic.v5/recipes/sliced_scroll/sliced_scroll.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/gopkg.in/olivere/elastic.v5/recipes/sliced_scroll/sliced_scroll.go')
-rw-r--r-- vendor/gopkg.in/olivere/elastic.v5/recipes/sliced_scroll/sliced_scroll.go 161
1 files changed, 0 insertions, 161 deletions
diff --git a/vendor/gopkg.in/olivere/elastic.v5/recipes/sliced_scroll/sliced_scroll.go b/vendor/gopkg.in/olivere/elastic.v5/recipes/sliced_scroll/sliced_scroll.go
deleted file mode 100644
index d753a61cb..000000000
--- a/vendor/gopkg.in/olivere/elastic.v5/recipes/sliced_scroll/sliced_scroll.go
+++ /dev/null
@@ -1,161 +0,0 @@
-// Copyright 2012-present Oliver Eilhard. All rights reserved.
-// Use of this source code is governed by a MIT-license.
-// See http://olivere.mit-license.org/license.txt for details.
-
-// SlicedScroll illustrates scrolling through a set of documents
-// in parallel. It uses the sliced scrolling feature introduced
-// in Elasticsearch 5.0 and starts one goroutine per slice, each
-// scrolling through its slice of the total results. One additional
-// goroutine receives the hits from the goroutines scrolling through
-// the slices and simply counts the total number of documents and
-// the number of documents received per slice.
-//
-// The speedup of sliced scrolling can be significant but is very
-// dependent on the specific use case.
-//
-// See https://www.elastic.co/guide/en/elasticsearch/reference/6.0/search-request-scroll.html#sliced-scroll
-// for details on sliced scrolling in Elasticsearch.
-//
-// Example
-//
-// Scroll with 4 parallel slices through an index called "products".
-// No -field is given, so the default slice field ("_uid") is used:
-//
-// sliced_scroll -index=products -n=4
-//
-package main
-
-import (
- "context"
- "flag"
- "fmt"
- "io"
- "log"
- "sync"
- "sync/atomic"
- "time"
-
- "golang.org/x/sync/errgroup"
- "github.com/olivere/elastic"
-)
-
-func main() {
- var (
- url = flag.String("url", "http://localhost:9200", "Elasticsearch URL")
- index = flag.String("index", "", "Elasticsearch index name")
- typ = flag.String("type", "", "Elasticsearch type name")
- field = flag.String("field", "", "Slice field (must be numeric)")
- numSlices = flag.Int("n", 2, "Number of slices to use in parallel")
- sniff = flag.Bool("sniff", true, "Enable or disable sniffing")
- )
- flag.Parse()
- log.SetFlags(0)
-
- if *url == "" {
- log.Fatal("missing url parameter")
- }
- if *index == "" {
- log.Fatal("missing index parameter")
- }
- if *numSlices <= 0 {
- log.Fatal("n must be greater than zero")
- }
-
- // Create an Elasticsearch client
- client, err := elastic.NewClient(elastic.SetURL(*url), elastic.SetSniff(*sniff))
- if err != nil {
- log.Fatal(err)
- }
-
- // Set up a group of goroutines from the excellent errgroup package
- g, ctx := errgroup.WithContext(context.TODO())
-
- // The hits channel is written to by the scrolling goroutines and read by the consumer goroutine
- type hit struct {
- Slice int
- Hit elastic.SearchHit
- }
- hitsc := make(chan hit)
-
- begin := time.Now()
-
- // Start a number of goroutines to parallelize scrolling
- var wg sync.WaitGroup
- for i := 0; i < *numSlices; i++ {
- wg.Add(1)
-
- slice := i
-
- // Prepare the query
- var query elastic.Query
- if *typ == "" {
- query = elastic.NewMatchAllQuery()
- } else {
- query = elastic.NewTypeQuery(*typ)
- }
-
- // Prepare the slice
- sliceQuery := elastic.NewSliceQuery().Id(i).Max(*numSlices)
- if *field != "" {
- sliceQuery = sliceQuery.Field(*field)
- }
-
- // Start goroutine for this sliced scroll
- g.Go(func() error {
- defer wg.Done()
- svc := client.Scroll(*index).Query(query).Slice(sliceQuery)
- for {
- res, err := svc.Do(ctx)
- if err == io.EOF {
- break
- }
- if err != nil {
- return err
- }
- for _, searchHit := range res.Hits.Hits {
- // Pass the hit to the hits channel, which will be consumed below
- select {
- case hitsc <- hit{Slice: slice, Hit: *searchHit}:
- case <-ctx.Done():
- return ctx.Err()
- }
- }
- }
- return nil
- })
- }
- go func() {
- // Wait until all scrolling is done
- wg.Wait()
- close(hitsc)
- }()
-
- // A single consumer goroutine receives the hits sent by the scrolling goroutines started above
- var total uint64
- totals := make([]uint64, *numSlices)
- g.Go(func() error {
- for hit := range hitsc {
- // We simply count the hits here.
- atomic.AddUint64(&totals[hit.Slice], 1)
- current := atomic.AddUint64(&total, 1)
- sec := int(time.Since(begin).Seconds())
- fmt.Printf("%8d | %02d:%02d\r", current, sec/60, sec%60)
- select {
- default:
- case <-ctx.Done():
- return ctx.Err()
- }
- }
- return nil
- })
-
- // Wait until all goroutines are finished
- if err := g.Wait(); err != nil {
- log.Fatal(err)
- }
-
- fmt.Printf("Scrolled through a total of %d documents in %v\n", total, time.Since(begin))
- for i := 0; i < *numSlices; i++ {
- fmt.Printf("Slice %2d received %d documents\n", i, totals[i])
- }
-}