From 6d8f122a5160f6d9e4c51579f2429dfaa62c7271 Mon Sep 17 00:00:00 2001 From: Christopher Speller Date: Fri, 16 Feb 2018 06:47:51 -0800 Subject: Upgrading server dependancies (#8308) --- vendor/gopkg.in/olivere/elastic.v5/.travis.yml | 2 +- vendor/gopkg.in/olivere/elastic.v5/CONTRIBUTORS | 11 +- vendor/gopkg.in/olivere/elastic.v5/README.md | 16 +- .../gopkg.in/olivere/elastic.v5/bulk_processor.go | 61 ++- vendor/gopkg.in/olivere/elastic.v5/client.go | 8 +- vendor/gopkg.in/olivere/elastic.v5/errors.go | 8 + vendor/gopkg.in/olivere/elastic.v5/msearch.go | 39 +- vendor/gopkg.in/olivere/elastic.v5/msearch_test.go | 105 +++++ .../elastic.v5/recipes/bulk_processor/main.go | 149 ++++++ vendor/gopkg.in/olivere/elastic.v5/reindex.go | 10 + vendor/gopkg.in/olivere/elastic.v5/run-es.sh | 4 +- vendor/gopkg.in/olivere/elastic.v5/search_aggs.go | 70 +++ .../elastic.v5/search_aggs_bucket_composite.go | 498 +++++++++++++++++++++ .../search_aggs_bucket_composite_test.go | 92 ++++ .../elastic.v5/search_aggs_bucket_date_range.go | 9 + .../search_aggs_bucket_date_range_test.go | 4 +- .../olivere/elastic.v5/search_aggs_test.go | 441 ++++++++++++------ .../olivere/elastic.v5/search_queries_terms_set.go | 96 ++++ .../elastic.v5/search_queries_terms_set_test.go | 75 ++++ .../gopkg.in/olivere/elastic.v5/search_request.go | 49 +- vendor/gopkg.in/olivere/elastic.v5/search_test.go | 55 +++ 21 files changed, 1621 insertions(+), 181 deletions(-) create mode 100644 vendor/gopkg.in/olivere/elastic.v5/recipes/bulk_processor/main.go create mode 100644 vendor/gopkg.in/olivere/elastic.v5/search_aggs_bucket_composite.go create mode 100644 vendor/gopkg.in/olivere/elastic.v5/search_aggs_bucket_composite_test.go create mode 100644 vendor/gopkg.in/olivere/elastic.v5/search_queries_terms_set.go create mode 100644 vendor/gopkg.in/olivere/elastic.v5/search_queries_terms_set_test.go (limited to 'vendor/gopkg.in/olivere') diff --git a/vendor/gopkg.in/olivere/elastic.v5/.travis.yml b/vendor/gopkg.in/olivere/elastic.v5/.travis.yml index b4322c13c..9658f873a 100644 --- a/vendor/gopkg.in/olivere/elastic.v5/.travis.yml +++ b/vendor/gopkg.in/olivere/elastic.v5/.travis.yml @@ -12,4 +12,4 @@ services: - docker before_install: - sudo sysctl -w vm.max_map_count=262144 - - docker run -d --rm -p 9200:9200 -e "http.host=0.0.0.0" -e "transport.host=127.0.0.1" -e "bootstrap.memory_lock=true" -e "ES_JAVA_OPTS=-Xms1g -Xmx1g" docker.elastic.co/elasticsearch/elasticsearch:6.1.2 elasticsearch -Expack.security.enabled=false -Enetwork.host=_local_,_site_ -Enetwork.publish_host=_local_ + - docker run -d --rm -p 9200:9200 -e "http.host=0.0.0.0" -e "transport.host=127.0.0.1" -e "bootstrap.memory_lock=true" -e "ES_JAVA_OPTS=-Xms1g -Xmx1g" docker.elastic.co/elasticsearch/elasticsearch-oss:6.2.1 elasticsearch -Enetwork.host=_local_,_site_ -Enetwork.publish_host=_local_ diff --git a/vendor/gopkg.in/olivere/elastic.v5/CONTRIBUTORS b/vendor/gopkg.in/olivere/elastic.v5/CONTRIBUTORS index d7f7f780f..ba06dac29 100644 --- a/vendor/gopkg.in/olivere/elastic.v5/CONTRIBUTORS +++ b/vendor/gopkg.in/olivere/elastic.v5/CONTRIBUTORS @@ -68,9 +68,11 @@ Joe Buck [@four2five](https://github.com/four2five) John Barker [@j16r](https://github.com/j16r) John Goodall [@jgoodall](https://github.com/jgoodall) John Stanford [@jxstanford](https://github.com/jxstanford) +Jonas Groenaas Drange [@semafor](https://github.com/semafor) Josh Chorlton [@jchorl](https://github.com/jchorl) jun [@coseyo](https://github.com/coseyo) Junpei Tsuji [@jun06t](https://github.com/jun06t) +kartlee [@kartlee](https://github.com/kartlee) Keith Hatton [@khatton-ft](https://github.com/khatton-ft) kel [@liketic](https://github.com/liketic) Kenta SUZUKI [@suzuken](https://github.com/suzuken) @@ -98,10 +100,13 @@ Orne Brocaar [@brocaar](https://github.com/brocaar) Paul [@eyeamera](https://github.com/eyeamera) Pete C [@peteclark-ft](https://github.com/peteclark-ft) Radoslaw Wesolowski [r--w](https://github.com/r--w) +Roman Colohanin [@zuzmic](https://github.com/zuzmic) Ryan Schmukler [@rschmukler](https://github.com/rschmukler) +Ryan Wynn [@rwynn](https://github.com/rwynn) Sacheendra talluri [@sacheendra](https://github.com/sacheendra) Sean DuBois [@Sean-Der](https://github.com/Sean-Der) Shalin LK [@shalinlk](https://github.com/shalinlk) +singham [@zhaochenxiao90](https://github.com/zhaochenxiao90) Stephen Kubovic [@stephenkubovic](https://github.com/stephenkubovic) Stuart Warren [@Woz](https://github.com/stuart-warren) Sulaiman [@salajlan](https://github.com/salajlan) @@ -111,13 +116,13 @@ Take [ww24](https://github.com/ww24) Tetsuya Morimoto [@t2y](https://github.com/t2y) TimeEmit [@TimeEmit](https://github.com/timeemit) TusharM [@tusharm](https://github.com/tusharm) -zhangxin [@visaxin](https://github.com/visaxin) wangtuo [@wangtuo](https://github.com/wangtuo) Wédney Yuri [@wedneyyuri](https://github.com/wedneyyuri) wolfkdy [@wolfkdy](https://github.com/wolfkdy) Wyndham Blanton [@wyndhblb](https://github.com/wyndhblb) Yarden Bar [@ayashjorden](https://github.com/ayashjorden) zakthomas [@zakthomas](https://github.com/zakthomas) -singham [@zhaochenxiao90](https://github.com/zhaochenxiao90) +Yuya Kusakabe [@higebu](https://github.com/higebu) +Zach [@snowzach](https://github.com/snowzach) +zhangxin [@visaxin](https://github.com/visaxin) @林 [@zplzpl](https://github.com/zplzpl) -Roman Colohanin [@zuzmic](https://github.com/zuzmic) diff --git a/vendor/gopkg.in/olivere/elastic.v5/README.md b/vendor/gopkg.in/olivere/elastic.v5/README.md index f452b664d..d0cdd7821 100644 --- a/vendor/gopkg.in/olivere/elastic.v5/README.md +++ b/vendor/gopkg.in/olivere/elastic.v5/README.md @@ -199,6 +199,7 @@ See the [wiki](https://github.com/olivere/elastic/wiki) for more details. - [x] Significant Terms - [x] Significant Text - [x] Terms + - [x] Composite - Pipeline Aggregations - [x] Avg Bucket - [x] Derivative @@ -212,6 +213,7 @@ See the [wiki](https://github.com/olivere/elastic/wiki) for more details. - [x] Cumulative Sum - [x] Bucket Script - [x] Bucket Selector + - [ ] Bucket Sort - [x] Serial Differencing - [x] Matrix Aggregations - [x] Matrix Stats @@ -234,17 +236,17 @@ See the [wiki](https://github.com/olivere/elastic/wiki) for more details. - [x] Update Indices Settings - [x] Get Settings - [x] Analyze + - [x] Explain Analyze - [x] Index Templates -- [ ] Shadow Replica Indices - [x] Indices Stats - [x] Indices Segments - [ ] Indices Recovery - [ ] Indices Shard Stores - [ ] Clear Cache - [x] Flush + - [x] Synced Flush - [x] Refresh - [x] Force Merge -- [ ] Upgrade ### cat APIs @@ -267,6 +269,7 @@ The cat APIs are not implemented as of now. We think they are better suited for - [ ] cat shards - [ ] cat segments - [ ] cat snapshots +- [ ] cat templates ### Cluster APIs @@ -278,6 +281,8 @@ The cat APIs are not implemented as of now. We think they are better suited for - [ ] Cluster Update Settings - [x] Nodes Stats - [x] Nodes Info +- [ ] Nodes Feature Usage +- [ ] Remote Cluster Info - [x] Task Management API - [ ] Nodes hot_threads - [ ] Cluster Allocation Explain API @@ -297,6 +302,7 @@ The cat APIs are not implemented as of now. We think they are better suited for - Term level queries - [x] Term Query - [x] Terms Query + - [x] Terms Set Query - [x] Range Query - [x] Exists Query - [x] Prefix Query @@ -311,7 +317,6 @@ The cat APIs are not implemented as of now. We think they are better suited for - [x] Dis Max Query - [x] Function Score Query - [x] Boosting Query - - [x] Indices Query - Joining queries - [x] Nested Query - [x] Has Child Query @@ -321,12 +326,9 @@ The cat APIs are not implemented as of now. We think they are better suited for - [ ] GeoShape Query - [x] Geo Bounding Box Query - [x] Geo Distance Query - - [ ] Geo Distance Range Query - [x] Geo Polygon Query - - [ ] Geohash Cell Query - Specialized queries - [x] More Like This Query - - [x] Template Query - [x] Script Query - [x] Percolate Query - Span queries @@ -346,7 +348,7 @@ The cat APIs are not implemented as of now. We think they are better suited for - Snapshot and Restore - [x] Repositories - - [ ] Snapshot + - [x] Snapshot - [ ] Restore - [ ] Snapshot status - [ ] Monitoring snapshot/restore status diff --git a/vendor/gopkg.in/olivere/elastic.v5/bulk_processor.go b/vendor/gopkg.in/olivere/elastic.v5/bulk_processor.go index b2709a880..6ee8a3dee 100644 --- a/vendor/gopkg.in/olivere/elastic.v5/bulk_processor.go +++ b/vendor/gopkg.in/olivere/elastic.v5/bulk_processor.go @@ -6,6 +6,7 @@ package elastic import ( "context" + "net" "sync" "sync/atomic" "time" @@ -121,7 +122,7 @@ func (s *BulkProcessorService) Stats(wantStats bool) *BulkProcessorService { return s } -// Set the backoff strategy to use for errors +// Backoff sets the backoff strategy to use for errors. func (s *BulkProcessorService) Backoff(backoff Backoff) *BulkProcessorService { s.backoff = backoff return s @@ -248,6 +249,8 @@ type BulkProcessor struct { statsMu sync.Mutex // guards the following block stats *BulkProcessorStats + + stopReconnC chan struct{} // channel to signal stop reconnection attempts } func newBulkProcessor( @@ -293,6 +296,7 @@ func (p *BulkProcessor) Start(ctx context.Context) error { p.requestsC = make(chan BulkableRequest) p.executionId = 0 p.stats = newBulkProcessorStats(p.numWorkers) + p.stopReconnC = make(chan struct{}) // Create and start up workers. p.workers = make([]*bulkWorker, p.numWorkers) @@ -331,6 +335,12 @@ func (p *BulkProcessor) Close() error { return nil } + // Tell connection checkers to stop + if p.stopReconnC != nil { + close(p.stopReconnC) + p.stopReconnC = nil + } + // Stop flusher (if enabled) if p.flusherStopC != nil { p.flusherStopC <- struct{}{} @@ -436,29 +446,43 @@ func (w *bulkWorker) work(ctx context.Context) { var stop bool for !stop { + var err error select { case req, open := <-w.p.requestsC: if open { // Received a new request w.service.Add(req) if w.commitRequired() { - w.commit(ctx) // TODO swallow errors here? + err = w.commit(ctx) } } else { // Channel closed: Stop. stop = true if w.service.NumberOfActions() > 0 { - w.commit(ctx) // TODO swallow errors here? + err = w.commit(ctx) } } case <-w.flushC: // Commit outstanding requests if w.service.NumberOfActions() > 0 { - w.commit(ctx) // TODO swallow errors here? + err = w.commit(ctx) } w.flushAckC <- struct{}{} } + if !stop && err != nil { + waitForActive := func() { + // Add back pressure to prevent Add calls from filling up the request queue + ready := make(chan struct{}) + go w.waitForActiveConnection(ready) + <-ready + } + if _, ok := err.(net.Error); ok { + waitForActive() + } else if IsConnErr(err) { + waitForActive() + } + } } } @@ -511,6 +535,35 @@ func (w *bulkWorker) commit(ctx context.Context) error { return err } +func (w *bulkWorker) waitForActiveConnection(ready chan<- struct{}) { + defer close(ready) + + t := time.NewTicker(5 * time.Second) + defer t.Stop() + + client := w.p.c + stopReconnC := w.p.stopReconnC + w.p.c.errorf("elastic: bulk processor %q is waiting for an active connection", w.p.name) + + // loop until a health check finds at least 1 active connection or the reconnection channel is closed + for { + select { + case _, ok := <-stopReconnC: + if !ok { + w.p.c.errorf("elastic: bulk processor %q active connection check interrupted", w.p.name) + return + } + case <-t.C: + client.healthcheck(time.Duration(3)*time.Second, true) + if client.mustActiveConn() == nil { + // found an active connection + // exit and signal done to the WaitGroup + return + } + } + } +} + func (w *bulkWorker) updateStats(res *BulkResponse) { // Update stats if res != nil { diff --git a/vendor/gopkg.in/olivere/elastic.v5/client.go b/vendor/gopkg.in/olivere/elastic.v5/client.go index 1eb0ec54f..165a30526 100644 --- a/vendor/gopkg.in/olivere/elastic.v5/client.go +++ b/vendor/gopkg.in/olivere/elastic.v5/client.go @@ -26,7 +26,7 @@ import ( const ( // Version is the current version of Elastic. - Version = "6.1.4" + Version = "6.1.7" // DefaultURL is the default endpoint of Elasticsearch on the local machine. // It is used e.g. when initializing a new Client without a specific URL. @@ -1778,9 +1778,3 @@ func (c *Client) WaitForGreenStatus(timeout string) error { func (c *Client) WaitForYellowStatus(timeout string) error { return c.WaitForStatus("yellow", timeout) } - -// IsConnError unwraps the given error value and checks if it is equal to -// elastic.ErrNoClient. -func IsConnErr(err error) bool { - return errors.Cause(err) == ErrNoClient -} diff --git a/vendor/gopkg.in/olivere/elastic.v5/errors.go b/vendor/gopkg.in/olivere/elastic.v5/errors.go index 00a936621..e40cda845 100644 --- a/vendor/gopkg.in/olivere/elastic.v5/errors.go +++ b/vendor/gopkg.in/olivere/elastic.v5/errors.go @@ -9,6 +9,8 @@ import ( "fmt" "io/ioutil" "net/http" + + "github.com/pkg/errors" ) // checkResponse will return an error if the request/response indicates @@ -89,6 +91,12 @@ func (e *Error) Error() string { } } +// IsConnErr returns true if the error indicates that Elastic could not +// find an Elasticsearch host to connect to. +func IsConnErr(err error) bool { + return err == ErrNoClient || errors.Cause(err) == ErrNoClient +} + // IsNotFound returns true if the given error indicates that Elasticsearch // returned HTTP status 404. The err parameter can be of type *elastic.Error, // elastic.Error, *http.Response or int (indicating the HTTP status code). diff --git a/vendor/gopkg.in/olivere/elastic.v5/msearch.go b/vendor/gopkg.in/olivere/elastic.v5/msearch.go index ed54d3c2f..c1a589a97 100644 --- a/vendor/gopkg.in/olivere/elastic.v5/msearch.go +++ b/vendor/gopkg.in/olivere/elastic.v5/msearch.go @@ -14,19 +14,17 @@ import ( // MultiSearch executes one or more searches in one roundtrip. type MultiSearchService struct { - client *Client - requests []*SearchRequest - indices []string - pretty bool - routing string - preference string + client *Client + requests []*SearchRequest + indices []string + pretty bool + maxConcurrentRequests *int + preFilterShardSize *int } func NewMultiSearchService(client *Client) *MultiSearchService { builder := &MultiSearchService{ - client: client, - requests: make([]*SearchRequest, 0), - indices: make([]string, 0), + client: client, } return builder } @@ -46,6 +44,16 @@ func (s *MultiSearchService) Pretty(pretty bool) *MultiSearchService { return s } +func (s *MultiSearchService) MaxConcurrentSearches(max int) *MultiSearchService { + s.maxConcurrentRequests = &max + return s +} + +func (s *MultiSearchService) PreFilterShardSize(size int) *MultiSearchService { + s.preFilterShardSize = &size + return s +} + func (s *MultiSearchService) Do(ctx context.Context) (*MultiSearchResult, error) { // Build url path := "/_msearch" @@ -55,6 +63,12 @@ func (s *MultiSearchService) Do(ctx context.Context) (*MultiSearchResult, error) if s.pretty { params.Set("pretty", fmt.Sprintf("%v", s.pretty)) } + if v := s.maxConcurrentRequests; v != nil { + params.Set("max_concurrent_searches", fmt.Sprintf("%v", *v)) + } + if v := s.preFilterShardSize; v != nil { + params.Set("pre_filter_shard_size", fmt.Sprintf("%v", *v)) + } // Set body var lines []string @@ -68,14 +82,14 @@ func (s *MultiSearchService) Do(ctx context.Context) (*MultiSearchResult, error) if err != nil { return nil, err } - body, err := json.Marshal(sr.Body()) + body, err := sr.Body() if err != nil { return nil, err } lines = append(lines, string(header)) - lines = append(lines, string(body)) + lines = append(lines, body) } - body := strings.Join(lines, "\n") + "\n" // Don't forget trailing \n + body := strings.Join(lines, "\n") + "\n" // add trailing \n // Get response res, err := s.client.PerformRequest(ctx, PerformRequestOptions{ @@ -96,6 +110,7 @@ func (s *MultiSearchService) Do(ctx context.Context) (*MultiSearchResult, error) return ret, nil } +// MultiSearchResult is the outcome of running a multi-search operation. type MultiSearchResult struct { Responses []*SearchResult `json:"responses,omitempty"` } diff --git a/vendor/gopkg.in/olivere/elastic.v5/msearch_test.go b/vendor/gopkg.in/olivere/elastic.v5/msearch_test.go index 79f2047e6..d25e2cc28 100644 --- a/vendor/gopkg.in/olivere/elastic.v5/msearch_test.go +++ b/vendor/gopkg.in/olivere/elastic.v5/msearch_test.go @@ -13,6 +13,7 @@ import ( func TestMultiSearch(t *testing.T) { client := setupTestClientAndCreateIndex(t) + // client := setupTestClientAndCreateIndexAndLog(t) tweet1 := tweet{ User: "olivere", @@ -60,6 +61,110 @@ func TestMultiSearch(t *testing.T) { sreq2 := NewSearchRequest().Index(testIndexName).Type("doc"). Source(NewSearchSource().Query(q2)) + searchResult, err := client.MultiSearch(). + Add(sreq1, sreq2). + Pretty(true). + Do(context.TODO()) + if err != nil { + t.Fatal(err) + } + if searchResult.Responses == nil { + t.Fatal("expected responses != nil; got nil") + } + if len(searchResult.Responses) != 2 { + t.Fatalf("expected 2 responses; got %d", len(searchResult.Responses)) + } + + sres := searchResult.Responses[0] + if sres.Hits == nil { + t.Errorf("expected Hits != nil; got nil") + } + if sres.Hits.TotalHits != 3 { + t.Errorf("expected Hits.TotalHits = %d; got %d", 3, sres.Hits.TotalHits) + } + if len(sres.Hits.Hits) != 3 { + t.Errorf("expected len(Hits.Hits) = %d; got %d", 3, len(sres.Hits.Hits)) + } + for _, hit := range sres.Hits.Hits { + if hit.Index != testIndexName { + t.Errorf("expected Hits.Hit.Index = %q; got %q", testIndexName, hit.Index) + } + item := make(map[string]interface{}) + err := json.Unmarshal(*hit.Source, &item) + if err != nil { + t.Fatal(err) + } + } + + sres = searchResult.Responses[1] + if sres.Hits == nil { + t.Errorf("expected Hits != nil; got nil") + } + if sres.Hits.TotalHits != 2 { + t.Errorf("expected Hits.TotalHits = %d; got %d", 2, sres.Hits.TotalHits) + } + if len(sres.Hits.Hits) != 2 { + t.Errorf("expected len(Hits.Hits) = %d; got %d", 2, len(sres.Hits.Hits)) + } + for _, hit := range sres.Hits.Hits { + if hit.Index != testIndexName { + t.Errorf("expected Hits.Hit.Index = %q; got %q", testIndexName, hit.Index) + } + item := make(map[string]interface{}) + err := json.Unmarshal(*hit.Source, &item) + if err != nil { + t.Fatal(err) + } + } +} + +func TestMultiSearchWithStrings(t *testing.T) { + client := setupTestClientAndCreateIndex(t) + // client := setupTestClientAndCreateIndexAndLog(t) + + tweet1 := tweet{ + User: "olivere", + Message: "Welcome to Golang and Elasticsearch.", + Tags: []string{"golang", "elasticsearch"}, + } + tweet2 := tweet{ + User: "olivere", + Message: "Another unrelated topic.", + Tags: []string{"golang"}, + } + tweet3 := tweet{ + User: "sandrae", + Message: "Cycling is fun.", + Tags: []string{"sports", "cycling"}, + } + + // Add all documents + _, err := client.Index().Index(testIndexName).Type("doc").Id("1").BodyJson(&tweet1).Do(context.TODO()) + if err != nil { + t.Fatal(err) + } + + _, err = client.Index().Index(testIndexName).Type("doc").Id("2").BodyJson(&tweet2).Do(context.TODO()) + if err != nil { + t.Fatal(err) + } + + _, err = client.Index().Index(testIndexName).Type("doc").Id("3").BodyJson(&tweet3).Do(context.TODO()) + if err != nil { + t.Fatal(err) + } + + _, err = client.Flush().Index(testIndexName).Do(context.TODO()) + if err != nil { + t.Fatal(err) + } + + // Spawn two search queries with one roundtrip + sreq1 := NewSearchRequest().Index(testIndexName, testIndexName2). + Source(`{"query":{"match_all":{}}}`) + sreq2 := NewSearchRequest().Index(testIndexName).Type("doc"). + Source(`{"query":{"term":{"tags":"golang"}}}`) + searchResult, err := client.MultiSearch(). Add(sreq1, sreq2). Do(context.TODO()) diff --git a/vendor/gopkg.in/olivere/elastic.v5/recipes/bulk_processor/main.go b/vendor/gopkg.in/olivere/elastic.v5/recipes/bulk_processor/main.go new file mode 100644 index 000000000..f13243297 --- /dev/null +++ b/vendor/gopkg.in/olivere/elastic.v5/recipes/bulk_processor/main.go @@ -0,0 +1,149 @@ +// Copyright 2012-present Oliver Eilhard. All rights reserved. +// Use of this source code is governed by a MIT-license. +// See http://olivere.mit-license.org/license.txt for details. + +// BulkProcessor runs a bulk processing job that fills an index +// given certain criteria like flush interval etc. +// +// Example +// +// bulk_processor -url=http://127.0.0.1:9200/bulk-processor-test?sniff=false -n=100000 -flush-interval=1s +// +package main + +import ( + "context" + "flag" + "fmt" + "log" + "math/rand" + "os" + "os/signal" + "sync/atomic" + "syscall" + "time" + + "github.com/google/uuid" + + "github.com/olivere/elastic" + "github.com/olivere/elastic/config" +) + +func main() { + var ( + url = flag.String("url", "http://localhost:9200/bulk-processor-test", "Elasticsearch URL") + numWorkers = flag.Int("num-workers", 4, "Number of workers") + n = flag.Int64("n", -1, "Number of documents to process (-1 for unlimited)") + flushInterval = flag.Duration("flush-interval", 1*time.Second, "Flush interval") + bulkActions = flag.Int("bulk-actions", 0, "Number of bulk actions before committing") + bulkSize = flag.Int("bulk-size", 0, "Size of bulk requests before committing") + ) + flag.Parse() + log.SetFlags(0) + + rand.Seed(time.Now().UnixNano()) + + // Parse configuration from URL + cfg, err := config.Parse(*url) + if err != nil { + log.Fatal(err) + } + + // Create an Elasticsearch client from the parsed config + client, err := elastic.NewClientFromConfig(cfg) + if err != nil { + log.Fatal(err) + } + + // Drop old index + exists, err := client.IndexExists(cfg.Index).Do(context.Background()) + if err != nil { + log.Fatal(err) + } + if exists { + _, err = client.DeleteIndex(cfg.Index).Do(context.Background()) + if err != nil { + log.Fatal(err) + } + } + + // Create processor + bulkp := elastic.NewBulkProcessorService(client). + Name("bulk-test-processor"). + Stats(true). + Backoff(elastic.StopBackoff{}). + FlushInterval(*flushInterval). + Workers(*numWorkers) + if *bulkActions > 0 { + bulkp = bulkp.BulkActions(*bulkActions) + } + if *bulkSize > 0 { + bulkp = bulkp.BulkSize(*bulkSize) + } + p, err := bulkp.Do(context.Background()) + if err != nil { + log.Fatal(err) + } + + var created int64 + errc := make(chan error, 1) + go func() { + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) + <-c + errc <- nil + }() + + go func() { + defer func() { + if err := p.Close(); err != nil { + errc <- err + } + }() + + type Doc struct { + Timestamp time.Time `json:"@timestamp"` + } + + for { + current := atomic.AddInt64(&created, 1) + if *n > 0 && current >= *n { + errc <- nil + return + } + r := elastic.NewBulkIndexRequest(). + Index(cfg.Index). + Type("doc"). + Id(uuid.New().String()). + Doc(Doc{Timestamp: time.Now()}) + p.Add(r) + + time.Sleep(time.Duration(rand.Intn(1000)) * time.Microsecond) + } + }() + + go func() { + t := time.NewTicker(1 * time.Second) + defer t.Stop() + for range t.C { + stats := p.Stats() + written := atomic.LoadInt64(&created) + var queued int64 + for _, w := range stats.Workers { + queued += w.Queued + } + fmt.Printf("Queued=%5d Written=%8d Succeeded=%8d Failed=%8d Comitted=%6d Flushed=%6d\n", + queued, + written, + stats.Succeeded, + stats.Failed, + stats.Committed, + stats.Flushed, + ) + } + }() + + if err := <-errc; err != nil { + log.Fatal(err) + } +} diff --git a/vendor/gopkg.in/olivere/elastic.v5/reindex.go b/vendor/gopkg.in/olivere/elastic.v5/reindex.go index 35440fa80..9cdd50a68 100644 --- a/vendor/gopkg.in/olivere/elastic.v5/reindex.go +++ b/vendor/gopkg.in/olivere/elastic.v5/reindex.go @@ -20,6 +20,7 @@ type ReindexService struct { waitForActiveShards string waitForCompletion *bool requestsPerSecond *int + slices *int body interface{} source *ReindexSource destination *ReindexDestination @@ -51,6 +52,12 @@ func (s *ReindexService) RequestsPerSecond(requestsPerSecond int) *ReindexServic return s } +// Slices specifies the number of slices this task should be divided into. Defaults to 1. +func (s *ReindexService) Slices(slices int) *ReindexService { + s.slices = &slices + return s +} + // Refresh indicates whether Elasticsearch should refresh the effected indexes // immediately. func (s *ReindexService) Refresh(refresh string) *ReindexService { @@ -179,6 +186,9 @@ func (s *ReindexService) buildURL() (string, url.Values, error) { if s.requestsPerSecond != nil { params.Set("requests_per_second", fmt.Sprintf("%v", *s.requestsPerSecond)) } + if s.slices != nil { + params.Set("slices", fmt.Sprintf("%v", *s.slices)) + } if s.waitForActiveShards != "" { params.Set("wait_for_active_shards", s.waitForActiveShards) } diff --git a/vendor/gopkg.in/olivere/elastic.v5/run-es.sh b/vendor/gopkg.in/olivere/elastic.v5/run-es.sh index 1f4a851d4..624a864ed 100755 --- a/vendor/gopkg.in/olivere/elastic.v5/run-es.sh +++ b/vendor/gopkg.in/olivere/elastic.v5/run-es.sh @@ -1,3 +1,3 @@ #!/bin/sh -VERSION=${VERSION:=6.1.2} -docker run --rm -p 9200:9200 -e "http.host=0.0.0.0" -e "transport.host=127.0.0.1" -e "bootstrap.memory_lock=true" -e "ES_JAVA_OPTS=-Xms1g -Xmx1g" docker.elastic.co/elasticsearch/elasticsearch:$VERSION elasticsearch -Expack.security.enabled=false -Enetwork.host=_local_,_site_ -Enetwork.publish_host=_local_ +VERSION=${VERSION:=6.2.1} +docker run --rm -p 9200:9200 -e "http.host=0.0.0.0" -e "transport.host=127.0.0.1" -e "bootstrap.memory_lock=true" -e "ES_JAVA_OPTS=-Xms1g -Xmx1g" docker.elastic.co/elasticsearch/elasticsearch-oss:$VERSION elasticsearch -Enetwork.host=_local_,_site_ -Enetwork.publish_host=_local_ diff --git a/vendor/gopkg.in/olivere/elastic.v5/search_aggs.go b/vendor/gopkg.in/olivere/elastic.v5/search_aggs.go index c5082b2b1..6359611b1 100644 --- a/vendor/gopkg.in/olivere/elastic.v5/search_aggs.go +++ b/vendor/gopkg.in/olivere/elastic.v5/search_aggs.go @@ -653,6 +653,23 @@ func (a Aggregations) SerialDiff(name string) (*AggregationPipelineSimpleValue, return nil, false } +// Composite returns composite bucket aggregation results. +// +// See https://www.elastic.co/guide/en/elasticsearch/reference/6.1/search-aggregations-bucket-composite-aggregation.html +// for details. +func (a Aggregations) Composite(name string) (*AggregationBucketCompositeItems, bool) { + if raw, found := a[name]; found { + agg := new(AggregationBucketCompositeItems) + if raw == nil { + return agg, true + } + if err := json.Unmarshal(*raw, agg); err == nil { + return agg, true + } + } + return nil, false +} + // -- Single value metric -- // AggregationValueMetric is a single-value metric, returned e.g. by a @@ -1448,3 +1465,56 @@ func (a *AggregationPipelinePercentilesMetric) UnmarshalJSON(data []byte) error a.Aggregations = aggs return nil } + +// -- Composite key items -- + +// AggregationBucketCompositeItems implements the response structure +// for a bucket aggregation of type composite. +type AggregationBucketCompositeItems struct { + Aggregations + + Buckets []*AggregationBucketCompositeItem //`json:"buckets"` + Meta map[string]interface{} // `json:"meta,omitempty"` +} + +// UnmarshalJSON decodes JSON data and initializes an AggregationBucketCompositeItems structure. +func (a *AggregationBucketCompositeItems) UnmarshalJSON(data []byte) error { + var aggs map[string]*json.RawMessage + if err := json.Unmarshal(data, &aggs); err != nil { + return err + } + if v, ok := aggs["buckets"]; ok && v != nil { + json.Unmarshal(*v, &a.Buckets) + } + if v, ok := aggs["meta"]; ok && v != nil { + json.Unmarshal(*v, &a.Meta) + } + a.Aggregations = aggs + return nil +} + +// AggregationBucketCompositeItem is a single bucket of an AggregationBucketCompositeItems structure. +type AggregationBucketCompositeItem struct { + Aggregations + + Key map[string]interface{} //`json:"key"` + DocCount int64 //`json:"doc_count"` +} + +// UnmarshalJSON decodes JSON data and initializes an AggregationBucketCompositeItem structure. +func (a *AggregationBucketCompositeItem) UnmarshalJSON(data []byte) error { + var aggs map[string]*json.RawMessage + dec := json.NewDecoder(bytes.NewReader(data)) + dec.UseNumber() + if err := dec.Decode(&aggs); err != nil { + return err + } + if v, ok := aggs["key"]; ok && v != nil { + json.Unmarshal(*v, &a.Key) + } + if v, ok := aggs["doc_count"]; ok && v != nil { + json.Unmarshal(*v, &a.DocCount) + } + a.Aggregations = aggs + return nil +} diff --git a/vendor/gopkg.in/olivere/elastic.v5/search_aggs_bucket_composite.go b/vendor/gopkg.in/olivere/elastic.v5/search_aggs_bucket_composite.go new file mode 100644 index 000000000..1d9132d2d --- /dev/null +++ b/vendor/gopkg.in/olivere/elastic.v5/search_aggs_bucket_composite.go @@ -0,0 +1,498 @@ +// Copyright 2012-present Oliver Eilhard. All rights reserved. +// Use of this source code is governed by a MIT-license. +// See http://olivere.mit-license.org/license.txt for details. + +package elastic + +// CompositeAggregation is a multi-bucket values source based aggregation +// that can be used to calculate unique composite values from source documents. +// +// See https://www.elastic.co/guide/en/elasticsearch/reference/6.1/search-aggregations-bucket-composite-aggregation.html +// for details. +type CompositeAggregation struct { + after map[string]interface{} + size *int + sources []CompositeAggregationValuesSource + subAggregations map[string]Aggregation + meta map[string]interface{} +} + +// NewCompositeAggregation creates a new CompositeAggregation. +func NewCompositeAggregation() *CompositeAggregation { + return &CompositeAggregation{ + sources: make([]CompositeAggregationValuesSource, 0), + subAggregations: make(map[string]Aggregation), + } +} + +// Size represents the number of composite buckets to return. +// Defaults to 10 as of Elasticsearch 6.1. +func (a *CompositeAggregation) Size(size int) *CompositeAggregation { + a.size = &size + return a +} + +// AggregateAfter sets the values that indicate which composite bucket this +// request should "aggregate after". +func (a *CompositeAggregation) AggregateAfter(after map[string]interface{}) *CompositeAggregation { + a.after = after + return a +} + +// Sources specifies the list of CompositeAggregationValuesSource instances to +// use in the aggregation. +func (a *CompositeAggregation) Sources(sources ...CompositeAggregationValuesSource) *CompositeAggregation { + a.sources = append(a.sources, sources...) + return a +} + +// SubAggregations of this aggregation. +func (a *CompositeAggregation) SubAggregation(name string, subAggregation Aggregation) *CompositeAggregation { + a.subAggregations[name] = subAggregation + return a +} + +// Meta sets the meta data to be included in the aggregation response. +func (a *CompositeAggregation) Meta(metaData map[string]interface{}) *CompositeAggregation { + a.meta = metaData + return a +} + +// Source returns the serializable JSON for this aggregation. +func (a *CompositeAggregation) Source() (interface{}, error) { + // Example: + // { + // "aggs" : { + // "my_composite_agg" : { + // "composite" : { + // "sources": [ + // {"my_term": { "terms": { "field": "product" }}}, + // {"my_histo": { "histogram": { "field": "price", "interval": 5 }}}, + // {"my_date": { "date_histogram": { "field": "timestamp", "interval": "1d" }}}, + // ], + // "size" : 10, + // "after" : ["a", 2, "c"] + // } + // } + // } + // } + // + // This method returns only the { "histogram" : { ... } } part. + + source := make(map[string]interface{}) + opts := make(map[string]interface{}) + source["composite"] = opts + + sources := make([]interface{}, len(a.sources)) + for i, s := range a.sources { + src, err := s.Source() + if err != nil { + return nil, err + } + sources[i] = src + } + opts["sources"] = sources + + if a.size != nil { + opts["size"] = *a.size + } + + if a.after != nil { + opts["after"] = a.after + } + + // AggregationBuilder (SubAggregations) + if len(a.subAggregations) > 0 { + aggsMap := make(map[string]interface{}) + source["aggregations"] = aggsMap + for name, aggregate := range a.subAggregations { + src, err := aggregate.Source() + if err != nil { + return nil, err + } + aggsMap[name] = src + } + } + + // Add Meta data if available + if len(a.meta) > 0 { + source["meta"] = a.meta + } + + return source, nil +} + +// -- Generic interface for CompositeAggregationValues -- + +// CompositeAggregationValuesSource specifies the interface that +// all implementations for CompositeAggregation's Sources method +// need to implement. +// +// The different implementations are described in +// https://www.elastic.co/guide/en/elasticsearch/reference/6.1/search-aggregations-bucket-composite-aggregation.html#_values_source_2. +type CompositeAggregationValuesSource interface { + Source() (interface{}, error) +} + +// -- CompositeAggregationTermsValuesSource -- + +// CompositeAggregationTermsValuesSource is a source for the CompositeAggregation that handles terms +// it works very similar to a terms aggregation with slightly different syntax +// +// See https://www.elastic.co/guide/en/elasticsearch/reference/6.1/search-aggregations-bucket-composite-aggregation.html#_terms +// for details. +type CompositeAggregationTermsValuesSource struct { + name string + field string + script *Script + valueType string + missing interface{} + order string +} + +// NewCompositeAggregationTermsValuesSource creates and initializes +// a new CompositeAggregationTermsValuesSource. +func NewCompositeAggregationTermsValuesSource(name string) *CompositeAggregationTermsValuesSource { + return &CompositeAggregationTermsValuesSource{ + name: name, + } +} + +// Field to use for this source. +func (a *CompositeAggregationTermsValuesSource) Field(field string) *CompositeAggregationTermsValuesSource { + a.field = field + return a +} + +// Script to use for this source. +func (a *CompositeAggregationTermsValuesSource) Script(script *Script) *CompositeAggregationTermsValuesSource { + a.script = script + return a +} + +// ValueType specifies the type of values produced by this source, +// e.g. "string" or "date". +func (a *CompositeAggregationTermsValuesSource) ValueType(valueType string) *CompositeAggregationTermsValuesSource { + a.valueType = valueType + return a +} + +// Order specifies the order in the values produced by this source. +// It can be either "asc" or "desc". +func (a *CompositeAggregationTermsValuesSource) Order(order string) *CompositeAggregationTermsValuesSource { + a.order = order + return a +} + +// Asc ensures the order of the values produced is ascending. +func (a *CompositeAggregationTermsValuesSource) Asc() *CompositeAggregationTermsValuesSource { + a.order = "asc" + return a +} + +// Desc ensures the order of the values produced is descending. +func (a *CompositeAggregationTermsValuesSource) Desc() *CompositeAggregationTermsValuesSource { + a.order = "desc" + return a +} + +// Missing specifies the value to use when the source finds a missing +// value in a document. +func (a *CompositeAggregationTermsValuesSource) Missing(missing interface{}) *CompositeAggregationTermsValuesSource { + a.missing = missing + return a +} + +// Source returns the serializable JSON for this values source. +func (a *CompositeAggregationTermsValuesSource) Source() (interface{}, error) { + source := make(map[string]interface{}) + name := make(map[string]interface{}) + source[a.name] = name + values := make(map[string]interface{}) + name["terms"] = values + + // field + if a.field != "" { + values["field"] = a.field + } + + // script + if a.script != nil { + src, err := a.script.Source() + if err != nil { + return nil, err + } + values["script"] = src + } + + // missing + if a.missing != nil { + values["missing"] = a.missing + } + + // value_type + if a.valueType != "" { + values["value_type"] = a.valueType + } + + // order + if a.order != "" { + values["order"] = a.order + } + + return source, nil + +} + +// -- CompositeAggregationHistogramValuesSource -- + +// CompositeAggregationHistogramValuesSource is a source for the CompositeAggregation that handles histograms +// it works very similar to a terms histogram with slightly different syntax +// +// See https://www.elastic.co/guide/en/elasticsearch/reference/6.1/search-aggregations-bucket-composite-aggregation.html#_histogram +// for details. +type CompositeAggregationHistogramValuesSource struct { + name string + field string + script *Script + valueType string + missing interface{} + order string + interval float64 +} + +// NewCompositeAggregationHistogramValuesSource creates and initializes +// a new CompositeAggregationHistogramValuesSource. +func NewCompositeAggregationHistogramValuesSource(name string, interval float64) *CompositeAggregationHistogramValuesSource { + return &CompositeAggregationHistogramValuesSource{ + name: name, + interval: interval, + } +} + +// Field to use for this source. +func (a *CompositeAggregationHistogramValuesSource) Field(field string) *CompositeAggregationHistogramValuesSource { + a.field = field + return a +} + +// Script to use for this source. +func (a *CompositeAggregationHistogramValuesSource) Script(script *Script) *CompositeAggregationHistogramValuesSource { + a.script = script + return a +} + +// ValueType specifies the type of values produced by this source, +// e.g. "string" or "date". +func (a *CompositeAggregationHistogramValuesSource) ValueType(valueType string) *CompositeAggregationHistogramValuesSource { + a.valueType = valueType + return a +} + +// Missing specifies the value to use when the source finds a missing +// value in a document. +func (a *CompositeAggregationHistogramValuesSource) Missing(missing interface{}) *CompositeAggregationHistogramValuesSource { + a.missing = missing + return a +} + +// Order specifies the order in the values produced by this source. +// It can be either "asc" or "desc". +func (a *CompositeAggregationHistogramValuesSource) Order(order string) *CompositeAggregationHistogramValuesSource { + a.order = order + return a +} + +// Asc ensures the order of the values produced is ascending. +func (a *CompositeAggregationHistogramValuesSource) Asc() *CompositeAggregationHistogramValuesSource { + a.order = "asc" + return a +} + +// Desc ensures the order of the values produced is descending. +func (a *CompositeAggregationHistogramValuesSource) Desc() *CompositeAggregationHistogramValuesSource { + a.order = "desc" + return a +} + +// Interval specifies the interval to use. +func (a *CompositeAggregationHistogramValuesSource) Interval(interval float64) *CompositeAggregationHistogramValuesSource { + a.interval = interval + return a +} + +// Source returns the serializable JSON for this values source. +func (a *CompositeAggregationHistogramValuesSource) Source() (interface{}, error) { + source := make(map[string]interface{}) + name := make(map[string]interface{}) + source[a.name] = name + values := make(map[string]interface{}) + name["histogram"] = values + + // field + if a.field != "" { + values["field"] = a.field + } + + // script + if a.script != nil { + src, err := a.script.Source() + if err != nil { + return nil, err + } + values["script"] = src + } + + // missing + if a.missing != nil { + values["missing"] = a.missing + } + + // value_type + if a.valueType != "" { + values["value_type"] = a.valueType + } + + // order + if a.order != "" { + values["order"] = a.order + } + + // Histogram-related properties + values["interval"] = a.interval + + return source, nil + +} + +// -- CompositeAggregationDateHistogramValuesSource -- + +// CompositeAggregationDateHistogramValuesSource is a source for the CompositeAggregation that handles date histograms +// it works very similar to a date histogram aggregation with slightly different syntax +// +// See https://www.elastic.co/guide/en/elasticsearch/reference/6.1/search-aggregations-bucket-composite-aggregation.html#_date_histogram +// for details. +type CompositeAggregationDateHistogramValuesSource struct { + name string + field string + script *Script + valueType string + missing interface{} + order string + interval interface{} + timeZone string +} + +// NewCompositeAggregationDateHistogramValuesSource creates and initializes +// a new CompositeAggregationDateHistogramValuesSource. +func NewCompositeAggregationDateHistogramValuesSource(name string, interval interface{}) *CompositeAggregationDateHistogramValuesSource { + return &CompositeAggregationDateHistogramValuesSource{ + name: name, + interval: interval, + } +} + +// Field to use for this source. +func (a *CompositeAggregationDateHistogramValuesSource) Field(field string) *CompositeAggregationDateHistogramValuesSource { + a.field = field + return a +} + +// Script to use for this source. +func (a *CompositeAggregationDateHistogramValuesSource) Script(script *Script) *CompositeAggregationDateHistogramValuesSource { + a.script = script + return a +} + +// ValueType specifies the type of values produced by this source, +// e.g. "string" or "date". +func (a *CompositeAggregationDateHistogramValuesSource) ValueType(valueType string) *CompositeAggregationDateHistogramValuesSource { + a.valueType = valueType + return a +} + +// Missing specifies the value to use when the source finds a missing +// value in a document. +func (a *CompositeAggregationDateHistogramValuesSource) Missing(missing interface{}) *CompositeAggregationDateHistogramValuesSource { + a.missing = missing + return a +} + +// Order specifies the order in the values produced by this source. +// It can be either "asc" or "desc". +func (a *CompositeAggregationDateHistogramValuesSource) Order(order string) *CompositeAggregationDateHistogramValuesSource { + a.order = order + return a +} + +// Asc ensures the order of the values produced is ascending. +func (a *CompositeAggregationDateHistogramValuesSource) Asc() *CompositeAggregationDateHistogramValuesSource { + a.order = "asc" + return a +} + +// Desc ensures the order of the values produced is descending. +func (a *CompositeAggregationDateHistogramValuesSource) Desc() *CompositeAggregationDateHistogramValuesSource { + a.order = "desc" + return a +} + +// Interval to use for the date histogram, e.g. "1d" or a numeric value like "60". +func (a *CompositeAggregationDateHistogramValuesSource) Interval(interval interface{}) *CompositeAggregationDateHistogramValuesSource { + a.interval = interval + return a +} + +// TimeZone to use for the dates. +func (a *CompositeAggregationDateHistogramValuesSource) TimeZone(timeZone string) *CompositeAggregationDateHistogramValuesSource { + a.timeZone = timeZone + return a +} + +// Source returns the serializable JSON for this values source. +func (a *CompositeAggregationDateHistogramValuesSource) Source() (interface{}, error) { + source := make(map[string]interface{}) + name := make(map[string]interface{}) + source[a.name] = name + values := make(map[string]interface{}) + name["date_histogram"] = values + + // field + if a.field != "" { + values["field"] = a.field + } + + // script + if a.script != nil { + src, err := a.script.Source() + if err != nil { + return nil, err + } + values["script"] = src + } + + // missing + if a.missing != nil { + values["missing"] = a.missing + } + + // value_type + if a.valueType != "" { + values["value_type"] = a.valueType + } + + // order + if a.order != "" { + values["order"] = a.order + } + + // DateHistogram-related properties + values["interval"] = a.interval + + // timeZone + if a.timeZone != "" { + values["time_zone"] = a.timeZone + } + + return source, nil +} diff --git a/vendor/gopkg.in/olivere/elastic.v5/search_aggs_bucket_composite_test.go b/vendor/gopkg.in/olivere/elastic.v5/search_aggs_bucket_composite_test.go new file mode 100644 index 000000000..91d84dbdb --- /dev/null +++ b/vendor/gopkg.in/olivere/elastic.v5/search_aggs_bucket_composite_test.go @@ -0,0 +1,92 @@ +// Copyright 2012-present Oliver Eilhard. All rights reserved. +// Use of this source code is governed by a MIT-license. +// See http://olivere.mit-license.org/license.txt for details. + +package elastic + +import ( + "encoding/json" + "testing" +) + +func TestCompositeAggregation(t *testing.T) { + agg := NewCompositeAggregation(). + Sources( + NewCompositeAggregationTermsValuesSource("my_terms").Field("a_term").Missing("N/A").Order("asc"), + NewCompositeAggregationHistogramValuesSource("my_histogram", 5).Field("price").Asc(), + NewCompositeAggregationDateHistogramValuesSource("my_date_histogram", "1d").Field("purchase_date").Desc(), + ). + Size(10). + AggregateAfter(map[string]interface{}{ + "my_terms": "1", + "my_histogram": 2, + "my_date_histogram": "3", + }) + src, err := agg.Source() + if err != nil { + t.Fatal(err) + } + data, err := json.Marshal(src) + if err != nil { + t.Fatalf("marshaling to JSON failed: %v", err) + } + got := string(data) + expected := `{"composite":{"after":{"my_date_histogram":"3","my_histogram":2,"my_terms":"1"},"size":10,"sources":[{"my_terms":{"terms":{"field":"a_term","missing":"N/A","order":"asc"}}},{"my_histogram":{"histogram":{"field":"price","interval":5,"order":"asc"}}},{"my_date_histogram":{"date_histogram":{"field":"purchase_date","interval":"1d","order":"desc"}}}]}}` + if got != expected { + t.Errorf("expected\n%s\n,got:\n%s", expected, got) + } +} + +func TestCompositeAggregationTermsValuesSource(t *testing.T) { + in := NewCompositeAggregationTermsValuesSource("products"). + Script(NewScript("doc['product'].value").Lang("painless")) + src, err := in.Source() + if err != nil { + t.Fatal(err) + } + data, err := json.Marshal(src) + if err != nil { + t.Fatalf("marshaling to JSON failed: %v", err) + } + got := string(data) + expected := `{"products":{"terms":{"script":{"lang":"painless","source":"doc['product'].value"}}}}` + if got != expected { + t.Errorf("expected\n%s\n,got:\n%s", expected, got) + } +} + +func TestCompositeAggregationHistogramValuesSource(t *testing.T) { + in := NewCompositeAggregationHistogramValuesSource("histo", 5). + Field("price") + src, err := in.Source() + if err != nil { + t.Fatal(err) + } + data, err := json.Marshal(src) + if err != nil { + t.Fatalf("marshaling to JSON failed: %v", err) + } + got := string(data) + expected := `{"histo":{"histogram":{"field":"price","interval":5}}}` + if got != expected { + t.Errorf("expected\n%s\n,got:\n%s", expected, got) + } +} + +func TestCompositeAggregationDateHistogramValuesSource(t *testing.T) { + in := NewCompositeAggregationDateHistogramValuesSource("date", "1d"). + Field("timestamp") + src, err := in.Source() + if err != nil { + t.Fatal(err) + } + data, err := json.Marshal(src) + if err != nil { + t.Fatalf("marshaling to JSON failed: %v", err) + } + got := string(data) + expected := `{"date":{"date_histogram":{"field":"timestamp","interval":"1d"}}}` + if got != expected { + t.Errorf("expected\n%s\n,got:\n%s", expected, got) + } +} diff --git a/vendor/gopkg.in/olivere/elastic.v5/search_aggs_bucket_date_range.go b/vendor/gopkg.in/olivere/elastic.v5/search_aggs_bucket_date_range.go index 5407dadb8..714fd3e11 100644 --- a/vendor/gopkg.in/olivere/elastic.v5/search_aggs_bucket_date_range.go +++ b/vendor/gopkg.in/olivere/elastic.v5/search_aggs_bucket_date_range.go @@ -23,6 +23,7 @@ type DateRangeAggregation struct { meta map[string]interface{} keyed *bool unmapped *bool + timeZone string format string entries []DateRangeAggregationEntry } @@ -71,6 +72,11 @@ func (a *DateRangeAggregation) Unmapped(unmapped bool) *DateRangeAggregation { return a } +func (a *DateRangeAggregation) TimeZone(timeZone string) *DateRangeAggregation { + a.timeZone = timeZone + return a +} + func (a *DateRangeAggregation) Format(format string) *DateRangeAggregation { a.format = format return a @@ -178,6 +184,9 @@ func (a *DateRangeAggregation) Source() (interface{}, error) { if a.unmapped != nil { opts["unmapped"] = *a.unmapped } + if a.timeZone != "" { + opts["time_zone"] = a.timeZone + } if a.format != "" { opts["format"] = a.format } diff --git a/vendor/gopkg.in/olivere/elastic.v5/search_aggs_bucket_date_range_test.go b/vendor/gopkg.in/olivere/elastic.v5/search_aggs_bucket_date_range_test.go index d1c909f3e..89ed495f3 100644 --- a/vendor/gopkg.in/olivere/elastic.v5/search_aggs_bucket_date_range_test.go +++ b/vendor/gopkg.in/olivere/elastic.v5/search_aggs_bucket_date_range_test.go @@ -10,7 +10,7 @@ import ( ) func TestDateRangeAggregation(t *testing.T) { - agg := NewDateRangeAggregation().Field("created_at") + agg := NewDateRangeAggregation().Field("created_at").TimeZone("UTC") agg = agg.AddRange(nil, "2012-12-31") agg = agg.AddRange("2013-01-01", "2013-12-31") agg = agg.AddRange("2014-01-01", nil) @@ -23,7 +23,7 @@ func TestDateRangeAggregation(t *testing.T) { t.Fatalf("marshaling to JSON failed: %v", err) } got := string(data) - expected := `{"date_range":{"field":"created_at","ranges":[{"to":"2012-12-31"},{"from":"2013-01-01","to":"2013-12-31"},{"from":"2014-01-01"}]}}` + expected := `{"date_range":{"field":"created_at","ranges":[{"to":"2012-12-31"},{"from":"2013-01-01","to":"2013-12-31"},{"from":"2014-01-01"}],"time_zone":"UTC"}}` if got != expected { t.Errorf("expected\n%s\n,got:\n%s", expected, got) } diff --git a/vendor/gopkg.in/olivere/elastic.v5/search_aggs_test.go b/vendor/gopkg.in/olivere/elastic.v5/search_aggs_test.go index 9d6fa8d27..f1b6347b3 100644 --- a/vendor/gopkg.in/olivere/elastic.v5/search_aggs_test.go +++ b/vendor/gopkg.in/olivere/elastic.v5/search_aggs_test.go @@ -13,13 +13,15 @@ import ( ) func TestAggs(t *testing.T) { - // client := setupTestClientAndCreateIndex(t, SetTraceLog(log.New(os.Stdout, "", log.LstdFlags))) + //client := setupTestClientAndCreateIndex(t, SetTraceLog(log.New(os.Stdout, "", log.LstdFlags))) client := setupTestClientAndCreateIndex(t) - esversion, err := client.ElasticsearchVersion(DefaultURL) - if err != nil { - t.Fatal(err) - } + /* + esversion, err := client.ElasticsearchVersion(DefaultURL) + if err != nil { + t.Fatal(err) + } + */ tweet1 := tweet{ User: "olivere", @@ -48,7 +50,7 @@ func TestAggs(t *testing.T) { } // Add all documents - _, err = client.Index().Index(testIndexName).Type("doc").Id("1").BodyJson(&tweet1).Do(context.TODO()) + _, err := client.Index().Index(testIndexName).Type("doc").Id("1").BodyJson(&tweet1).Do(context.TODO()) if err != nil { t.Fatal(err) } @@ -102,6 +104,11 @@ func TestAggs(t *testing.T) { topTagsAgg := NewTermsAggregation().Field("tags").Size(3).SubAggregation("top_tag_hits", topTagsHitsAgg) geoBoundsAgg := NewGeoBoundsAggregation().Field("location") geoHashAgg := NewGeoHashGridAggregation().Field("location").Precision(5) + composite := NewCompositeAggregation().Sources( + NewCompositeAggregationTermsValuesSource("composite_users").Field("user"), + NewCompositeAggregationHistogramValuesSource("composite_retweets", 1).Field("retweets"), + NewCompositeAggregationDateHistogramValuesSource("composite_created", "1m").Field("created"), + ) // Run query builder := client.Search().Index(testIndexName).Query(all).Pretty(true) @@ -109,9 +116,7 @@ func TestAggs(t *testing.T) { builder = builder.Aggregation("users", usersAgg) builder = builder.Aggregation("retweets", retweetsAgg) builder = builder.Aggregation("avgRetweets", avgRetweetsAgg) - if esversion >= "2.0" { - builder = builder.Aggregation("avgRetweetsWithMeta", avgRetweetsWithMetaAgg) - } + builder = builder.Aggregation("avgRetweetsWithMeta", avgRetweetsWithMetaAgg) builder = builder.Aggregation("minRetweets", minRetweetsAgg) builder = builder.Aggregation("maxRetweets", maxRetweetsAgg) builder = builder.Aggregation("sumRetweets", sumRetweetsAgg) @@ -134,44 +139,41 @@ func TestAggs(t *testing.T) { builder = builder.Aggregation("top-tags", topTagsAgg) builder = builder.Aggregation("viewport", geoBoundsAgg) builder = builder.Aggregation("geohashed", geoHashAgg) - if esversion >= "1.4" { - // Unnamed filters - countByUserAgg := NewFiltersAggregation(). - Filters(NewTermQuery("user", "olivere"), NewTermQuery("user", "sandrae")) - builder = builder.Aggregation("countByUser", countByUserAgg) - // Named filters - countByUserAgg2 := NewFiltersAggregation(). - FilterWithName("olivere", NewTermQuery("user", "olivere")). - FilterWithName("sandrae", NewTermQuery("user", "sandrae")) - builder = builder.Aggregation("countByUser2", countByUserAgg2) - } - if esversion >= "2.0" { - // AvgBucket - dateHisto := NewDateHistogramAggregation().Field("created").Interval("year") - dateHisto = dateHisto.SubAggregation("sumOfRetweets", NewSumAggregation().Field("retweets")) - builder = builder.Aggregation("avgBucketDateHisto", dateHisto) - builder = builder.Aggregation("avgSumOfRetweets", NewAvgBucketAggregation().BucketsPath("avgBucketDateHisto>sumOfRetweets")) - // MinBucket - dateHisto = NewDateHistogramAggregation().Field("created").Interval("year") - dateHisto = dateHisto.SubAggregation("sumOfRetweets", NewSumAggregation().Field("retweets")) - builder = builder.Aggregation("minBucketDateHisto", dateHisto) - builder = builder.Aggregation("minBucketSumOfRetweets", NewMinBucketAggregation().BucketsPath("minBucketDateHisto>sumOfRetweets")) - // MaxBucket - dateHisto = NewDateHistogramAggregation().Field("created").Interval("year") - dateHisto = dateHisto.SubAggregation("sumOfRetweets", NewSumAggregation().Field("retweets")) - builder = builder.Aggregation("maxBucketDateHisto", dateHisto) - builder = builder.Aggregation("maxBucketSumOfRetweets", NewMaxBucketAggregation().BucketsPath("maxBucketDateHisto>sumOfRetweets")) - // SumBucket - dateHisto = NewDateHistogramAggregation().Field("created").Interval("year") - dateHisto = dateHisto.SubAggregation("sumOfRetweets", NewSumAggregation().Field("retweets")) - builder = builder.Aggregation("sumBucketDateHisto", dateHisto) - builder = builder.Aggregation("sumBucketSumOfRetweets", NewSumBucketAggregation().BucketsPath("sumBucketDateHisto>sumOfRetweets")) - // MovAvg - dateHisto = NewDateHistogramAggregation().Field("created").Interval("year") - dateHisto = dateHisto.SubAggregation("sumOfRetweets", NewSumAggregation().Field("retweets")) - dateHisto = dateHisto.SubAggregation("movingAvg", NewMovAvgAggregation().BucketsPath("sumOfRetweets")) - builder = builder.Aggregation("movingAvgDateHisto", dateHisto) - } + // Unnamed filters + countByUserAgg := NewFiltersAggregation(). + Filters(NewTermQuery("user", "olivere"), NewTermQuery("user", "sandrae")) + builder = builder.Aggregation("countByUser", countByUserAgg) + // Named filters + countByUserAgg2 := NewFiltersAggregation(). + FilterWithName("olivere", NewTermQuery("user", "olivere")). + FilterWithName("sandrae", NewTermQuery("user", "sandrae")) + builder = builder.Aggregation("countByUser2", countByUserAgg2) + // AvgBucket + dateHisto := NewDateHistogramAggregation().Field("created").Interval("year") + dateHisto = dateHisto.SubAggregation("sumOfRetweets", NewSumAggregation().Field("retweets")) + builder = builder.Aggregation("avgBucketDateHisto", dateHisto) + builder = builder.Aggregation("avgSumOfRetweets", NewAvgBucketAggregation().BucketsPath("avgBucketDateHisto>sumOfRetweets")) + // MinBucket + dateHisto = NewDateHistogramAggregation().Field("created").Interval("year") + dateHisto = dateHisto.SubAggregation("sumOfRetweets", NewSumAggregation().Field("retweets")) + builder = builder.Aggregation("minBucketDateHisto", dateHisto) + builder = builder.Aggregation("minBucketSumOfRetweets", NewMinBucketAggregation().BucketsPath("minBucketDateHisto>sumOfRetweets")) + // MaxBucket + dateHisto = NewDateHistogramAggregation().Field("created").Interval("year") + dateHisto = dateHisto.SubAggregation("sumOfRetweets", NewSumAggregation().Field("retweets")) + builder = builder.Aggregation("maxBucketDateHisto", dateHisto) + builder = builder.Aggregation("maxBucketSumOfRetweets", NewMaxBucketAggregation().BucketsPath("maxBucketDateHisto>sumOfRetweets")) + // SumBucket + dateHisto = NewDateHistogramAggregation().Field("created").Interval("year") + dateHisto = dateHisto.SubAggregation("sumOfRetweets", NewSumAggregation().Field("retweets")) + builder = builder.Aggregation("sumBucketDateHisto", dateHisto) + builder = builder.Aggregation("sumBucketSumOfRetweets", NewSumBucketAggregation().BucketsPath("sumBucketDateHisto>sumOfRetweets")) + // MovAvg + dateHisto = NewDateHistogramAggregation().Field("created").Interval("year") + dateHisto = dateHisto.SubAggregation("sumOfRetweets", NewSumAggregation().Field("retweets")) + dateHisto = dateHisto.SubAggregation("movingAvg", NewMovAvgAggregation().BucketsPath("sumOfRetweets")) + builder = builder.Aggregation("movingAvgDateHisto", dateHisto) + builder = builder.Aggregation("composite", composite) searchResult, err := builder.Do(context.TODO()) if err != nil { t.Fatal(err) @@ -308,26 +310,24 @@ func TestAggs(t *testing.T) { } // avgRetweetsWithMeta - if esversion >= "2.0" { - avgMetaAggRes, found := agg.Avg("avgRetweetsWithMeta") - if !found { - t.Errorf("expected %v; got: %v", true, found) - } - if avgMetaAggRes == nil { - t.Fatalf("expected != nil; got: nil") - } - if avgMetaAggRes.Meta == nil { - t.Fatalf("expected != nil; got: %v", avgMetaAggRes.Meta) - } - metaDataValue, found := avgMetaAggRes.Meta["meta"] - if !found { - t.Fatalf("expected to return meta data key %q; got: %v", "meta", found) - } - if flag, ok := metaDataValue.(bool); !ok { - t.Fatalf("expected to return meta data key type %T; got: %T", true, metaDataValue) - } else if flag != true { - t.Fatalf("expected to return meta data key value %v; got: %v", true, flag) - } + avgMetaAggRes, found := agg.Avg("avgRetweetsWithMeta") + if !found { + t.Errorf("expected %v; got: %v", true, found) + } + if avgMetaAggRes == nil { + t.Fatalf("expected != nil; got: nil") + } + if avgMetaAggRes.Meta == nil { + t.Fatalf("expected != nil; got: %v", avgMetaAggRes.Meta) + } + metaDataValue, found := avgMetaAggRes.Meta["meta"] + if !found { + t.Fatalf("expected to return meta data key %q; got: %v", "meta", found) + } + if flag, ok := metaDataValue.(bool); !ok { + t.Fatalf("expected to return meta data key type %T; got: %T", true, metaDataValue) + } else if flag != true { + t.Fatalf("expected to return meta data key value %v; got: %v", true, flag) } // minRetweets @@ -817,13 +817,11 @@ func TestAggs(t *testing.T) { if topTags == nil { t.Fatalf("expected != nil; got: nil") } - if esversion >= "1.4.0" { - if topTags.DocCountErrorUpperBound != 0 { - t.Errorf("expected %v; got: %v", 0, topTags.DocCountErrorUpperBound) - } - if topTags.SumOfOtherDocCount != 1 { - t.Errorf("expected %v; got: %v", 1, topTags.SumOfOtherDocCount) - } + if topTags.DocCountErrorUpperBound != 0 { + t.Errorf("expected %v; got: %v", 0, topTags.DocCountErrorUpperBound) + } + if topTags.SumOfOtherDocCount != 1 { + t.Errorf("expected %v; got: %v", 1, topTags.SumOfOtherDocCount) } if len(topTags.Buckets) != 3 { t.Fatalf("expected %d; got: %d", 3, len(topTags.Buckets)) @@ -924,62 +922,71 @@ func TestAggs(t *testing.T) { t.Fatalf("expected != nil; got: nil") } - if esversion >= "1.4" { - // Filters agg "countByUser" (unnamed) - countByUserAggRes, found := agg.Filters("countByUser") - if !found { - t.Errorf("expected %v; got: %v", true, found) - } - if countByUserAggRes == nil { - t.Fatalf("expected != nil; got: nil") - } - if len(countByUserAggRes.Buckets) != 2 { - t.Fatalf("expected %d; got: %d", 2, len(countByUserAggRes.Buckets)) - } - if len(countByUserAggRes.NamedBuckets) != 0 { - t.Fatalf("expected %d; got: %d", 0, len(countByUserAggRes.NamedBuckets)) - } - if countByUserAggRes.Buckets[0].DocCount != 2 { - t.Errorf("expected %d; got: %d", 2, countByUserAggRes.Buckets[0].DocCount) - } - if countByUserAggRes.Buckets[1].DocCount != 1 { - t.Errorf("expected %d; got: %d", 1, countByUserAggRes.Buckets[1].DocCount) - } + // Filters agg "countByUser" (unnamed) + countByUserAggRes, found := agg.Filters("countByUser") + if !found { + t.Errorf("expected %v; got: %v", true, found) + } + if countByUserAggRes == nil { + t.Fatalf("expected != nil; got: nil") + } + if len(countByUserAggRes.Buckets) != 2 { + t.Fatalf("expected %d; got: %d", 2, len(countByUserAggRes.Buckets)) + } + if len(countByUserAggRes.NamedBuckets) != 0 { + t.Fatalf("expected %d; got: %d", 0, len(countByUserAggRes.NamedBuckets)) + } + if countByUserAggRes.Buckets[0].DocCount != 2 { + t.Errorf("expected %d; got: %d", 2, countByUserAggRes.Buckets[0].DocCount) + } + if countByUserAggRes.Buckets[1].DocCount != 1 { + t.Errorf("expected %d; got: %d", 1, countByUserAggRes.Buckets[1].DocCount) + } - // Filters agg "countByUser2" (named) - countByUser2AggRes, found := agg.Filters("countByUser2") - if !found { - t.Errorf("expected %v; got: %v", true, found) - } - if countByUser2AggRes == nil { - t.Fatalf("expected != nil; got: nil") - } - if len(countByUser2AggRes.Buckets) != 0 { - t.Fatalf("expected %d; got: %d", 0, len(countByUser2AggRes.Buckets)) - } - if len(countByUser2AggRes.NamedBuckets) != 2 { - t.Fatalf("expected %d; got: %d", 2, len(countByUser2AggRes.NamedBuckets)) - } - b, found := countByUser2AggRes.NamedBuckets["olivere"] - if !found { - t.Fatalf("expected bucket %q; got: %v", "olivere", found) - } - if b == nil { - t.Fatalf("expected bucket %q; got: %v", "olivere", b) - } - if b.DocCount != 2 { - t.Errorf("expected %d; got: %d", 2, b.DocCount) - } - b, found = countByUser2AggRes.NamedBuckets["sandrae"] - if !found { - t.Fatalf("expected bucket %q; got: %v", "sandrae", found) - } - if b == nil { - t.Fatalf("expected bucket %q; got: %v", "sandrae", b) - } - if b.DocCount != 1 { - t.Errorf("expected %d; got: %d", 1, b.DocCount) - } + // Filters agg "countByUser2" (named) + countByUser2AggRes, found := agg.Filters("countByUser2") + if !found { + t.Errorf("expected %v; got: %v", true, found) + } + if countByUser2AggRes == nil { + t.Fatalf("expected != nil; got: nil") + } + if len(countByUser2AggRes.Buckets) != 0 { + t.Fatalf("expected %d; got: %d", 0, len(countByUser2AggRes.Buckets)) + } + if len(countByUser2AggRes.NamedBuckets) != 2 { + t.Fatalf("expected %d; got: %d", 2, len(countByUser2AggRes.NamedBuckets)) + } + b, found := countByUser2AggRes.NamedBuckets["olivere"] + if !found { + t.Fatalf("expected bucket %q; got: %v", "olivere", found) + } + if b == nil { + t.Fatalf("expected bucket %q; got: %v", "olivere", b) + } + if b.DocCount != 2 { + t.Errorf("expected %d; got: %d", 2, b.DocCount) + } + b, found = countByUser2AggRes.NamedBuckets["sandrae"] + if !found { + t.Fatalf("expected bucket %q; got: %v", "sandrae", found) + } + if b == nil { + t.Fatalf("expected bucket %q; got: %v", "sandrae", b) + } + if b.DocCount != 1 { + t.Errorf("expected %d; got: %d", 1, b.DocCount) + } + + compositeAggRes, found := agg.Composite("composite") + if !found { + t.Errorf("expected %v; got: %v", true, found) + } + if compositeAggRes == nil { + t.Fatalf("expected != nil; got: nil") + } + if want, have := 3, len(compositeAggRes.Buckets); want != have { + t.Fatalf("expected %d; got: %d", want, have) } } @@ -3231,3 +3238,179 @@ func TestAggsPipelineSerialDiff(t *testing.T) { t.Fatalf("expected aggregation value = %v; got: %v", float64(20), *agg.Value) } } + +func TestAggsComposite(t *testing.T) { + s := `{ + "the_composite" : { + "buckets" : [ + { + "key" : { + "composite_users" : "olivere", + "composite_retweets" : 0.0, + "composite_created" : 1349856720000 + }, + "doc_count" : 1 + }, + { + "key" : { + "composite_users" : "olivere", + "composite_retweets" : 108.0, + "composite_created" : 1355333880000 + }, + "doc_count" : 1 + }, + { + "key" : { + "composite_users" : "sandrae", + "composite_retweets" : 12.0, + "composite_created" : 1321009080000 + }, + "doc_count" : 1 + } + ] + } + }` + + aggs := new(Aggregations) + err := json.Unmarshal([]byte(s), &aggs) + if err != nil { + t.Fatalf("expected no error decoding; got: %v", err) + } + + agg, found := aggs.Composite("the_composite") + if !found { + t.Fatalf("expected aggregation to be found; got: %v", found) + } + if agg == nil { + t.Fatalf("expected aggregation != nil; got: %v", agg) + } + if want, have := 3, len(agg.Buckets); want != have { + t.Fatalf("expected aggregation buckets length = %v; got: %v", want, have) + } + + // 1st bucket + bucket := agg.Buckets[0] + if want, have := int64(1), bucket.DocCount; want != have { + t.Fatalf("expected aggregation bucket doc count = %v; got: %v", want, have) + } + if want, have := 3, len(bucket.Key); want != have { + t.Fatalf("expected aggregation bucket key length = %v; got: %v", want, have) + } + v, found := bucket.Key["composite_users"] + if !found { + t.Fatalf("expected to find bucket key %q", "composite_users") + } + s, ok := v.(string) + if !ok { + t.Fatalf("expected to have bucket key of type string; got: %T", v) + } + if want, have := "olivere", s; want != have { + t.Fatalf("expected to find bucket key value %q; got: %q", want, have) + } + v, found = bucket.Key["composite_retweets"] + if !found { + t.Fatalf("expected to find bucket key %q", "composite_retweets") + } + f, ok := v.(float64) + if !ok { + t.Fatalf("expected to have bucket key of type string; got: %T", v) + } + if want, have := 0.0, f; want != have { + t.Fatalf("expected to find bucket key value %v; got: %v", want, have) + } + v, found = bucket.Key["composite_created"] + if !found { + t.Fatalf("expected to find bucket key %q", "composite_created") + } + f, ok = v.(float64) + if !ok { + t.Fatalf("expected to have bucket key of type string; got: %T", v) + } + if want, have := 1349856720000.0, f; want != have { + t.Fatalf("expected to find bucket key value %v; got: %v", want, have) + } + + // 2nd bucket + bucket = agg.Buckets[1] + if want, have := int64(1), bucket.DocCount; want != have { + t.Fatalf("expected aggregation bucket doc count = %v; got: %v", want, have) + } + if want, have := 3, len(bucket.Key); want != have { + t.Fatalf("expected aggregation bucket key length = %v; got: %v", want, have) + } + v, found = bucket.Key["composite_users"] + if !found { + t.Fatalf("expected to find bucket key %q", "composite_users") + } + s, ok = v.(string) + if !ok { + t.Fatalf("expected to have bucket key of type string; got: %T", v) + } + if want, have := "olivere", s; want != have { + t.Fatalf("expected to find bucket key value %q; got: %q", want, have) + } + v, found = bucket.Key["composite_retweets"] + if !found { + t.Fatalf("expected to find bucket key %q", "composite_retweets") + } + f, ok = v.(float64) + if !ok { + t.Fatalf("expected to have bucket key of type string; got: %T", v) + } + if want, have := 108.0, f; want != have { + t.Fatalf("expected to find bucket key value %v; got: %v", want, have) + } + v, found = bucket.Key["composite_created"] + if !found { + t.Fatalf("expected to find bucket key %q", "composite_created") + } + f, ok = v.(float64) + if !ok { + t.Fatalf("expected to have bucket key of type string; got: %T", v) + } + if want, have := 1355333880000.0, f; want != have { + t.Fatalf("expected to find bucket key value %v; got: %v", want, have) + } + + // 3rd bucket + bucket = agg.Buckets[2] + if want, have := int64(1), bucket.DocCount; want != have { + t.Fatalf("expected aggregation bucket doc count = %v; got: %v", want, have) + } + if want, have := 3, len(bucket.Key); want != have { + t.Fatalf("expected aggregation bucket key length = %v; got: %v", want, have) + } + v, found = bucket.Key["composite_users"] + if !found { + t.Fatalf("expected to find bucket key %q", "composite_users") + } + s, ok = v.(string) + if !ok { + t.Fatalf("expected to have bucket key of type string; got: %T", v) + } + if want, have := "sandrae", s; want != have { + t.Fatalf("expected to find bucket key value %q; got: %q", want, have) + } + v, found = bucket.Key["composite_retweets"] + if !found { + t.Fatalf("expected to find bucket key %q", "composite_retweets") + } + f, ok = v.(float64) + if !ok { + t.Fatalf("expected to have bucket key of type string; got: %T", v) + } + if want, have := 12.0, f; want != have { + t.Fatalf("expected to find bucket key value %v; got: %v", want, have) + } + v, found = bucket.Key["composite_created"] + if !found { + t.Fatalf("expected to find bucket key %q", "composite_created") + } + f, ok = v.(float64) + if !ok { + t.Fatalf("expected to have bucket key of type string; got: %T", v) + } + if want, have := 1321009080000.0, f; want != have { + t.Fatalf("expected to find bucket key value %v; got: %v", want, have) + } +} diff --git a/vendor/gopkg.in/olivere/elastic.v5/search_queries_terms_set.go b/vendor/gopkg.in/olivere/elastic.v5/search_queries_terms_set.go new file mode 100644 index 000000000..be410a1a7 --- /dev/null +++ b/vendor/gopkg.in/olivere/elastic.v5/search_queries_terms_set.go @@ -0,0 +1,96 @@ +// Copyright 2012-present Oliver Eilhard. All rights reserved. +// Use of this source code is governed by a MIT-license. +// See http://olivere.mit-license.org/license.txt for details. + +package elastic + +// TermsSetQuery returns any documents that match with at least +// one or more of the provided terms. The terms are not analyzed +// and thus must match exactly. The number of terms that must +// match varies per document and is either controlled by a +// minimum should match field or computed per document in a +// minimum should match script. +// +// For more details, see +// https://www.elastic.co/guide/en/elasticsearch/reference/6.1/query-dsl-terms-set-query.html +type TermsSetQuery struct { + name string + values []interface{} + minimumShouldMatchField string + minimumShouldMatchScript *Script + queryName string + boost *float64 +} + +// NewTermsSetQuery creates and initializes a new TermsSetQuery. +func NewTermsSetQuery(name string, values ...interface{}) *TermsSetQuery { + q := &TermsSetQuery{ + name: name, + } + if len(values) > 0 { + q.values = append(q.values, values...) + } + return q +} + +// MinimumShouldMatchField specifies the field to match. +func (q *TermsSetQuery) MinimumShouldMatchField(minimumShouldMatchField string) *TermsSetQuery { + q.minimumShouldMatchField = minimumShouldMatchField + return q +} + +// MinimumShouldMatchScript specifies the script to match. +func (q *TermsSetQuery) MinimumShouldMatchScript(minimumShouldMatchScript *Script) *TermsSetQuery { + q.minimumShouldMatchScript = minimumShouldMatchScript + return q +} + +// Boost sets the boost for this query. +func (q *TermsSetQuery) Boost(boost float64) *TermsSetQuery { + q.boost = &boost + return q +} + +// QueryName sets the query name for the filter that can be used +// when searching for matched_filters per hit +func (q *TermsSetQuery) QueryName(queryName string) *TermsSetQuery { + q.queryName = queryName + return q +} + +// Source creates the query source for the term query. +func (q *TermsSetQuery) Source() (interface{}, error) { + // {"terms_set":{"codes":{"terms":["abc","def"],"minimum_should_match_field":"required_matches"}}} + source := make(map[string]interface{}) + inner := make(map[string]interface{}) + params := make(map[string]interface{}) + inner[q.name] = params + source["terms_set"] = inner + + // terms + params["terms"] = q.values + + // minimum_should_match_field + if match := q.minimumShouldMatchField; match != "" { + params["minimum_should_match_field"] = match + } + + // minimum_should_match_script + if match := q.minimumShouldMatchScript; match != nil { + src, err := match.Source() + if err != nil { + return nil, err + } + params["minimum_should_match_script"] = src + } + + // Common parameters for all queries + if q.boost != nil { + params["boost"] = *q.boost + } + if q.queryName != "" { + params["_name"] = q.queryName + } + + return source, nil +} diff --git a/vendor/gopkg.in/olivere/elastic.v5/search_queries_terms_set_test.go b/vendor/gopkg.in/olivere/elastic.v5/search_queries_terms_set_test.go new file mode 100644 index 000000000..e13fbfb2f --- /dev/null +++ b/vendor/gopkg.in/olivere/elastic.v5/search_queries_terms_set_test.go @@ -0,0 +1,75 @@ +// Copyright 2012-present Oliver Eilhard. All rights reserved. +// Use of this source code is governed by a MIT-license. +// See http://olivere.mit-license.org/license.txt for details. + +package elastic + +import ( + "context" + "encoding/json" + "testing" +) + +func TestTermsSetQueryWithField(t *testing.T) { + q := NewTermsSetQuery("codes", "abc", "def", "ghi").MinimumShouldMatchField("required_matches") + src, err := q.Source() + if err != nil { + t.Fatal(err) + } + data, err := json.Marshal(src) + if err != nil { + t.Fatalf("marshaling to JSON failed: %v", err) + } + got := string(data) + expected := `{"terms_set":{"codes":{"minimum_should_match_field":"required_matches","terms":["abc","def","ghi"]}}}` + if got != expected { + t.Errorf("expected\n%s\n,got:\n%s", expected, got) + } +} + +func TestTermsSetQueryWithScript(t *testing.T) { + q := NewTermsSetQuery("codes", "abc", "def", "ghi"). + MinimumShouldMatchScript( + NewScript(`Math.min(params.num_terms, doc['required_matches'].value)`), + ) + src, err := q.Source() + if err != nil { + t.Fatal(err) + } + data, err := json.Marshal(src) + if err != nil { + t.Fatalf("marshaling to JSON failed: %v", err) + } + got := string(data) + expected := `{"terms_set":{"codes":{"minimum_should_match_script":{"source":"Math.min(params.num_terms, doc['required_matches'].value)"},"terms":["abc","def","ghi"]}}}` + if got != expected { + t.Errorf("expected\n%s\n,got:\n%s", expected, got) + } +} + +func TestSearchTermsSetQuery(t *testing.T) { + //client := setupTestClientAndCreateIndexAndAddDocs(t, SetTraceLog(log.New(os.Stdout, "", log.LstdFlags))) + client := setupTestClientAndCreateIndexAndAddDocs(t) + + // Match all should return all documents + searchResult, err := client.Search(). + Index(testIndexName). + Query( + NewTermsSetQuery("user", "olivere", "sandrae"). + MinimumShouldMatchField("retweets"), + ). + Pretty(true). + Do(context.TODO()) + if err != nil { + t.Fatal(err) + } + if searchResult.Hits == nil { + t.Errorf("expected SearchResult.Hits != nil; got nil") + } + if got, want := searchResult.Hits.TotalHits, int64(3); got != want { + t.Errorf("expected SearchResult.Hits.TotalHits = %d; got %d", want, got) + } + if got, want := len(searchResult.Hits.Hits), 3; got != want { + t.Errorf("expected len(SearchResult.Hits.Hits) = %d; got %d", want, got) + } +} diff --git a/vendor/gopkg.in/olivere/elastic.v5/search_request.go b/vendor/gopkg.in/olivere/elastic.v5/search_request.go index 6f40ff028..7ee4ce82c 100644 --- a/vendor/gopkg.in/olivere/elastic.v5/search_request.go +++ b/vendor/gopkg.in/olivere/elastic.v5/search_request.go @@ -4,7 +4,10 @@ package elastic -import "strings" +import ( + "encoding/json" + "strings" +) // SearchRequest combines a search request and its // query details (see SearchSource). @@ -130,17 +133,7 @@ func (r *SearchRequest) SearchSource(searchSource *SearchSource) *SearchRequest } func (r *SearchRequest) Source(source interface{}) *SearchRequest { - switch v := source.(type) { - case *SearchSource: - src, err := v.Source() - if err != nil { - // Do not do anything in case of an error - return r - } - r.source = src - default: - r.source = source - } + r.source = source return r } @@ -200,6 +193,34 @@ func (r *SearchRequest) header() interface{} { // Body is used e.g. by MultiSearch to get information about the search body // of one SearchRequest. // See https://www.elastic.co/guide/en/elasticsearch/reference/6.0/search-multi-search.html -func (r *SearchRequest) Body() interface{} { - return r.source +func (r *SearchRequest) Body() (string, error) { + switch t := r.source.(type) { + default: + body, err := json.Marshal(r.source) + if err != nil { + return "", err + } + return string(body), nil + case *SearchSource: + src, err := t.Source() + if err != nil { + return "", err + } + body, err := json.Marshal(src) + if err != nil { + return "", err + } + return string(body), nil + case json.RawMessage: + return string(t), nil + case *json.RawMessage: + return string(*t), nil + case string: + return t, nil + case *string: + if t != nil { + return *t, nil + } + return "{}", nil + } } diff --git a/vendor/gopkg.in/olivere/elastic.v5/search_test.go b/vendor/gopkg.in/olivere/elastic.v5/search_test.go index 097c26525..586089aaa 100644 --- a/vendor/gopkg.in/olivere/elastic.v5/search_test.go +++ b/vendor/gopkg.in/olivere/elastic.v5/search_test.go @@ -607,6 +607,61 @@ func TestSearchSource(t *testing.T) { } } +func TestSearchSourceWithString(t *testing.T) { + client := setupTestClientAndCreateIndex(t) + + tweet1 := tweet{ + User: "olivere", Retweets: 108, + Message: "Welcome to Golang and Elasticsearch.", + Created: time.Date(2012, 12, 12, 17, 38, 34, 0, time.UTC), + } + tweet2 := tweet{ + User: "olivere", Retweets: 0, + Message: "Another unrelated topic.", + Created: time.Date(2012, 10, 10, 8, 12, 03, 0, time.UTC), + } + tweet3 := tweet{ + User: "sandrae", Retweets: 12, + Message: "Cycling is fun.", + Created: time.Date(2011, 11, 11, 10, 58, 12, 0, time.UTC), + } + + // Add all documents + _, err := client.Index().Index(testIndexName).Type("doc").Id("1").BodyJson(&tweet1).Do(context.TODO()) + if err != nil { + t.Fatal(err) + } + + _, err = client.Index().Index(testIndexName).Type("doc").Id("2").BodyJson(&tweet2).Do(context.TODO()) + if err != nil { + t.Fatal(err) + } + + _, err = client.Index().Index(testIndexName).Type("doc").Id("3").BodyJson(&tweet3).Do(context.TODO()) + if err != nil { + t.Fatal(err) + } + + _, err = client.Flush().Index(testIndexName).Do(context.TODO()) + if err != nil { + t.Fatal(err) + } + + searchResult, err := client.Search(). + Index(testIndexName). + Source(`{"query":{"match_all":{}}}`). // sets the JSON request + Do(context.TODO()) + if err != nil { + t.Fatal(err) + } + if searchResult.Hits == nil { + t.Errorf("expected SearchResult.Hits != nil; got nil") + } + if searchResult.Hits.TotalHits != 3 { + t.Errorf("expected SearchResult.Hits.TotalHits = %d; got %d", 3, searchResult.Hits.TotalHits) + } +} + func TestSearchRawString(t *testing.T) { // client := setupTestClientAndCreateIndexAndLog(t, SetTraceLog(log.New(os.Stdout, "", 0))) client := setupTestClientAndCreateIndex(t) -- cgit v1.2.3-1-g7c22