Diffstat (limited to 'vendor/gopkg.in/olivere/elastic.v5')
-rw-r--r--  vendor/gopkg.in/olivere/elastic.v5/.travis.yml                            |   2
-rw-r--r--  vendor/gopkg.in/olivere/elastic.v5/CONTRIBUTORS                           |  11
-rw-r--r--  vendor/gopkg.in/olivere/elastic.v5/README.md                              |  16
-rw-r--r--  vendor/gopkg.in/olivere/elastic.v5/bulk_processor.go                      |  61
-rw-r--r--  vendor/gopkg.in/olivere/elastic.v5/client.go                              |   8
-rw-r--r--  vendor/gopkg.in/olivere/elastic.v5/errors.go                              |   8
-rw-r--r--  vendor/gopkg.in/olivere/elastic.v5/msearch.go                             |  39
-rw-r--r--  vendor/gopkg.in/olivere/elastic.v5/msearch_test.go                        | 105
-rw-r--r--  vendor/gopkg.in/olivere/elastic.v5/recipes/bulk_processor/main.go         | 149
-rw-r--r--  vendor/gopkg.in/olivere/elastic.v5/reindex.go                             |  10
-rwxr-xr-x  vendor/gopkg.in/olivere/elastic.v5/run-es.sh                              |   4
-rw-r--r--  vendor/gopkg.in/olivere/elastic.v5/search_aggs.go                         |  70
-rw-r--r--  vendor/gopkg.in/olivere/elastic.v5/search_aggs_bucket_composite.go        | 498
-rw-r--r--  vendor/gopkg.in/olivere/elastic.v5/search_aggs_bucket_composite_test.go   |  92
-rw-r--r--  vendor/gopkg.in/olivere/elastic.v5/search_aggs_bucket_date_range.go       |   9
-rw-r--r--  vendor/gopkg.in/olivere/elastic.v5/search_aggs_bucket_date_range_test.go  |   4
-rw-r--r--  vendor/gopkg.in/olivere/elastic.v5/search_aggs_test.go                    | 441
-rw-r--r--  vendor/gopkg.in/olivere/elastic.v5/search_queries_terms_set.go            |  96
-rw-r--r--  vendor/gopkg.in/olivere/elastic.v5/search_queries_terms_set_test.go       |  75
-rw-r--r--  vendor/gopkg.in/olivere/elastic.v5/search_request.go                      |  49
-rw-r--r--  vendor/gopkg.in/olivere/elastic.v5/search_test.go                         |  55
21 files changed, 1621 insertions(+), 181 deletions(-)
diff --git a/vendor/gopkg.in/olivere/elastic.v5/.travis.yml b/vendor/gopkg.in/olivere/elastic.v5/.travis.yml
index b4322c13c..9658f873a 100644
--- a/vendor/gopkg.in/olivere/elastic.v5/.travis.yml
+++ b/vendor/gopkg.in/olivere/elastic.v5/.travis.yml
@@ -12,4 +12,4 @@ services:
- docker
before_install:
- sudo sysctl -w vm.max_map_count=262144
- - docker run -d --rm -p 9200:9200 -e "http.host=0.0.0.0" -e "transport.host=127.0.0.1" -e "bootstrap.memory_lock=true" -e "ES_JAVA_OPTS=-Xms1g -Xmx1g" docker.elastic.co/elasticsearch/elasticsearch:6.1.2 elasticsearch -Expack.security.enabled=false -Enetwork.host=_local_,_site_ -Enetwork.publish_host=_local_
+ - docker run -d --rm -p 9200:9200 -e "http.host=0.0.0.0" -e "transport.host=127.0.0.1" -e "bootstrap.memory_lock=true" -e "ES_JAVA_OPTS=-Xms1g -Xmx1g" docker.elastic.co/elasticsearch/elasticsearch-oss:6.2.1 elasticsearch -Enetwork.host=_local_,_site_ -Enetwork.publish_host=_local_
diff --git a/vendor/gopkg.in/olivere/elastic.v5/CONTRIBUTORS b/vendor/gopkg.in/olivere/elastic.v5/CONTRIBUTORS
index d7f7f780f..ba06dac29 100644
--- a/vendor/gopkg.in/olivere/elastic.v5/CONTRIBUTORS
+++ b/vendor/gopkg.in/olivere/elastic.v5/CONTRIBUTORS
@@ -68,9 +68,11 @@ Joe Buck [@four2five](https://github.com/four2five)
John Barker [@j16r](https://github.com/j16r)
John Goodall [@jgoodall](https://github.com/jgoodall)
John Stanford [@jxstanford](https://github.com/jxstanford)
+Jonas Groenaas Drange [@semafor](https://github.com/semafor)
Josh Chorlton [@jchorl](https://github.com/jchorl)
jun [@coseyo](https://github.com/coseyo)
Junpei Tsuji [@jun06t](https://github.com/jun06t)
+kartlee [@kartlee](https://github.com/kartlee)
Keith Hatton [@khatton-ft](https://github.com/khatton-ft)
kel [@liketic](https://github.com/liketic)
Kenta SUZUKI [@suzuken](https://github.com/suzuken)
@@ -98,10 +100,13 @@ Orne Brocaar [@brocaar](https://github.com/brocaar)
Paul [@eyeamera](https://github.com/eyeamera)
Pete C [@peteclark-ft](https://github.com/peteclark-ft)
Radoslaw Wesolowski [r--w](https://github.com/r--w)
+Roman Colohanin [@zuzmic](https://github.com/zuzmic)
Ryan Schmukler [@rschmukler](https://github.com/rschmukler)
+Ryan Wynn [@rwynn](https://github.com/rwynn)
Sacheendra talluri [@sacheendra](https://github.com/sacheendra)
Sean DuBois [@Sean-Der](https://github.com/Sean-Der)
Shalin LK [@shalinlk](https://github.com/shalinlk)
+singham [@zhaochenxiao90](https://github.com/zhaochenxiao90)
Stephen Kubovic [@stephenkubovic](https://github.com/stephenkubovic)
Stuart Warren [@Woz](https://github.com/stuart-warren)
Sulaiman [@salajlan](https://github.com/salajlan)
@@ -111,13 +116,13 @@ Take [ww24](https://github.com/ww24)
Tetsuya Morimoto [@t2y](https://github.com/t2y)
TimeEmit [@TimeEmit](https://github.com/timeemit)
TusharM [@tusharm](https://github.com/tusharm)
-zhangxin [@visaxin](https://github.com/visaxin)
wangtuo [@wangtuo](https://github.com/wangtuo)
Wédney Yuri [@wedneyyuri](https://github.com/wedneyyuri)
wolfkdy [@wolfkdy](https://github.com/wolfkdy)
Wyndham Blanton [@wyndhblb](https://github.com/wyndhblb)
Yarden Bar [@ayashjorden](https://github.com/ayashjorden)
zakthomas [@zakthomas](https://github.com/zakthomas)
-singham [@zhaochenxiao90](https://github.com/zhaochenxiao90)
+Yuya Kusakabe [@higebu](https://github.com/higebu)
+Zach [@snowzach](https://github.com/snowzach)
+zhangxin [@visaxin](https://github.com/visaxin)
@林 [@zplzpl](https://github.com/zplzpl)
-Roman Colohanin [@zuzmic](https://github.com/zuzmic)
diff --git a/vendor/gopkg.in/olivere/elastic.v5/README.md b/vendor/gopkg.in/olivere/elastic.v5/README.md
index f452b664d..d0cdd7821 100644
--- a/vendor/gopkg.in/olivere/elastic.v5/README.md
+++ b/vendor/gopkg.in/olivere/elastic.v5/README.md
@@ -199,6 +199,7 @@ See the [wiki](https://github.com/olivere/elastic/wiki) for more details.
- [x] Significant Terms
- [x] Significant Text
- [x] Terms
+ - [x] Composite
- Pipeline Aggregations
- [x] Avg Bucket
- [x] Derivative
@@ -212,6 +213,7 @@ See the [wiki](https://github.com/olivere/elastic/wiki) for more details.
- [x] Cumulative Sum
- [x] Bucket Script
- [x] Bucket Selector
+ - [ ] Bucket Sort
- [x] Serial Differencing
- [x] Matrix Aggregations
- [x] Matrix Stats
@@ -234,17 +236,17 @@ See the [wiki](https://github.com/olivere/elastic/wiki) for more details.
- [x] Update Indices Settings
- [x] Get Settings
- [x] Analyze
+ - [x] Explain Analyze
- [x] Index Templates
-- [ ] Shadow Replica Indices
- [x] Indices Stats
- [x] Indices Segments
- [ ] Indices Recovery
- [ ] Indices Shard Stores
- [ ] Clear Cache
- [x] Flush
+ - [x] Synced Flush
- [x] Refresh
- [x] Force Merge
-- [ ] Upgrade
### cat APIs
@@ -267,6 +269,7 @@ The cat APIs are not implemented as of now. We think they are better suited for
- [ ] cat shards
- [ ] cat segments
- [ ] cat snapshots
+- [ ] cat templates
### Cluster APIs
@@ -278,6 +281,8 @@ The cat APIs are not implemented as of now. We think they are better suited for
- [ ] Cluster Update Settings
- [x] Nodes Stats
- [x] Nodes Info
+- [ ] Nodes Feature Usage
+- [ ] Remote Cluster Info
- [x] Task Management API
- [ ] Nodes hot_threads
- [ ] Cluster Allocation Explain API
@@ -297,6 +302,7 @@ The cat APIs are not implemented as of now. We think they are better suited for
- Term level queries
- [x] Term Query
- [x] Terms Query
+ - [x] Terms Set Query
- [x] Range Query
- [x] Exists Query
- [x] Prefix Query
@@ -311,7 +317,6 @@ The cat APIs are not implemented as of now. We think they are better suited for
- [x] Dis Max Query
- [x] Function Score Query
- [x] Boosting Query
- - [x] Indices Query
- Joining queries
- [x] Nested Query
- [x] Has Child Query
@@ -321,12 +326,9 @@ The cat APIs are not implemented as of now. We think they are better suited for
- [ ] GeoShape Query
- [x] Geo Bounding Box Query
- [x] Geo Distance Query
- - [ ] Geo Distance Range Query
- [x] Geo Polygon Query
- - [ ] Geohash Cell Query
- Specialized queries
- [x] More Like This Query
- - [x] Template Query
- [x] Script Query
- [x] Percolate Query
- Span queries
@@ -346,7 +348,7 @@ The cat APIs are not implemented as of now. We think they are better suited for
- Snapshot and Restore
- [x] Repositories
- - [ ] Snapshot
+ - [x] Snapshot
- [ ] Restore
- [ ] Snapshot status
- [ ] Monitoring snapshot/restore status
diff --git a/vendor/gopkg.in/olivere/elastic.v5/bulk_processor.go b/vendor/gopkg.in/olivere/elastic.v5/bulk_processor.go
index b2709a880..6ee8a3dee 100644
--- a/vendor/gopkg.in/olivere/elastic.v5/bulk_processor.go
+++ b/vendor/gopkg.in/olivere/elastic.v5/bulk_processor.go
@@ -6,6 +6,7 @@ package elastic
import (
"context"
+ "net"
"sync"
"sync/atomic"
"time"
@@ -121,7 +122,7 @@ func (s *BulkProcessorService) Stats(wantStats bool) *BulkProcessorService {
return s
}
-// Set the backoff strategy to use for errors
+// Backoff sets the backoff strategy to use for errors.
func (s *BulkProcessorService) Backoff(backoff Backoff) *BulkProcessorService {
s.backoff = backoff
return s
@@ -248,6 +249,8 @@ type BulkProcessor struct {
statsMu sync.Mutex // guards the following block
stats *BulkProcessorStats
+
+ stopReconnC chan struct{} // channel to signal stop reconnection attempts
}
func newBulkProcessor(
@@ -293,6 +296,7 @@ func (p *BulkProcessor) Start(ctx context.Context) error {
p.requestsC = make(chan BulkableRequest)
p.executionId = 0
p.stats = newBulkProcessorStats(p.numWorkers)
+ p.stopReconnC = make(chan struct{})
// Create and start up workers.
p.workers = make([]*bulkWorker, p.numWorkers)
@@ -331,6 +335,12 @@ func (p *BulkProcessor) Close() error {
return nil
}
+ // Tell connection checkers to stop
+ if p.stopReconnC != nil {
+ close(p.stopReconnC)
+ p.stopReconnC = nil
+ }
+
// Stop flusher (if enabled)
if p.flusherStopC != nil {
p.flusherStopC <- struct{}{}
@@ -436,29 +446,43 @@ func (w *bulkWorker) work(ctx context.Context) {
var stop bool
for !stop {
+ var err error
select {
case req, open := <-w.p.requestsC:
if open {
// Received a new request
w.service.Add(req)
if w.commitRequired() {
- w.commit(ctx) // TODO swallow errors here?
+ err = w.commit(ctx)
}
} else {
// Channel closed: Stop.
stop = true
if w.service.NumberOfActions() > 0 {
- w.commit(ctx) // TODO swallow errors here?
+ err = w.commit(ctx)
}
}
case <-w.flushC:
// Commit outstanding requests
if w.service.NumberOfActions() > 0 {
- w.commit(ctx) // TODO swallow errors here?
+ err = w.commit(ctx)
}
w.flushAckC <- struct{}{}
}
+ if !stop && err != nil {
+ waitForActive := func() {
+ // Add back pressure to prevent Add calls from filling up the request queue
+ ready := make(chan struct{})
+ go w.waitForActiveConnection(ready)
+ <-ready
+ }
+ if _, ok := err.(net.Error); ok {
+ waitForActive()
+ } else if IsConnErr(err) {
+ waitForActive()
+ }
+ }
}
}
@@ -511,6 +535,35 @@ func (w *bulkWorker) commit(ctx context.Context) error {
return err
}
+func (w *bulkWorker) waitForActiveConnection(ready chan<- struct{}) {
+ defer close(ready)
+
+ t := time.NewTicker(5 * time.Second)
+ defer t.Stop()
+
+ client := w.p.c
+ stopReconnC := w.p.stopReconnC
+ w.p.c.errorf("elastic: bulk processor %q is waiting for an active connection", w.p.name)
+
+ // loop until a health check finds at least 1 active connection or the reconnection channel is closed
+ for {
+ select {
+ case _, ok := <-stopReconnC:
+ if !ok {
+ w.p.c.errorf("elastic: bulk processor %q active connection check interrupted", w.p.name)
+ return
+ }
+ case <-t.C:
+ client.healthcheck(3*time.Second, true)
+ if client.mustActiveConn() == nil {
+ // found an active connection
+ // exit and signal done to the WaitGroup
+ return
+ }
+ }
+ }
+}
+
func (w *bulkWorker) updateStats(res *BulkResponse) {
// Update stats
if res != nil {
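The bulk-processor changes above stop swallowing commit errors: when a commit fails with a net.Error or with ErrNoClient, the worker now blocks in waitForActiveConnection until a periodic healthcheck finds a live node again, and Close closes stopReconnC so those wait loops terminate. A minimal sketch of how such a processor is typically wired up against this API (the index name, document shape and backoff values are illustrative, not part of this diff):

	package main

	import (
		"context"
		"log"
		"time"

		elastic "gopkg.in/olivere/elastic.v5"
	)

	func main() {
		client, err := elastic.NewClient(elastic.SetURL("http://127.0.0.1:9200"))
		if err != nil {
			log.Fatal(err)
		}
		p, err := client.BulkProcessor().
			Name("example-processor").
			Workers(2).
			BulkActions(1000).               // commit after 1000 queued requests ...
			FlushInterval(30 * time.Second). // ... or every 30s, whichever comes first
			Backoff(elastic.NewExponentialBackoff(10*time.Millisecond, 8*time.Second)).
			Stats(true).
			Do(context.Background())
		if err != nil {
			log.Fatal(err)
		}
		defer p.Close() // Close now also stops the reconnection checkers

		doc := map[string]interface{}{"user": "olivere", "retweets": 42}
		p.Add(elastic.NewBulkIndexRequest().Index("tweets").Type("doc").Id("1").Doc(doc))
	}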
diff --git a/vendor/gopkg.in/olivere/elastic.v5/client.go b/vendor/gopkg.in/olivere/elastic.v5/client.go
index 1eb0ec54f..165a30526 100644
--- a/vendor/gopkg.in/olivere/elastic.v5/client.go
+++ b/vendor/gopkg.in/olivere/elastic.v5/client.go
@@ -26,7 +26,7 @@ import (
const (
// Version is the current version of Elastic.
- Version = "6.1.4"
+ Version = "6.1.7"
// DefaultURL is the default endpoint of Elasticsearch on the local machine.
// It is used e.g. when initializing a new Client without a specific URL.
@@ -1778,9 +1778,3 @@ func (c *Client) WaitForGreenStatus(timeout string) error {
func (c *Client) WaitForYellowStatus(timeout string) error {
return c.WaitForStatus("yellow", timeout)
}
-
-// IsConnError unwraps the given error value and checks if it is equal to
-// elastic.ErrNoClient.
-func IsConnErr(err error) bool {
- return errors.Cause(err) == ErrNoClient
-}
diff --git a/vendor/gopkg.in/olivere/elastic.v5/errors.go b/vendor/gopkg.in/olivere/elastic.v5/errors.go
index 00a936621..e40cda845 100644
--- a/vendor/gopkg.in/olivere/elastic.v5/errors.go
+++ b/vendor/gopkg.in/olivere/elastic.v5/errors.go
@@ -9,6 +9,8 @@ import (
"fmt"
"io/ioutil"
"net/http"
+
+ "github.com/pkg/errors"
)
// checkResponse will return an error if the request/response indicates
@@ -89,6 +91,12 @@ func (e *Error) Error() string {
}
}
+// IsConnErr returns true if the error indicates that Elastic could not
+// find an Elasticsearch host to connect to.
+func IsConnErr(err error) bool {
+ return err == ErrNoClient || errors.Cause(err) == ErrNoClient
+}
+
// IsNotFound returns true if the given error indicates that Elasticsearch
// returned HTTP status 404. The err parameter can be of type *elastic.Error,
// elastic.Error, *http.Response or int (indicating the HTTP status code).
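With IsConnErr now living in errors.go and matching a bare ErrNoClient as well as a wrapped cause, callers can distinguish "no node reachable" from ordinary request errors. A hedged sketch of the intended call-site pattern (query, index name and retry policy are placeholders; ctx and client are assumed to be set up elsewhere):

	res, err := client.Search().
		Index("tweets").
		Query(elastic.NewMatchAllQuery()).
		Do(ctx)
	if err != nil {
		if elastic.IsConnErr(err) {
			// err is, or wraps, elastic.ErrNoClient: no Elasticsearch node is
			// reachable, so back off and retry rather than failing the query.
		}
		// handle other errors
	}
	_ = res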
diff --git a/vendor/gopkg.in/olivere/elastic.v5/msearch.go b/vendor/gopkg.in/olivere/elastic.v5/msearch.go
index ed54d3c2f..c1a589a97 100644
--- a/vendor/gopkg.in/olivere/elastic.v5/msearch.go
+++ b/vendor/gopkg.in/olivere/elastic.v5/msearch.go
@@ -14,19 +14,17 @@ import (
// MultiSearch executes one or more searches in one roundtrip.
type MultiSearchService struct {
- client *Client
- requests []*SearchRequest
- indices []string
- pretty bool
- routing string
- preference string
+ client *Client
+ requests []*SearchRequest
+ indices []string
+ pretty bool
+ maxConcurrentRequests *int
+ preFilterShardSize *int
}
func NewMultiSearchService(client *Client) *MultiSearchService {
builder := &MultiSearchService{
- client: client,
- requests: make([]*SearchRequest, 0),
- indices: make([]string, 0),
+ client: client,
}
return builder
}
@@ -46,6 +44,16 @@ func (s *MultiSearchService) Pretty(pretty bool) *MultiSearchService {
return s
}
+// MaxConcurrentSearches sets the maximum number of concurrent searches
+// the multi search API will execute.
+func (s *MultiSearchService) MaxConcurrentSearches(max int) *MultiSearchService {
+ s.maxConcurrentRequests = &max
+ return s
+}
+
+// PreFilterShardSize sets a threshold that enforces a pre-filter roundtrip
+// to prefilter search shards, based on query rewriting, if the number of
+// shards the request expands to exceeds the threshold.
+func (s *MultiSearchService) PreFilterShardSize(size int) *MultiSearchService {
+ s.preFilterShardSize = &size
+ return s
+}
+
func (s *MultiSearchService) Do(ctx context.Context) (*MultiSearchResult, error) {
// Build url
path := "/_msearch"
@@ -55,6 +63,12 @@ func (s *MultiSearchService) Do(ctx context.Context) (*MultiSearchResult, error)
if s.pretty {
params.Set("pretty", fmt.Sprintf("%v", s.pretty))
}
+ if v := s.maxConcurrentRequests; v != nil {
+ params.Set("max_concurrent_searches", fmt.Sprintf("%v", *v))
+ }
+ if v := s.preFilterShardSize; v != nil {
+ params.Set("pre_filter_shard_size", fmt.Sprintf("%v", *v))
+ }
// Set body
var lines []string
@@ -68,14 +82,14 @@ func (s *MultiSearchService) Do(ctx context.Context) (*MultiSearchResult, error)
if err != nil {
return nil, err
}
- body, err := json.Marshal(sr.Body())
+ body, err := sr.Body()
if err != nil {
return nil, err
}
lines = append(lines, string(header))
- lines = append(lines, string(body))
+ lines = append(lines, body)
}
- body := strings.Join(lines, "\n") + "\n" // Don't forget trailing \n
+ body := strings.Join(lines, "\n") + "\n" // add trailing \n
// Get response
res, err := s.client.PerformRequest(ctx, PerformRequestOptions{
@@ -96,6 +110,7 @@ func (s *MultiSearchService) Do(ctx context.Context) (*MultiSearchResult, error)
return ret, nil
}
+// MultiSearchResult is the outcome of running a multi-search operation.
type MultiSearchResult struct {
Responses []*SearchResult `json:"responses,omitempty"`
}
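A short sketch of the two new multi-search parameters in use (indices, queries and values are placeholders):

	sreq1 := elastic.NewSearchRequest().Index("tweets").
		Source(elastic.NewSearchSource().Query(elastic.NewMatchAllQuery()))
	sreq2 := elastic.NewSearchRequest().Index("tweets").
		Source(elastic.NewSearchSource().Query(elastic.NewTermQuery("user", "olivere")))

	res, err := client.MultiSearch().
		Add(sreq1, sreq2).
		MaxConcurrentSearches(4). // serialized as max_concurrent_searches
		PreFilterShardSize(128).  // serialized as pre_filter_shard_size
		Do(ctx)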
diff --git a/vendor/gopkg.in/olivere/elastic.v5/msearch_test.go b/vendor/gopkg.in/olivere/elastic.v5/msearch_test.go
index 79f2047e6..d25e2cc28 100644
--- a/vendor/gopkg.in/olivere/elastic.v5/msearch_test.go
+++ b/vendor/gopkg.in/olivere/elastic.v5/msearch_test.go
@@ -13,6 +13,7 @@ import (
func TestMultiSearch(t *testing.T) {
client := setupTestClientAndCreateIndex(t)
+ // client := setupTestClientAndCreateIndexAndLog(t)
tweet1 := tweet{
User: "olivere",
@@ -62,6 +63,110 @@ func TestMultiSearch(t *testing.T) {
searchResult, err := client.MultiSearch().
Add(sreq1, sreq2).
+ Pretty(true).
+ Do(context.TODO())
+ if err != nil {
+ t.Fatal(err)
+ }
+ if searchResult.Responses == nil {
+ t.Fatal("expected responses != nil; got nil")
+ }
+ if len(searchResult.Responses) != 2 {
+ t.Fatalf("expected 2 responses; got %d", len(searchResult.Responses))
+ }
+
+ sres := searchResult.Responses[0]
+ if sres.Hits == nil {
+ t.Errorf("expected Hits != nil; got nil")
+ }
+ if sres.Hits.TotalHits != 3 {
+ t.Errorf("expected Hits.TotalHits = %d; got %d", 3, sres.Hits.TotalHits)
+ }
+ if len(sres.Hits.Hits) != 3 {
+ t.Errorf("expected len(Hits.Hits) = %d; got %d", 3, len(sres.Hits.Hits))
+ }
+ for _, hit := range sres.Hits.Hits {
+ if hit.Index != testIndexName {
+ t.Errorf("expected Hits.Hit.Index = %q; got %q", testIndexName, hit.Index)
+ }
+ item := make(map[string]interface{})
+ err := json.Unmarshal(*hit.Source, &item)
+ if err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ sres = searchResult.Responses[1]
+ if sres.Hits == nil {
+ t.Errorf("expected Hits != nil; got nil")
+ }
+ if sres.Hits.TotalHits != 2 {
+ t.Errorf("expected Hits.TotalHits = %d; got %d", 2, sres.Hits.TotalHits)
+ }
+ if len(sres.Hits.Hits) != 2 {
+ t.Errorf("expected len(Hits.Hits) = %d; got %d", 2, len(sres.Hits.Hits))
+ }
+ for _, hit := range sres.Hits.Hits {
+ if hit.Index != testIndexName {
+ t.Errorf("expected Hits.Hit.Index = %q; got %q", testIndexName, hit.Index)
+ }
+ item := make(map[string]interface{})
+ err := json.Unmarshal(*hit.Source, &item)
+ if err != nil {
+ t.Fatal(err)
+ }
+ }
+}
+
+func TestMultiSearchWithStrings(t *testing.T) {
+ client := setupTestClientAndCreateIndex(t)
+ // client := setupTestClientAndCreateIndexAndLog(t)
+
+ tweet1 := tweet{
+ User: "olivere",
+ Message: "Welcome to Golang and Elasticsearch.",
+ Tags: []string{"golang", "elasticsearch"},
+ }
+ tweet2 := tweet{
+ User: "olivere",
+ Message: "Another unrelated topic.",
+ Tags: []string{"golang"},
+ }
+ tweet3 := tweet{
+ User: "sandrae",
+ Message: "Cycling is fun.",
+ Tags: []string{"sports", "cycling"},
+ }
+
+ // Add all documents
+ _, err := client.Index().Index(testIndexName).Type("doc").Id("1").BodyJson(&tweet1).Do(context.TODO())
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ _, err = client.Index().Index(testIndexName).Type("doc").Id("2").BodyJson(&tweet2).Do(context.TODO())
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ _, err = client.Index().Index(testIndexName).Type("doc").Id("3").BodyJson(&tweet3).Do(context.TODO())
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ _, err = client.Flush().Index(testIndexName).Do(context.TODO())
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // Spawn two search queries with one roundtrip
+ sreq1 := NewSearchRequest().Index(testIndexName, testIndexName2).
+ Source(`{"query":{"match_all":{}}}`)
+ sreq2 := NewSearchRequest().Index(testIndexName).Type("doc").
+ Source(`{"query":{"term":{"tags":"golang"}}}`)
+
+ searchResult, err := client.MultiSearch().
+ Add(sreq1, sreq2).
Do(context.TODO())
if err != nil {
t.Fatal(err)
diff --git a/vendor/gopkg.in/olivere/elastic.v5/recipes/bulk_processor/main.go b/vendor/gopkg.in/olivere/elastic.v5/recipes/bulk_processor/main.go
new file mode 100644
index 000000000..f13243297
--- /dev/null
+++ b/vendor/gopkg.in/olivere/elastic.v5/recipes/bulk_processor/main.go
@@ -0,0 +1,149 @@
+// Copyright 2012-present Oliver Eilhard. All rights reserved.
+// Use of this source code is governed by a MIT-license.
+// See http://olivere.mit-license.org/license.txt for details.
+
+// BulkProcessor runs a bulk processing job that fills an index
+// given certain criteria like flush interval etc.
+//
+// Example
+//
+// bulk_processor -url=http://127.0.0.1:9200/bulk-processor-test?sniff=false -n=100000 -flush-interval=1s
+//
+package main
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "log"
+ "math/rand"
+ "os"
+ "os/signal"
+ "sync/atomic"
+ "syscall"
+ "time"
+
+ "github.com/google/uuid"
+
+ "github.com/olivere/elastic"
+ "github.com/olivere/elastic/config"
+)
+
+func main() {
+ var (
+ url = flag.String("url", "http://localhost:9200/bulk-processor-test", "Elasticsearch URL")
+ numWorkers = flag.Int("num-workers", 4, "Number of workers")
+ n = flag.Int64("n", -1, "Number of documents to process (-1 for unlimited)")
+ flushInterval = flag.Duration("flush-interval", 1*time.Second, "Flush interval")
+ bulkActions = flag.Int("bulk-actions", 0, "Number of bulk actions before committing")
+ bulkSize = flag.Int("bulk-size", 0, "Size of bulk requests before committing")
+ )
+ flag.Parse()
+ log.SetFlags(0)
+
+ rand.Seed(time.Now().UnixNano())
+
+ // Parse configuration from URL
+ cfg, err := config.Parse(*url)
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ // Create an Elasticsearch client from the parsed config
+ client, err := elastic.NewClientFromConfig(cfg)
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ // Drop old index
+ exists, err := client.IndexExists(cfg.Index).Do(context.Background())
+ if err != nil {
+ log.Fatal(err)
+ }
+ if exists {
+ _, err = client.DeleteIndex(cfg.Index).Do(context.Background())
+ if err != nil {
+ log.Fatal(err)
+ }
+ }
+
+ // Create processor
+ bulkp := elastic.NewBulkProcessorService(client).
+ Name("bulk-test-processor").
+ Stats(true).
+ Backoff(elastic.StopBackoff{}).
+ FlushInterval(*flushInterval).
+ Workers(*numWorkers)
+ if *bulkActions > 0 {
+ bulkp = bulkp.BulkActions(*bulkActions)
+ }
+ if *bulkSize > 0 {
+ bulkp = bulkp.BulkSize(*bulkSize)
+ }
+ p, err := bulkp.Do(context.Background())
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ var created int64
+ errc := make(chan error, 1)
+ go func() {
+ c := make(chan os.Signal, 1)
+ signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
+ <-c
+ errc <- nil
+ }()
+
+ go func() {
+ defer func() {
+ if err := p.Close(); err != nil {
+ errc <- err
+ }
+ }()
+
+ type Doc struct {
+ Timestamp time.Time `json:"@timestamp"`
+ }
+
+ for {
+ current := atomic.AddInt64(&created, 1)
+ if *n > 0 && current >= *n {
+ errc <- nil
+ return
+ }
+ r := elastic.NewBulkIndexRequest().
+ Index(cfg.Index).
+ Type("doc").
+ Id(uuid.New().String()).
+ Doc(Doc{Timestamp: time.Now()})
+ p.Add(r)
+
+ time.Sleep(time.Duration(rand.Intn(1000)) * time.Microsecond)
+ }
+ }()
+
+ go func() {
+ t := time.NewTicker(1 * time.Second)
+ defer t.Stop()
+ for range t.C {
+ stats := p.Stats()
+ written := atomic.LoadInt64(&created)
+ var queued int64
+ for _, w := range stats.Workers {
+ queued += w.Queued
+ }
+ fmt.Printf("Queued=%5d Written=%8d Succeeded=%8d Failed=%8d Committed=%6d Flushed=%6d\n",
+ queued,
+ written,
+ stats.Succeeded,
+ stats.Failed,
+ stats.Committed,
+ stats.Flushed,
+ )
+ }
+ }()
+
+ if err := <-errc; err != nil {
+ log.Fatal(err)
+ }
+}
diff --git a/vendor/gopkg.in/olivere/elastic.v5/reindex.go b/vendor/gopkg.in/olivere/elastic.v5/reindex.go
index 35440fa80..9cdd50a68 100644
--- a/vendor/gopkg.in/olivere/elastic.v5/reindex.go
+++ b/vendor/gopkg.in/olivere/elastic.v5/reindex.go
@@ -20,6 +20,7 @@ type ReindexService struct {
waitForActiveShards string
waitForCompletion *bool
requestsPerSecond *int
+ slices *int
body interface{}
source *ReindexSource
destination *ReindexDestination
@@ -51,6 +52,12 @@ func (s *ReindexService) RequestsPerSecond(requestsPerSecond int) *ReindexServic
return s
}
+// Slices specifies the number of slices this task should be divided into. Defaults to 1.
+func (s *ReindexService) Slices(slices int) *ReindexService {
+ s.slices = &slices
+ return s
+}
+
// Refresh indicates whether Elasticsearch should refresh the affected indices
// immediately.
func (s *ReindexService) Refresh(refresh string) *ReindexService {
@@ -179,6 +186,9 @@ func (s *ReindexService) buildURL() (string, url.Values, error) {
if s.requestsPerSecond != nil {
params.Set("requests_per_second", fmt.Sprintf("%v", *s.requestsPerSecond))
}
+ if s.slices != nil {
+ params.Set("slices", fmt.Sprintf("%v", *s.slices))
+ }
if s.waitForActiveShards != "" {
params.Set("wait_for_active_shards", s.waitForActiveShards)
}
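A sketch of a sliced reindex using the new option (index names and slice count are placeholders):

	res, err := client.Reindex().
		SourceIndex("twitter").
		DestinationIndex("new_twitter").
		Slices(5). // sent as ?slices=5, splitting the reindex into 5 parallel sub-tasks
		WaitForCompletion(true).
		Do(ctx)
	if err != nil {
		// handle error
	}
	_ = res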
diff --git a/vendor/gopkg.in/olivere/elastic.v5/run-es.sh b/vendor/gopkg.in/olivere/elastic.v5/run-es.sh
index 1f4a851d4..624a864ed 100755
--- a/vendor/gopkg.in/olivere/elastic.v5/run-es.sh
+++ b/vendor/gopkg.in/olivere/elastic.v5/run-es.sh
@@ -1,3 +1,3 @@
#!/bin/sh
-VERSION=${VERSION:=6.1.2}
-docker run --rm -p 9200:9200 -e "http.host=0.0.0.0" -e "transport.host=127.0.0.1" -e "bootstrap.memory_lock=true" -e "ES_JAVA_OPTS=-Xms1g -Xmx1g" docker.elastic.co/elasticsearch/elasticsearch:$VERSION elasticsearch -Expack.security.enabled=false -Enetwork.host=_local_,_site_ -Enetwork.publish_host=_local_
+VERSION=${VERSION:=6.2.1}
+docker run --rm -p 9200:9200 -e "http.host=0.0.0.0" -e "transport.host=127.0.0.1" -e "bootstrap.memory_lock=true" -e "ES_JAVA_OPTS=-Xms1g -Xmx1g" docker.elastic.co/elasticsearch/elasticsearch-oss:$VERSION elasticsearch -Enetwork.host=_local_,_site_ -Enetwork.publish_host=_local_
diff --git a/vendor/gopkg.in/olivere/elastic.v5/search_aggs.go b/vendor/gopkg.in/olivere/elastic.v5/search_aggs.go
index c5082b2b1..6359611b1 100644
--- a/vendor/gopkg.in/olivere/elastic.v5/search_aggs.go
+++ b/vendor/gopkg.in/olivere/elastic.v5/search_aggs.go
@@ -653,6 +653,23 @@ func (a Aggregations) SerialDiff(name string) (*AggregationPipelineSimpleValue,
return nil, false
}
+// Composite returns composite bucket aggregation results.
+//
+// See https://www.elastic.co/guide/en/elasticsearch/reference/6.1/search-aggregations-bucket-composite-aggregation.html
+// for details.
+func (a Aggregations) Composite(name string) (*AggregationBucketCompositeItems, bool) {
+ if raw, found := a[name]; found {
+ agg := new(AggregationBucketCompositeItems)
+ if raw == nil {
+ return agg, true
+ }
+ if err := json.Unmarshal(*raw, agg); err == nil {
+ return agg, true
+ }
+ }
+ return nil, false
+}
+
// -- Single value metric --
// AggregationValueMetric is a single-value metric, returned e.g. by a
@@ -1448,3 +1465,56 @@ func (a *AggregationPipelinePercentilesMetric) UnmarshalJSON(data []byte) error
a.Aggregations = aggs
return nil
}
+
+// -- Composite key items --
+
+// AggregationBucketCompositeItems implements the response structure
+// for a bucket aggregation of type composite.
+type AggregationBucketCompositeItems struct {
+ Aggregations
+
+ Buckets []*AggregationBucketCompositeItem //`json:"buckets"`
+ Meta map[string]interface{} // `json:"meta,omitempty"`
+}
+
+// UnmarshalJSON decodes JSON data and initializes an AggregationBucketCompositeItems structure.
+func (a *AggregationBucketCompositeItems) UnmarshalJSON(data []byte) error {
+ var aggs map[string]*json.RawMessage
+ if err := json.Unmarshal(data, &aggs); err != nil {
+ return err
+ }
+ if v, ok := aggs["buckets"]; ok && v != nil {
+ json.Unmarshal(*v, &a.Buckets)
+ }
+ if v, ok := aggs["meta"]; ok && v != nil {
+ json.Unmarshal(*v, &a.Meta)
+ }
+ a.Aggregations = aggs
+ return nil
+}
+
+// AggregationBucketCompositeItem is a single bucket of an AggregationBucketCompositeItems structure.
+type AggregationBucketCompositeItem struct {
+ Aggregations
+
+ Key map[string]interface{} //`json:"key"`
+ DocCount int64 //`json:"doc_count"`
+}
+
+// UnmarshalJSON decodes JSON data and initializes an AggregationBucketCompositeItem structure.
+func (a *AggregationBucketCompositeItem) UnmarshalJSON(data []byte) error {
+ var aggs map[string]*json.RawMessage
+ dec := json.NewDecoder(bytes.NewReader(data))
+ dec.UseNumber()
+ if err := dec.Decode(&aggs); err != nil {
+ return err
+ }
+ if v, ok := aggs["key"]; ok && v != nil {
+ json.Unmarshal(*v, &a.Key)
+ }
+ if v, ok := aggs["doc_count"]; ok && v != nil {
+ json.Unmarshal(*v, &a.DocCount)
+ }
+ a.Aggregations = aggs
+ return nil
+}
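With the accessor and response types above, composite results can be read back like any other bucket aggregation. A sketch, assuming fmt is imported and "my_composite" matches the aggregation name used in the request:

	agg, found := searchResult.Aggregations.Composite("my_composite")
	if found {
		for _, bucket := range agg.Buckets {
			// bucket.Key maps each values-source name to its value for this bucket
			fmt.Printf("key=%v doc_count=%d\n", bucket.Key, bucket.DocCount)
		}
	}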
diff --git a/vendor/gopkg.in/olivere/elastic.v5/search_aggs_bucket_composite.go b/vendor/gopkg.in/olivere/elastic.v5/search_aggs_bucket_composite.go
new file mode 100644
index 000000000..1d9132d2d
--- /dev/null
+++ b/vendor/gopkg.in/olivere/elastic.v5/search_aggs_bucket_composite.go
@@ -0,0 +1,498 @@
+// Copyright 2012-present Oliver Eilhard. All rights reserved.
+// Use of this source code is governed by a MIT-license.
+// See http://olivere.mit-license.org/license.txt for details.
+
+package elastic
+
+// CompositeAggregation is a multi-bucket values source based aggregation
+// that can be used to calculate unique composite values from source documents.
+//
+// See https://www.elastic.co/guide/en/elasticsearch/reference/6.1/search-aggregations-bucket-composite-aggregation.html
+// for details.
+type CompositeAggregation struct {
+ after map[string]interface{}
+ size *int
+ sources []CompositeAggregationValuesSource
+ subAggregations map[string]Aggregation
+ meta map[string]interface{}
+}
+
+// NewCompositeAggregation creates a new CompositeAggregation.
+func NewCompositeAggregation() *CompositeAggregation {
+ return &CompositeAggregation{
+ sources: make([]CompositeAggregationValuesSource, 0),
+ subAggregations: make(map[string]Aggregation),
+ }
+}
+
+// Size represents the number of composite buckets to return.
+// Defaults to 10 as of Elasticsearch 6.1.
+func (a *CompositeAggregation) Size(size int) *CompositeAggregation {
+ a.size = &size
+ return a
+}
+
+// AggregateAfter sets the values that indicate which composite bucket this
+// request should "aggregate after".
+func (a *CompositeAggregation) AggregateAfter(after map[string]interface{}) *CompositeAggregation {
+ a.after = after
+ return a
+}
+
+// Sources specifies the list of CompositeAggregationValuesSource instances to
+// use in the aggregation.
+func (a *CompositeAggregation) Sources(sources ...CompositeAggregationValuesSource) *CompositeAggregation {
+ a.sources = append(a.sources, sources...)
+ return a
+}
+
+// SubAggregation adds a sub-aggregation to this aggregation.
+func (a *CompositeAggregation) SubAggregation(name string, subAggregation Aggregation) *CompositeAggregation {
+ a.subAggregations[name] = subAggregation
+ return a
+}
+
+// Meta sets the meta data to be included in the aggregation response.
+func (a *CompositeAggregation) Meta(metaData map[string]interface{}) *CompositeAggregation {
+ a.meta = metaData
+ return a
+}
+
+// Source returns the serializable JSON for this aggregation.
+func (a *CompositeAggregation) Source() (interface{}, error) {
+ // Example:
+ // {
+ // "aggs" : {
+ // "my_composite_agg" : {
+ // "composite" : {
+ // "sources": [
+ // {"my_term": { "terms": { "field": "product" }}},
+ // {"my_histo": { "histogram": { "field": "price", "interval": 5 }}},
+ // {"my_date": { "date_histogram": { "field": "timestamp", "interval": "1d" }}},
+ // ],
+ // "size" : 10,
+ // "after" : ["a", 2, "c"]
+ // }
+ // }
+ // }
+ // }
+ //
+ // This method returns only the { "composite" : { ... } } part.
+
+ source := make(map[string]interface{})
+ opts := make(map[string]interface{})
+ source["composite"] = opts
+
+ sources := make([]interface{}, len(a.sources))
+ for i, s := range a.sources {
+ src, err := s.Source()
+ if err != nil {
+ return nil, err
+ }
+ sources[i] = src
+ }
+ opts["sources"] = sources
+
+ if a.size != nil {
+ opts["size"] = *a.size
+ }
+
+ if a.after != nil {
+ opts["after"] = a.after
+ }
+
+ // AggregationBuilder (SubAggregations)
+ if len(a.subAggregations) > 0 {
+ aggsMap := make(map[string]interface{})
+ source["aggregations"] = aggsMap
+ for name, aggregate := range a.subAggregations {
+ src, err := aggregate.Source()
+ if err != nil {
+ return nil, err
+ }
+ aggsMap[name] = src
+ }
+ }
+
+ // Add Meta data if available
+ if len(a.meta) > 0 {
+ source["meta"] = a.meta
+ }
+
+ return source, nil
+}
+
+// -- Generic interface for CompositeAggregationValues --
+
+// CompositeAggregationValuesSource specifies the interface that
+// all implementations for CompositeAggregation's Sources method
+// need to implement.
+//
+// The different implementations are described in
+// https://www.elastic.co/guide/en/elasticsearch/reference/6.1/search-aggregations-bucket-composite-aggregation.html#_values_source_2.
+type CompositeAggregationValuesSource interface {
+ Source() (interface{}, error)
+}
+
+// -- CompositeAggregationTermsValuesSource --
+
+// CompositeAggregationTermsValuesSource is a source for the CompositeAggregation that handles terms.
+// It works very similarly to a terms aggregation, but with a slightly different syntax.
+//
+// See https://www.elastic.co/guide/en/elasticsearch/reference/6.1/search-aggregations-bucket-composite-aggregation.html#_terms
+// for details.
+type CompositeAggregationTermsValuesSource struct {
+ name string
+ field string
+ script *Script
+ valueType string
+ missing interface{}
+ order string
+}
+
+// NewCompositeAggregationTermsValuesSource creates and initializes
+// a new CompositeAggregationTermsValuesSource.
+func NewCompositeAggregationTermsValuesSource(name string) *CompositeAggregationTermsValuesSource {
+ return &CompositeAggregationTermsValuesSource{
+ name: name,
+ }
+}
+
+// Field to use for this source.
+func (a *CompositeAggregationTermsValuesSource) Field(field string) *CompositeAggregationTermsValuesSource {
+ a.field = field
+ return a
+}
+
+// Script to use for this source.
+func (a *CompositeAggregationTermsValuesSource) Script(script *Script) *CompositeAggregationTermsValuesSource {
+ a.script = script
+ return a
+}
+
+// ValueType specifies the type of values produced by this source,
+// e.g. "string" or "date".
+func (a *CompositeAggregationTermsValuesSource) ValueType(valueType string) *CompositeAggregationTermsValuesSource {
+ a.valueType = valueType
+ return a
+}
+
+// Order specifies the order of the values produced by this source.
+// It can be either "asc" or "desc".
+func (a *CompositeAggregationTermsValuesSource) Order(order string) *CompositeAggregationTermsValuesSource {
+ a.order = order
+ return a
+}
+
+// Asc ensures the order of the values produced is ascending.
+func (a *CompositeAggregationTermsValuesSource) Asc() *CompositeAggregationTermsValuesSource {
+ a.order = "asc"
+ return a
+}
+
+// Desc ensures the order of the values produced is descending.
+func (a *CompositeAggregationTermsValuesSource) Desc() *CompositeAggregationTermsValuesSource {
+ a.order = "desc"
+ return a
+}
+
+// Missing specifies the value to use when the source finds a missing
+// value in a document.
+func (a *CompositeAggregationTermsValuesSource) Missing(missing interface{}) *CompositeAggregationTermsValuesSource {
+ a.missing = missing
+ return a
+}
+
+// Source returns the serializable JSON for this values source.
+func (a *CompositeAggregationTermsValuesSource) Source() (interface{}, error) {
+ source := make(map[string]interface{})
+ name := make(map[string]interface{})
+ source[a.name] = name
+ values := make(map[string]interface{})
+ name["terms"] = values
+
+ // field
+ if a.field != "" {
+ values["field"] = a.field
+ }
+
+ // script
+ if a.script != nil {
+ src, err := a.script.Source()
+ if err != nil {
+ return nil, err
+ }
+ values["script"] = src
+ }
+
+ // missing
+ if a.missing != nil {
+ values["missing"] = a.missing
+ }
+
+ // value_type
+ if a.valueType != "" {
+ values["value_type"] = a.valueType
+ }
+
+ // order
+ if a.order != "" {
+ values["order"] = a.order
+ }
+
+ return source, nil
+}
+
+// -- CompositeAggregationHistogramValuesSource --
+
+// CompositeAggregationHistogramValuesSource is a source for the CompositeAggregation that handles histograms.
+// It works very similarly to a histogram aggregation, but with a slightly different syntax.
+//
+// See https://www.elastic.co/guide/en/elasticsearch/reference/6.1/search-aggregations-bucket-composite-aggregation.html#_histogram
+// for details.
+type CompositeAggregationHistogramValuesSource struct {
+ name string
+ field string
+ script *Script
+ valueType string
+ missing interface{}
+ order string
+ interval float64
+}
+
+// NewCompositeAggregationHistogramValuesSource creates and initializes
+// a new CompositeAggregationHistogramValuesSource.
+func NewCompositeAggregationHistogramValuesSource(name string, interval float64) *CompositeAggregationHistogramValuesSource {
+ return &CompositeAggregationHistogramValuesSource{
+ name: name,
+ interval: interval,
+ }
+}
+
+// Field to use for this source.
+func (a *CompositeAggregationHistogramValuesSource) Field(field string) *CompositeAggregationHistogramValuesSource {
+ a.field = field
+ return a
+}
+
+// Script to use for this source.
+func (a *CompositeAggregationHistogramValuesSource) Script(script *Script) *CompositeAggregationHistogramValuesSource {
+ a.script = script
+ return a
+}
+
+// ValueType specifies the type of values produced by this source,
+// e.g. "string" or "date".
+func (a *CompositeAggregationHistogramValuesSource) ValueType(valueType string) *CompositeAggregationHistogramValuesSource {
+ a.valueType = valueType
+ return a
+}
+
+// Missing specifies the value to use when the source finds a missing
+// value in a document.
+func (a *CompositeAggregationHistogramValuesSource) Missing(missing interface{}) *CompositeAggregationHistogramValuesSource {
+ a.missing = missing
+ return a
+}
+
+// Order specifies the order of the values produced by this source.
+// It can be either "asc" or "desc".
+func (a *CompositeAggregationHistogramValuesSource) Order(order string) *CompositeAggregationHistogramValuesSource {
+ a.order = order
+ return a
+}
+
+// Asc ensures the order of the values produced is ascending.
+func (a *CompositeAggregationHistogramValuesSource) Asc() *CompositeAggregationHistogramValuesSource {
+ a.order = "asc"
+ return a
+}
+
+// Desc ensures the order of the values produced is descending.
+func (a *CompositeAggregationHistogramValuesSource) Desc() *CompositeAggregationHistogramValuesSource {
+ a.order = "desc"
+ return a
+}
+
+// Interval specifies the interval to use.
+func (a *CompositeAggregationHistogramValuesSource) Interval(interval float64) *CompositeAggregationHistogramValuesSource {
+ a.interval = interval
+ return a
+}
+
+// Source returns the serializable JSON for this values source.
+func (a *CompositeAggregationHistogramValuesSource) Source() (interface{}, error) {
+ source := make(map[string]interface{})
+ name := make(map[string]interface{})
+ source[a.name] = name
+ values := make(map[string]interface{})
+ name["histogram"] = values
+
+ // field
+ if a.field != "" {
+ values["field"] = a.field
+ }
+
+ // script
+ if a.script != nil {
+ src, err := a.script.Source()
+ if err != nil {
+ return nil, err
+ }
+ values["script"] = src
+ }
+
+ // missing
+ if a.missing != nil {
+ values["missing"] = a.missing
+ }
+
+ // value_type
+ if a.valueType != "" {
+ values["value_type"] = a.valueType
+ }
+
+ // order
+ if a.order != "" {
+ values["order"] = a.order
+ }
+
+ // Histogram-related properties
+ values["interval"] = a.interval
+
+ return source, nil
+}
+
+// -- CompositeAggregationDateHistogramValuesSource --
+
+// CompositeAggregationDateHistogramValuesSource is a source for the CompositeAggregation that handles date histograms.
+// It works very similarly to a date histogram aggregation, but with a slightly different syntax.
+//
+// See https://www.elastic.co/guide/en/elasticsearch/reference/6.1/search-aggregations-bucket-composite-aggregation.html#_date_histogram
+// for details.
+type CompositeAggregationDateHistogramValuesSource struct {
+ name string
+ field string
+ script *Script
+ valueType string
+ missing interface{}
+ order string
+ interval interface{}
+ timeZone string
+}
+
+// NewCompositeAggregationDateHistogramValuesSource creates and initializes
+// a new CompositeAggregationDateHistogramValuesSource.
+func NewCompositeAggregationDateHistogramValuesSource(name string, interval interface{}) *CompositeAggregationDateHistogramValuesSource {
+ return &CompositeAggregationDateHistogramValuesSource{
+ name: name,
+ interval: interval,
+ }
+}
+
+// Field to use for this source.
+func (a *CompositeAggregationDateHistogramValuesSource) Field(field string) *CompositeAggregationDateHistogramValuesSource {
+ a.field = field
+ return a
+}
+
+// Script to use for this source.
+func (a *CompositeAggregationDateHistogramValuesSource) Script(script *Script) *CompositeAggregationDateHistogramValuesSource {
+ a.script = script
+ return a
+}
+
+// ValueType specifies the type of values produced by this source,
+// e.g. "string" or "date".
+func (a *CompositeAggregationDateHistogramValuesSource) ValueType(valueType string) *CompositeAggregationDateHistogramValuesSource {
+ a.valueType = valueType
+ return a
+}
+
+// Missing specifies the value to use when the source finds a missing
+// value in a document.
+func (a *CompositeAggregationDateHistogramValuesSource) Missing(missing interface{}) *CompositeAggregationDateHistogramValuesSource {
+ a.missing = missing
+ return a
+}
+
+// Order specifies the order of the values produced by this source.
+// It can be either "asc" or "desc".
+func (a *CompositeAggregationDateHistogramValuesSource) Order(order string) *CompositeAggregationDateHistogramValuesSource {
+ a.order = order
+ return a
+}
+
+// Asc ensures the order of the values produced is ascending.
+func (a *CompositeAggregationDateHistogramValuesSource) Asc() *CompositeAggregationDateHistogramValuesSource {
+ a.order = "asc"
+ return a
+}
+
+// Desc ensures the order of the values produced is descending.
+func (a *CompositeAggregationDateHistogramValuesSource) Desc() *CompositeAggregationDateHistogramValuesSource {
+ a.order = "desc"
+ return a
+}
+
+// Interval to use for the date histogram, e.g. "1d" or a numeric value like "60".
+func (a *CompositeAggregationDateHistogramValuesSource) Interval(interval interface{}) *CompositeAggregationDateHistogramValuesSource {
+ a.interval = interval
+ return a
+}
+
+// TimeZone to use for the dates.
+func (a *CompositeAggregationDateHistogramValuesSource) TimeZone(timeZone string) *CompositeAggregationDateHistogramValuesSource {
+ a.timeZone = timeZone
+ return a
+}
+
+// Source returns the serializable JSON for this values source.
+func (a *CompositeAggregationDateHistogramValuesSource) Source() (interface{}, error) {
+ source := make(map[string]interface{})
+ name := make(map[string]interface{})
+ source[a.name] = name
+ values := make(map[string]interface{})
+ name["date_histogram"] = values
+
+ // field
+ if a.field != "" {
+ values["field"] = a.field
+ }
+
+ // script
+ if a.script != nil {
+ src, err := a.script.Source()
+ if err != nil {
+ return nil, err
+ }
+ values["script"] = src
+ }
+
+ // missing
+ if a.missing != nil {
+ values["missing"] = a.missing
+ }
+
+ // value_type
+ if a.valueType != "" {
+ values["value_type"] = a.valueType
+ }
+
+ // order
+ if a.order != "" {
+ values["order"] = a.order
+ }
+
+ // DateHistogram-related properties
+ values["interval"] = a.interval
+
+ // timeZone
+ if a.timeZone != "" {
+ values["time_zone"] = a.timeZone
+ }
+
+ return source, nil
+}
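The main reason composite exists is cursor-style paging over all buckets. A hedged sketch of the loop this builder enables, feeding the last bucket's key back in via AggregateAfter (field, index and aggregation names are placeholders; in 6.1 the cursor is simply the key of the last returned bucket):

	var after map[string]interface{}
	for {
		agg := elastic.NewCompositeAggregation().
			Size(100).
			Sources(elastic.NewCompositeAggregationTermsValuesSource("by_user").Field("user"))
		if after != nil {
			agg = agg.AggregateAfter(after)
		}
		res, err := client.Search().Index("tweets").Size(0).
			Aggregation("my_composite", agg).
			Do(ctx)
		if err != nil {
			break // handle the error in real code
		}
		comp, ok := res.Aggregations.Composite("my_composite")
		if !ok || len(comp.Buckets) == 0 {
			break // no more buckets
		}
		for _, b := range comp.Buckets {
			_ = b // process bucket key/doc_count/sub-aggregations here
		}
		after = comp.Buckets[len(comp.Buckets)-1].Key // resume after this bucket
	}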
diff --git a/vendor/gopkg.in/olivere/elastic.v5/search_aggs_bucket_composite_test.go b/vendor/gopkg.in/olivere/elastic.v5/search_aggs_bucket_composite_test.go
new file mode 100644
index 000000000..91d84dbdb
--- /dev/null
+++ b/vendor/gopkg.in/olivere/elastic.v5/search_aggs_bucket_composite_test.go
@@ -0,0 +1,92 @@
+// Copyright 2012-present Oliver Eilhard. All rights reserved.
+// Use of this source code is governed by a MIT-license.
+// See http://olivere.mit-license.org/license.txt for details.
+
+package elastic
+
+import (
+ "encoding/json"
+ "testing"
+)
+
+func TestCompositeAggregation(t *testing.T) {
+ agg := NewCompositeAggregation().
+ Sources(
+ NewCompositeAggregationTermsValuesSource("my_terms").Field("a_term").Missing("N/A").Order("asc"),
+ NewCompositeAggregationHistogramValuesSource("my_histogram", 5).Field("price").Asc(),
+ NewCompositeAggregationDateHistogramValuesSource("my_date_histogram", "1d").Field("purchase_date").Desc(),
+ ).
+ Size(10).
+ AggregateAfter(map[string]interface{}{
+ "my_terms": "1",
+ "my_histogram": 2,
+ "my_date_histogram": "3",
+ })
+ src, err := agg.Source()
+ if err != nil {
+ t.Fatal(err)
+ }
+ data, err := json.Marshal(src)
+ if err != nil {
+ t.Fatalf("marshaling to JSON failed: %v", err)
+ }
+ got := string(data)
+ expected := `{"composite":{"after":{"my_date_histogram":"3","my_histogram":2,"my_terms":"1"},"size":10,"sources":[{"my_terms":{"terms":{"field":"a_term","missing":"N/A","order":"asc"}}},{"my_histogram":{"histogram":{"field":"price","interval":5,"order":"asc"}}},{"my_date_histogram":{"date_histogram":{"field":"purchase_date","interval":"1d","order":"desc"}}}]}}`
+ if got != expected {
+ t.Errorf("expected\n%s\n,got:\n%s", expected, got)
+ }
+}
+
+func TestCompositeAggregationTermsValuesSource(t *testing.T) {
+ in := NewCompositeAggregationTermsValuesSource("products").
+ Script(NewScript("doc['product'].value").Lang("painless"))
+ src, err := in.Source()
+ if err != nil {
+ t.Fatal(err)
+ }
+ data, err := json.Marshal(src)
+ if err != nil {
+ t.Fatalf("marshaling to JSON failed: %v", err)
+ }
+ got := string(data)
+ expected := `{"products":{"terms":{"script":{"lang":"painless","source":"doc['product'].value"}}}}`
+ if got != expected {
+ t.Errorf("expected\n%s\n,got:\n%s", expected, got)
+ }
+}
+
+func TestCompositeAggregationHistogramValuesSource(t *testing.T) {
+ in := NewCompositeAggregationHistogramValuesSource("histo", 5).
+ Field("price")
+ src, err := in.Source()
+ if err != nil {
+ t.Fatal(err)
+ }
+ data, err := json.Marshal(src)
+ if err != nil {
+ t.Fatalf("marshaling to JSON failed: %v", err)
+ }
+ got := string(data)
+ expected := `{"histo":{"histogram":{"field":"price","interval":5}}}`
+ if got != expected {
+ t.Errorf("expected\n%s\n,got:\n%s", expected, got)
+ }
+}
+
+func TestCompositeAggregationDateHistogramValuesSource(t *testing.T) {
+ in := NewCompositeAggregationDateHistogramValuesSource("date", "1d").
+ Field("timestamp")
+ src, err := in.Source()
+ if err != nil {
+ t.Fatal(err)
+ }
+ data, err := json.Marshal(src)
+ if err != nil {
+ t.Fatalf("marshaling to JSON failed: %v", err)
+ }
+ got := string(data)
+ expected := `{"date":{"date_histogram":{"field":"timestamp","interval":"1d"}}}`
+ if got != expected {
+ t.Errorf("expected\n%s\n,got:\n%s", expected, got)
+ }
+}
diff --git a/vendor/gopkg.in/olivere/elastic.v5/search_aggs_bucket_date_range.go b/vendor/gopkg.in/olivere/elastic.v5/search_aggs_bucket_date_range.go
index 5407dadb8..714fd3e11 100644
--- a/vendor/gopkg.in/olivere/elastic.v5/search_aggs_bucket_date_range.go
+++ b/vendor/gopkg.in/olivere/elastic.v5/search_aggs_bucket_date_range.go
@@ -23,6 +23,7 @@ type DateRangeAggregation struct {
meta map[string]interface{}
keyed *bool
unmapped *bool
+ timeZone string
format string
entries []DateRangeAggregationEntry
}
@@ -71,6 +72,11 @@ func (a *DateRangeAggregation) Unmapped(unmapped bool) *DateRangeAggregation {
return a
}
+// TimeZone sets the time zone to use for the dates in this aggregation.
+func (a *DateRangeAggregation) TimeZone(timeZone string) *DateRangeAggregation {
+ a.timeZone = timeZone
+ return a
+}
+
func (a *DateRangeAggregation) Format(format string) *DateRangeAggregation {
a.format = format
return a
@@ -178,6 +184,9 @@ func (a *DateRangeAggregation) Source() (interface{}, error) {
if a.unmapped != nil {
opts["unmapped"] = *a.unmapped
}
+ if a.timeZone != "" {
+ opts["time_zone"] = a.timeZone
+ }
if a.format != "" {
opts["format"] = a.format
}
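A one-liner sketch of the new option on the builder side (field and zone are placeholders; the test below shows the resulting JSON):

	agg := elastic.NewDateRangeAggregation().
		Field("created_at").
		TimeZone("CET"). // serialized as "time_zone"
		AddRange(nil, "2012-12-31").
		AddRange("2013-01-01", nil)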
diff --git a/vendor/gopkg.in/olivere/elastic.v5/search_aggs_bucket_date_range_test.go b/vendor/gopkg.in/olivere/elastic.v5/search_aggs_bucket_date_range_test.go
index d1c909f3e..89ed495f3 100644
--- a/vendor/gopkg.in/olivere/elastic.v5/search_aggs_bucket_date_range_test.go
+++ b/vendor/gopkg.in/olivere/elastic.v5/search_aggs_bucket_date_range_test.go
@@ -10,7 +10,7 @@ import (
)
func TestDateRangeAggregation(t *testing.T) {
- agg := NewDateRangeAggregation().Field("created_at")
+ agg := NewDateRangeAggregation().Field("created_at").TimeZone("UTC")
agg = agg.AddRange(nil, "2012-12-31")
agg = agg.AddRange("2013-01-01", "2013-12-31")
agg = agg.AddRange("2014-01-01", nil)
@@ -23,7 +23,7 @@ func TestDateRangeAggregation(t *testing.T) {
t.Fatalf("marshaling to JSON failed: %v", err)
}
got := string(data)
- expected := `{"date_range":{"field":"created_at","ranges":[{"to":"2012-12-31"},{"from":"2013-01-01","to":"2013-12-31"},{"from":"2014-01-01"}]}}`
+ expected := `{"date_range":{"field":"created_at","ranges":[{"to":"2012-12-31"},{"from":"2013-01-01","to":"2013-12-31"},{"from":"2014-01-01"}],"time_zone":"UTC"}}`
if got != expected {
t.Errorf("expected\n%s\n,got:\n%s", expected, got)
}
diff --git a/vendor/gopkg.in/olivere/elastic.v5/search_aggs_test.go b/vendor/gopkg.in/olivere/elastic.v5/search_aggs_test.go
index 9d6fa8d27..f1b6347b3 100644
--- a/vendor/gopkg.in/olivere/elastic.v5/search_aggs_test.go
+++ b/vendor/gopkg.in/olivere/elastic.v5/search_aggs_test.go
@@ -13,13 +13,15 @@ import (
)
func TestAggs(t *testing.T) {
- // client := setupTestClientAndCreateIndex(t, SetTraceLog(log.New(os.Stdout, "", log.LstdFlags)))
+ //client := setupTestClientAndCreateIndex(t, SetTraceLog(log.New(os.Stdout, "", log.LstdFlags)))
client := setupTestClientAndCreateIndex(t)
- esversion, err := client.ElasticsearchVersion(DefaultURL)
- if err != nil {
- t.Fatal(err)
- }
+ /*
+ esversion, err := client.ElasticsearchVersion(DefaultURL)
+ if err != nil {
+ t.Fatal(err)
+ }
+ */
tweet1 := tweet{
User: "olivere",
@@ -48,7 +50,7 @@ func TestAggs(t *testing.T) {
}
// Add all documents
- _, err = client.Index().Index(testIndexName).Type("doc").Id("1").BodyJson(&tweet1).Do(context.TODO())
+ _, err := client.Index().Index(testIndexName).Type("doc").Id("1").BodyJson(&tweet1).Do(context.TODO())
if err != nil {
t.Fatal(err)
}
@@ -102,6 +104,11 @@ func TestAggs(t *testing.T) {
topTagsAgg := NewTermsAggregation().Field("tags").Size(3).SubAggregation("top_tag_hits", topTagsHitsAgg)
geoBoundsAgg := NewGeoBoundsAggregation().Field("location")
geoHashAgg := NewGeoHashGridAggregation().Field("location").Precision(5)
+ composite := NewCompositeAggregation().Sources(
+ NewCompositeAggregationTermsValuesSource("composite_users").Field("user"),
+ NewCompositeAggregationHistogramValuesSource("composite_retweets", 1).Field("retweets"),
+ NewCompositeAggregationDateHistogramValuesSource("composite_created", "1m").Field("created"),
+ )
// Run query
builder := client.Search().Index(testIndexName).Query(all).Pretty(true)
@@ -109,9 +116,7 @@ func TestAggs(t *testing.T) {
builder = builder.Aggregation("users", usersAgg)
builder = builder.Aggregation("retweets", retweetsAgg)
builder = builder.Aggregation("avgRetweets", avgRetweetsAgg)
- if esversion >= "2.0" {
- builder = builder.Aggregation("avgRetweetsWithMeta", avgRetweetsWithMetaAgg)
- }
+ builder = builder.Aggregation("avgRetweetsWithMeta", avgRetweetsWithMetaAgg)
builder = builder.Aggregation("minRetweets", minRetweetsAgg)
builder = builder.Aggregation("maxRetweets", maxRetweetsAgg)
builder = builder.Aggregation("sumRetweets", sumRetweetsAgg)
@@ -134,44 +139,41 @@ func TestAggs(t *testing.T) {
builder = builder.Aggregation("top-tags", topTagsAgg)
builder = builder.Aggregation("viewport", geoBoundsAgg)
builder = builder.Aggregation("geohashed", geoHashAgg)
- if esversion >= "1.4" {
- // Unnamed filters
- countByUserAgg := NewFiltersAggregation().
- Filters(NewTermQuery("user", "olivere"), NewTermQuery("user", "sandrae"))
- builder = builder.Aggregation("countByUser", countByUserAgg)
- // Named filters
- countByUserAgg2 := NewFiltersAggregation().
- FilterWithName("olivere", NewTermQuery("user", "olivere")).
- FilterWithName("sandrae", NewTermQuery("user", "sandrae"))
- builder = builder.Aggregation("countByUser2", countByUserAgg2)
- }
- if esversion >= "2.0" {
- // AvgBucket
- dateHisto := NewDateHistogramAggregation().Field("created").Interval("year")
- dateHisto = dateHisto.SubAggregation("sumOfRetweets", NewSumAggregation().Field("retweets"))
- builder = builder.Aggregation("avgBucketDateHisto", dateHisto)
- builder = builder.Aggregation("avgSumOfRetweets", NewAvgBucketAggregation().BucketsPath("avgBucketDateHisto>sumOfRetweets"))
- // MinBucket
- dateHisto = NewDateHistogramAggregation().Field("created").Interval("year")
- dateHisto = dateHisto.SubAggregation("sumOfRetweets", NewSumAggregation().Field("retweets"))
- builder = builder.Aggregation("minBucketDateHisto", dateHisto)
- builder = builder.Aggregation("minBucketSumOfRetweets", NewMinBucketAggregation().BucketsPath("minBucketDateHisto>sumOfRetweets"))
- // MaxBucket
- dateHisto = NewDateHistogramAggregation().Field("created").Interval("year")
- dateHisto = dateHisto.SubAggregation("sumOfRetweets", NewSumAggregation().Field("retweets"))
- builder = builder.Aggregation("maxBucketDateHisto", dateHisto)
- builder = builder.Aggregation("maxBucketSumOfRetweets", NewMaxBucketAggregation().BucketsPath("maxBucketDateHisto>sumOfRetweets"))
- // SumBucket
- dateHisto = NewDateHistogramAggregation().Field("created").Interval("year")
- dateHisto = dateHisto.SubAggregation("sumOfRetweets", NewSumAggregation().Field("retweets"))
- builder = builder.Aggregation("sumBucketDateHisto", dateHisto)
- builder = builder.Aggregation("sumBucketSumOfRetweets", NewSumBucketAggregation().BucketsPath("sumBucketDateHisto>sumOfRetweets"))
- // MovAvg
- dateHisto = NewDateHistogramAggregation().Field("created").Interval("year")
- dateHisto = dateHisto.SubAggregation("sumOfRetweets", NewSumAggregation().Field("retweets"))
- dateHisto = dateHisto.SubAggregation("movingAvg", NewMovAvgAggregation().BucketsPath("sumOfRetweets"))
- builder = builder.Aggregation("movingAvgDateHisto", dateHisto)
- }
+ // Unnamed filters
+ countByUserAgg := NewFiltersAggregation().
+ Filters(NewTermQuery("user", "olivere"), NewTermQuery("user", "sandrae"))
+ builder = builder.Aggregation("countByUser", countByUserAgg)
+ // Named filters
+ countByUserAgg2 := NewFiltersAggregation().
+ FilterWithName("olivere", NewTermQuery("user", "olivere")).
+ FilterWithName("sandrae", NewTermQuery("user", "sandrae"))
+ builder = builder.Aggregation("countByUser2", countByUserAgg2)
+ // AvgBucket
+ dateHisto := NewDateHistogramAggregation().Field("created").Interval("year")
+ dateHisto = dateHisto.SubAggregation("sumOfRetweets", NewSumAggregation().Field("retweets"))
+ builder = builder.Aggregation("avgBucketDateHisto", dateHisto)
+ builder = builder.Aggregation("avgSumOfRetweets", NewAvgBucketAggregation().BucketsPath("avgBucketDateHisto>sumOfRetweets"))
+ // MinBucket
+ dateHisto = NewDateHistogramAggregation().Field("created").Interval("year")
+ dateHisto = dateHisto.SubAggregation("sumOfRetweets", NewSumAggregation().Field("retweets"))
+ builder = builder.Aggregation("minBucketDateHisto", dateHisto)
+ builder = builder.Aggregation("minBucketSumOfRetweets", NewMinBucketAggregation().BucketsPath("minBucketDateHisto>sumOfRetweets"))
+ // MaxBucket
+ dateHisto = NewDateHistogramAggregation().Field("created").Interval("year")
+ dateHisto = dateHisto.SubAggregation("sumOfRetweets", NewSumAggregation().Field("retweets"))
+ builder = builder.Aggregation("maxBucketDateHisto", dateHisto)
+ builder = builder.Aggregation("maxBucketSumOfRetweets", NewMaxBucketAggregation().BucketsPath("maxBucketDateHisto>sumOfRetweets"))
+ // SumBucket
+ dateHisto = NewDateHistogramAggregation().Field("created").Interval("year")
+ dateHisto = dateHisto.SubAggregation("sumOfRetweets", NewSumAggregation().Field("retweets"))
+ builder = builder.Aggregation("sumBucketDateHisto", dateHisto)
+ builder = builder.Aggregation("sumBucketSumOfRetweets", NewSumBucketAggregation().BucketsPath("sumBucketDateHisto>sumOfRetweets"))
+ // MovAvg
+ dateHisto = NewDateHistogramAggregation().Field("created").Interval("year")
+ dateHisto = dateHisto.SubAggregation("sumOfRetweets", NewSumAggregation().Field("retweets"))
+ dateHisto = dateHisto.SubAggregation("movingAvg", NewMovAvgAggregation().BucketsPath("sumOfRetweets"))
+ builder = builder.Aggregation("movingAvgDateHisto", dateHisto)
+ builder = builder.Aggregation("composite", composite)
searchResult, err := builder.Do(context.TODO())
if err != nil {
t.Fatal(err)
@@ -308,26 +310,24 @@ func TestAggs(t *testing.T) {
}
// avgRetweetsWithMeta
- if esversion >= "2.0" {
- avgMetaAggRes, found := agg.Avg("avgRetweetsWithMeta")
- if !found {
- t.Errorf("expected %v; got: %v", true, found)
- }
- if avgMetaAggRes == nil {
- t.Fatalf("expected != nil; got: nil")
- }
- if avgMetaAggRes.Meta == nil {
- t.Fatalf("expected != nil; got: %v", avgMetaAggRes.Meta)
- }
- metaDataValue, found := avgMetaAggRes.Meta["meta"]
- if !found {
- t.Fatalf("expected to return meta data key %q; got: %v", "meta", found)
- }
- if flag, ok := metaDataValue.(bool); !ok {
- t.Fatalf("expected to return meta data key type %T; got: %T", true, metaDataValue)
- } else if flag != true {
- t.Fatalf("expected to return meta data key value %v; got: %v", true, flag)
- }
+ avgMetaAggRes, found := agg.Avg("avgRetweetsWithMeta")
+ if !found {
+ t.Errorf("expected %v; got: %v", true, found)
+ }
+ if avgMetaAggRes == nil {
+ t.Fatalf("expected != nil; got: nil")
+ }
+ if avgMetaAggRes.Meta == nil {
+ t.Fatalf("expected != nil; got: %v", avgMetaAggRes.Meta)
+ }
+ metaDataValue, found := avgMetaAggRes.Meta["meta"]
+ if !found {
+ t.Fatalf("expected to return meta data key %q; got: %v", "meta", found)
+ }
+ if flag, ok := metaDataValue.(bool); !ok {
+ t.Fatalf("expected to return meta data key type %T; got: %T", true, metaDataValue)
+ } else if flag != true {
+ t.Fatalf("expected to return meta data key value %v; got: %v", true, flag)
}
// minRetweets
@@ -817,13 +817,11 @@ func TestAggs(t *testing.T) {
if topTags == nil {
t.Fatalf("expected != nil; got: nil")
}
- if esversion >= "1.4.0" {
- if topTags.DocCountErrorUpperBound != 0 {
- t.Errorf("expected %v; got: %v", 0, topTags.DocCountErrorUpperBound)
- }
- if topTags.SumOfOtherDocCount != 1 {
- t.Errorf("expected %v; got: %v", 1, topTags.SumOfOtherDocCount)
- }
+ if topTags.DocCountErrorUpperBound != 0 {
+ t.Errorf("expected %v; got: %v", 0, topTags.DocCountErrorUpperBound)
+ }
+ if topTags.SumOfOtherDocCount != 1 {
+ t.Errorf("expected %v; got: %v", 1, topTags.SumOfOtherDocCount)
}
if len(topTags.Buckets) != 3 {
t.Fatalf("expected %d; got: %d", 3, len(topTags.Buckets))
@@ -924,62 +922,71 @@ func TestAggs(t *testing.T) {
t.Fatalf("expected != nil; got: nil")
}
- if esversion >= "1.4" {
- // Filters agg "countByUser" (unnamed)
- countByUserAggRes, found := agg.Filters("countByUser")
- if !found {
- t.Errorf("expected %v; got: %v", true, found)
- }
- if countByUserAggRes == nil {
- t.Fatalf("expected != nil; got: nil")
- }
- if len(countByUserAggRes.Buckets) != 2 {
- t.Fatalf("expected %d; got: %d", 2, len(countByUserAggRes.Buckets))
- }
- if len(countByUserAggRes.NamedBuckets) != 0 {
- t.Fatalf("expected %d; got: %d", 0, len(countByUserAggRes.NamedBuckets))
- }
- if countByUserAggRes.Buckets[0].DocCount != 2 {
- t.Errorf("expected %d; got: %d", 2, countByUserAggRes.Buckets[0].DocCount)
- }
- if countByUserAggRes.Buckets[1].DocCount != 1 {
- t.Errorf("expected %d; got: %d", 1, countByUserAggRes.Buckets[1].DocCount)
- }
+ // Filters agg "countByUser" (unnamed)
+ countByUserAggRes, found := agg.Filters("countByUser")
+ if !found {
+ t.Errorf("expected %v; got: %v", true, found)
+ }
+ if countByUserAggRes == nil {
+ t.Fatalf("expected != nil; got: nil")
+ }
+ if len(countByUserAggRes.Buckets) != 2 {
+ t.Fatalf("expected %d; got: %d", 2, len(countByUserAggRes.Buckets))
+ }
+ if len(countByUserAggRes.NamedBuckets) != 0 {
+ t.Fatalf("expected %d; got: %d", 0, len(countByUserAggRes.NamedBuckets))
+ }
+ if countByUserAggRes.Buckets[0].DocCount != 2 {
+ t.Errorf("expected %d; got: %d", 2, countByUserAggRes.Buckets[0].DocCount)
+ }
+ if countByUserAggRes.Buckets[1].DocCount != 1 {
+ t.Errorf("expected %d; got: %d", 1, countByUserAggRes.Buckets[1].DocCount)
+ }
- // Filters agg "countByUser2" (named)
- countByUser2AggRes, found := agg.Filters("countByUser2")
- if !found {
- t.Errorf("expected %v; got: %v", true, found)
- }
- if countByUser2AggRes == nil {
- t.Fatalf("expected != nil; got: nil")
- }
- if len(countByUser2AggRes.Buckets) != 0 {
- t.Fatalf("expected %d; got: %d", 0, len(countByUser2AggRes.Buckets))
- }
- if len(countByUser2AggRes.NamedBuckets) != 2 {
- t.Fatalf("expected %d; got: %d", 2, len(countByUser2AggRes.NamedBuckets))
- }
- b, found := countByUser2AggRes.NamedBuckets["olivere"]
- if !found {
- t.Fatalf("expected bucket %q; got: %v", "olivere", found)
- }
- if b == nil {
- t.Fatalf("expected bucket %q; got: %v", "olivere", b)
- }
- if b.DocCount != 2 {
- t.Errorf("expected %d; got: %d", 2, b.DocCount)
- }
- b, found = countByUser2AggRes.NamedBuckets["sandrae"]
- if !found {
- t.Fatalf("expected bucket %q; got: %v", "sandrae", found)
- }
- if b == nil {
- t.Fatalf("expected bucket %q; got: %v", "sandrae", b)
- }
- if b.DocCount != 1 {
- t.Errorf("expected %d; got: %d", 1, b.DocCount)
- }
+ // Filters agg "countByUser2" (named)
+ countByUser2AggRes, found := agg.Filters("countByUser2")
+ if !found {
+ t.Errorf("expected %v; got: %v", true, found)
+ }
+ if countByUser2AggRes == nil {
+ t.Fatalf("expected != nil; got: nil")
+ }
+ if len(countByUser2AggRes.Buckets) != 0 {
+ t.Fatalf("expected %d; got: %d", 0, len(countByUser2AggRes.Buckets))
+ }
+ if len(countByUser2AggRes.NamedBuckets) != 2 {
+ t.Fatalf("expected %d; got: %d", 2, len(countByUser2AggRes.NamedBuckets))
+ }
+ b, found := countByUser2AggRes.NamedBuckets["olivere"]
+ if !found {
+ t.Fatalf("expected bucket %q; got: %v", "olivere", found)
+ }
+ if b == nil {
+ t.Fatalf("expected bucket %q; got: %v", "olivere", b)
+ }
+ if b.DocCount != 2 {
+ t.Errorf("expected %d; got: %d", 2, b.DocCount)
+ }
+ b, found = countByUser2AggRes.NamedBuckets["sandrae"]
+ if !found {
+ t.Fatalf("expected bucket %q; got: %v", "sandrae", found)
+ }
+ if b == nil {
+ t.Fatalf("expected bucket %q; got: %v", "sandrae", b)
+ }
+ if b.DocCount != 1 {
+ t.Errorf("expected %d; got: %d", 1, b.DocCount)
+ }
+
+ compositeAggRes, found := agg.Composite("composite")
+ if !found {
+ t.Errorf("expected %v; got: %v", true, found)
+ }
+ if compositeAggRes == nil {
+ t.Fatalf("expected != nil; got: nil")
+ }
+ if want, have := 3, len(compositeAggRes.Buckets); want != have {
+ t.Fatalf("expected %d; got: %d", want, have)
}
}
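[Editor's note] Each composite bucket's Key is a map keyed by the configured source names, so results are read by map lookup rather than a typed field, and paging works by feeding the last returned key back into the aggregation. A hedged sketch (AggregateAfter is assumed to be part of the new composite API; imports: fmt):

    // Print each composite bucket, then prepare the next page.
    for _, b := range compositeAggRes.Buckets {
        user, _ := b.Key["composite_users"].(string)
        fmt.Printf("user=%s doc_count=%d\n", user, b.DocCount)
    }
    if n := len(compositeAggRes.Buckets); n > 0 {
        // Ask for the buckets that come after the last key seen.
        composite = composite.AggregateAfter(compositeAggRes.Buckets[n-1].Key)
    }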
@@ -3231,3 +3238,179 @@ func TestAggsPipelineSerialDiff(t *testing.T) {
t.Fatalf("expected aggregation value = %v; got: %v", float64(20), *agg.Value)
}
}
+
+func TestAggsComposite(t *testing.T) {
+ s := `{
+ "the_composite" : {
+ "buckets" : [
+ {
+ "key" : {
+ "composite_users" : "olivere",
+ "composite_retweets" : 0.0,
+ "composite_created" : 1349856720000
+ },
+ "doc_count" : 1
+ },
+ {
+ "key" : {
+ "composite_users" : "olivere",
+ "composite_retweets" : 108.0,
+ "composite_created" : 1355333880000
+ },
+ "doc_count" : 1
+ },
+ {
+ "key" : {
+ "composite_users" : "sandrae",
+ "composite_retweets" : 12.0,
+ "composite_created" : 1321009080000
+ },
+ "doc_count" : 1
+ }
+ ]
+ }
+ }`
+
+ aggs := new(Aggregations)
+ err := json.Unmarshal([]byte(s), &aggs)
+ if err != nil {
+ t.Fatalf("expected no error decoding; got: %v", err)
+ }
+
+ agg, found := aggs.Composite("the_composite")
+ if !found {
+ t.Fatalf("expected aggregation to be found; got: %v", found)
+ }
+ if agg == nil {
+ t.Fatalf("expected aggregation != nil; got: %v", agg)
+ }
+ if want, have := 3, len(agg.Buckets); want != have {
+ t.Fatalf("expected aggregation buckets length = %v; got: %v", want, have)
+ }
+
+ // 1st bucket
+ bucket := agg.Buckets[0]
+ if want, have := int64(1), bucket.DocCount; want != have {
+ t.Fatalf("expected aggregation bucket doc count = %v; got: %v", want, have)
+ }
+ if want, have := 3, len(bucket.Key); want != have {
+ t.Fatalf("expected aggregation bucket key length = %v; got: %v", want, have)
+ }
+ v, found := bucket.Key["composite_users"]
+ if !found {
+ t.Fatalf("expected to find bucket key %q", "composite_users")
+ }
+ s, ok := v.(string)
+ if !ok {
+ t.Fatalf("expected to have bucket key of type string; got: %T", v)
+ }
+ if want, have := "olivere", s; want != have {
+ t.Fatalf("expected to find bucket key value %q; got: %q", want, have)
+ }
+ v, found = bucket.Key["composite_retweets"]
+ if !found {
+ t.Fatalf("expected to find bucket key %q", "composite_retweets")
+ }
+ f, ok := v.(float64)
+ if !ok {
+ t.Fatalf("expected to have bucket key of type string; got: %T", v)
+ }
+ if want, have := 0.0, f; want != have {
+ t.Fatalf("expected to find bucket key value %v; got: %v", want, have)
+ }
+ v, found = bucket.Key["composite_created"]
+ if !found {
+ t.Fatalf("expected to find bucket key %q", "composite_created")
+ }
+ f, ok = v.(float64)
+ if !ok {
+ t.Fatalf("expected to have bucket key of type string; got: %T", v)
+ }
+ if want, have := 1349856720000.0, f; want != have {
+ t.Fatalf("expected to find bucket key value %v; got: %v", want, have)
+ }
+
+ // 2nd bucket
+ bucket = agg.Buckets[1]
+ if want, have := int64(1), bucket.DocCount; want != have {
+ t.Fatalf("expected aggregation bucket doc count = %v; got: %v", want, have)
+ }
+ if want, have := 3, len(bucket.Key); want != have {
+ t.Fatalf("expected aggregation bucket key length = %v; got: %v", want, have)
+ }
+ v, found = bucket.Key["composite_users"]
+ if !found {
+ t.Fatalf("expected to find bucket key %q", "composite_users")
+ }
+ s, ok = v.(string)
+ if !ok {
+ t.Fatalf("expected to have bucket key of type string; got: %T", v)
+ }
+ if want, have := "olivere", s; want != have {
+ t.Fatalf("expected to find bucket key value %q; got: %q", want, have)
+ }
+ v, found = bucket.Key["composite_retweets"]
+ if !found {
+ t.Fatalf("expected to find bucket key %q", "composite_retweets")
+ }
+ f, ok = v.(float64)
+ if !ok {
+ t.Fatalf("expected to have bucket key of type string; got: %T", v)
+ }
+ if want, have := 108.0, f; want != have {
+ t.Fatalf("expected to find bucket key value %v; got: %v", want, have)
+ }
+ v, found = bucket.Key["composite_created"]
+ if !found {
+ t.Fatalf("expected to find bucket key %q", "composite_created")
+ }
+ f, ok = v.(float64)
+ if !ok {
+ t.Fatalf("expected to have bucket key of type string; got: %T", v)
+ }
+ if want, have := 1355333880000.0, f; want != have {
+ t.Fatalf("expected to find bucket key value %v; got: %v", want, have)
+ }
+
+ // 3rd bucket
+ bucket = agg.Buckets[2]
+ if want, have := int64(1), bucket.DocCount; want != have {
+ t.Fatalf("expected aggregation bucket doc count = %v; got: %v", want, have)
+ }
+ if want, have := 3, len(bucket.Key); want != have {
+ t.Fatalf("expected aggregation bucket key length = %v; got: %v", want, have)
+ }
+ v, found = bucket.Key["composite_users"]
+ if !found {
+ t.Fatalf("expected to find bucket key %q", "composite_users")
+ }
+ s, ok = v.(string)
+ if !ok {
+ t.Fatalf("expected to have bucket key of type string; got: %T", v)
+ }
+ if want, have := "sandrae", s; want != have {
+ t.Fatalf("expected to find bucket key value %q; got: %q", want, have)
+ }
+ v, found = bucket.Key["composite_retweets"]
+ if !found {
+ t.Fatalf("expected to find bucket key %q", "composite_retweets")
+ }
+ f, ok = v.(float64)
+ if !ok {
+ t.Fatalf("expected to have bucket key of type string; got: %T", v)
+ }
+ if want, have := 12.0, f; want != have {
+ t.Fatalf("expected to find bucket key value %v; got: %v", want, have)
+ }
+ v, found = bucket.Key["composite_created"]
+ if !found {
+ t.Fatalf("expected to find bucket key %q", "composite_created")
+ }
+ f, ok = v.(float64)
+ if !ok {
+ t.Fatalf("expected to have bucket key of type string; got: %T", v)
+ }
+ if want, have := 1321009080000.0, f; want != have {
+ t.Fatalf("expected to find bucket key value %v; got: %v", want, have)
+ }
+}
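[Editor's note] The assertions above expect float64 even for the epoch-millisecond date keys: encoding/json decodes every JSON number into float64 when the destination is interface{}, so composite key values need an explicit conversion before use. A small hedged helper illustrating the pattern:

    // asInt64 converts a composite bucket key value, which encoding/json
    // decodes as float64, back to int64 (e.g. for epoch-millis timestamps).
    func asInt64(v interface{}) (int64, bool) {
        f, ok := v.(float64)
        if !ok {
            return 0, false
        }
        return int64(f), true
    }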
diff --git a/vendor/gopkg.in/olivere/elastic.v5/search_queries_terms_set.go b/vendor/gopkg.in/olivere/elastic.v5/search_queries_terms_set.go
new file mode 100644
index 000000000..be410a1a7
--- /dev/null
+++ b/vendor/gopkg.in/olivere/elastic.v5/search_queries_terms_set.go
@@ -0,0 +1,96 @@
+// Copyright 2012-present Oliver Eilhard. All rights reserved.
+// Use of this source code is governed by a MIT-license.
+// See http://olivere.mit-license.org/license.txt for details.
+
+package elastic
+
+// TermsSetQuery returns any documents that match one or more of
+// the provided terms. The terms are not analyzed and thus must
+// match exactly. The number of terms that must match varies per
+// document: it is either read from a minimum-should-match field
+// or computed per document by a minimum-should-match script.
+//
+// For more details, see
+// https://www.elastic.co/guide/en/elasticsearch/reference/6.1/query-dsl-terms-set-query.html
+type TermsSetQuery struct {
+ name string
+ values []interface{}
+ minimumShouldMatchField string
+ minimumShouldMatchScript *Script
+ queryName string
+ boost *float64
+}
+
+// NewTermsSetQuery creates and initializes a new TermsSetQuery.
+func NewTermsSetQuery(name string, values ...interface{}) *TermsSetQuery {
+ q := &TermsSetQuery{
+ name: name,
+ }
+ if len(values) > 0 {
+ q.values = append(q.values, values...)
+ }
+ return q
+}
+
+// MinimumShouldMatchField specifies the numeric field that holds
+// the number of matching terms required per document.
+func (q *TermsSetQuery) MinimumShouldMatchField(minimumShouldMatchField string) *TermsSetQuery {
+ q.minimumShouldMatchField = minimumShouldMatchField
+ return q
+}
+
+// MinimumShouldMatchScript specifies a script that computes the
+// number of matching terms required per document.
+func (q *TermsSetQuery) MinimumShouldMatchScript(minimumShouldMatchScript *Script) *TermsSetQuery {
+ q.minimumShouldMatchScript = minimumShouldMatchScript
+ return q
+}
+
+// Boost sets the boost for this query.
+func (q *TermsSetQuery) Boost(boost float64) *TermsSetQuery {
+ q.boost = &boost
+ return q
+}
+
+// QueryName sets the query name for the filter that can be used
+// when searching for the matched filters per hit.
+func (q *TermsSetQuery) QueryName(queryName string) *TermsSetQuery {
+ q.queryName = queryName
+ return q
+}
+
+// Source creates the query source for the terms_set query.
+func (q *TermsSetQuery) Source() (interface{}, error) {
+ // {"terms_set":{"codes":{"terms":["abc","def"],"minimum_should_match_field":"required_matches"}}}
+ source := make(map[string]interface{})
+ inner := make(map[string]interface{})
+ params := make(map[string]interface{})
+ inner[q.name] = params
+ source["terms_set"] = inner
+
+ // terms
+ params["terms"] = q.values
+
+ // minimum_should_match_field
+ if match := q.minimumShouldMatchField; match != "" {
+ params["minimum_should_match_field"] = match
+ }
+
+ // minimum_should_match_script
+ if match := q.minimumShouldMatchScript; match != nil {
+ src, err := match.Source()
+ if err != nil {
+ return nil, err
+ }
+ params["minimum_should_match_script"] = src
+ }
+
+ // Common parameters for all queries
+ if q.boost != nil {
+ params["boost"] = *q.boost
+ }
+ if q.queryName != "" {
+ params["_name"] = q.queryName
+ }
+
+ return source, nil
+}
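[Editor's note] End to end, the generated JSON asks Elasticsearch to require, per document, as many of the provided terms as the referenced field dictates. A hedged sketch (document layout and field names are illustrative; key order in the output follows json.Marshal's sorted map keys; imports: encoding/json, fmt):

    // Document in the index: {"codes": ["abc", "def"], "required_matches": 2}
    q := NewTermsSetQuery("codes", "abc", "def", "ghi").
        MinimumShouldMatchField("required_matches")
    src, err := q.Source()
    if err != nil {
        panic(err)
    }
    data, _ := json.Marshal(src)
    fmt.Println(string(data))
    // {"terms_set":{"codes":{"minimum_should_match_field":"required_matches","terms":["abc","def","ghi"]}}}
    // The document matches: two of its terms ("abc", "def") appear in the query.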
diff --git a/vendor/gopkg.in/olivere/elastic.v5/search_queries_terms_set_test.go b/vendor/gopkg.in/olivere/elastic.v5/search_queries_terms_set_test.go
new file mode 100644
index 000000000..e13fbfb2f
--- /dev/null
+++ b/vendor/gopkg.in/olivere/elastic.v5/search_queries_terms_set_test.go
@@ -0,0 +1,75 @@
+// Copyright 2012-present Oliver Eilhard. All rights reserved.
+// Use of this source code is governed by a MIT-license.
+// See http://olivere.mit-license.org/license.txt for details.
+
+package elastic
+
+import (
+ "context"
+ "encoding/json"
+ "testing"
+)
+
+func TestTermsSetQueryWithField(t *testing.T) {
+ q := NewTermsSetQuery("codes", "abc", "def", "ghi").MinimumShouldMatchField("required_matches")
+ src, err := q.Source()
+ if err != nil {
+ t.Fatal(err)
+ }
+ data, err := json.Marshal(src)
+ if err != nil {
+ t.Fatalf("marshaling to JSON failed: %v", err)
+ }
+ got := string(data)
+ expected := `{"terms_set":{"codes":{"minimum_should_match_field":"required_matches","terms":["abc","def","ghi"]}}}`
+ if got != expected {
+ t.Errorf("expected\n%s\n,got:\n%s", expected, got)
+ }
+}
+
+func TestTermsSetQueryWithScript(t *testing.T) {
+ q := NewTermsSetQuery("codes", "abc", "def", "ghi").
+ MinimumShouldMatchScript(
+ NewScript(`Math.min(params.num_terms, doc['required_matches'].value)`),
+ )
+ src, err := q.Source()
+ if err != nil {
+ t.Fatal(err)
+ }
+ data, err := json.Marshal(src)
+ if err != nil {
+ t.Fatalf("marshaling to JSON failed: %v", err)
+ }
+ got := string(data)
+ expected := `{"terms_set":{"codes":{"minimum_should_match_script":{"source":"Math.min(params.num_terms, doc['required_matches'].value)"},"terms":["abc","def","ghi"]}}}`
+ if got != expected {
+ t.Errorf("expected\n%s\n,got:\n%s", expected, got)
+ }
+}
+
+func TestSearchTermsSetQuery(t *testing.T) {
+ //client := setupTestClientAndCreateIndexAndAddDocs(t, SetTraceLog(log.New(os.Stdout, "", log.LstdFlags)))
+ client := setupTestClientAndCreateIndexAndAddDocs(t)
+
+ // The terms_set query should match all three indexed documents
+ searchResult, err := client.Search().
+ Index(testIndexName).
+ Query(
+ NewTermsSetQuery("user", "olivere", "sandrae").
+ MinimumShouldMatchField("retweets"),
+ ).
+ Pretty(true).
+ Do(context.TODO())
+ if err != nil {
+ t.Fatal(err)
+ }
+ if searchResult.Hits == nil {
+ t.Errorf("expected SearchResult.Hits != nil; got nil")
+ }
+ if got, want := searchResult.Hits.TotalHits, int64(3); got != want {
+ t.Errorf("expected SearchResult.Hits.TotalHits = %d; got %d", want, got)
+ }
+ if got, want := len(searchResult.Hits.Hits), 3; got != want {
+ t.Errorf("expected len(SearchResult.Hits.Hits) = %d; got %d", want, got)
+ }
+}
diff --git a/vendor/gopkg.in/olivere/elastic.v5/search_request.go b/vendor/gopkg.in/olivere/elastic.v5/search_request.go
index 6f40ff028..7ee4ce82c 100644
--- a/vendor/gopkg.in/olivere/elastic.v5/search_request.go
+++ b/vendor/gopkg.in/olivere/elastic.v5/search_request.go
@@ -4,7 +4,10 @@
package elastic
-import "strings"
+import (
+ "encoding/json"
+ "strings"
+)
// SearchRequest combines a search request and its
// query details (see SearchSource).
@@ -130,17 +133,7 @@ func (r *SearchRequest) SearchSource(searchSource *SearchSource) *SearchRequest
}
func (r *SearchRequest) Source(source interface{}) *SearchRequest {
- switch v := source.(type) {
- case *SearchSource:
- src, err := v.Source()
- if err != nil {
- // Do not do anything in case of an error
- return r
- }
- r.source = src
- default:
- r.source = source
- }
+ r.source = source
return r
}
@@ -200,6 +193,34 @@ func (r *SearchRequest) header() interface{} {
// Body is used e.g. by MultiSearch to get information about the search body
// of one SearchRequest.
// See https://www.elastic.co/guide/en/elasticsearch/reference/6.0/search-multi-search.html
-func (r *SearchRequest) Body() interface{} {
- return r.source
+func (r *SearchRequest) Body() (string, error) {
+ switch t := r.source.(type) {
+ default:
+ body, err := json.Marshal(r.source)
+ if err != nil {
+ return "", err
+ }
+ return string(body), nil
+ case *SearchSource:
+ src, err := t.Source()
+ if err != nil {
+ return "", err
+ }
+ body, err := json.Marshal(src)
+ if err != nil {
+ return "", err
+ }
+ return string(body), nil
+ case json.RawMessage:
+ return string(t), nil
+ case *json.RawMessage:
+ return string(*t), nil
+ case string:
+ return t, nil
+ case *string:
+ if t != nil {
+ return *t, nil
+ }
+ return "{}", nil
+ }
}
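[Editor's note] Body() now returns a ready-to-send JSON string (plus an error) instead of the raw interface{}, normalizing every type Source() accepts. A hedged sketch of the new contract as a caller such as MultiSearch would see it (index and query values are illustrative):

    // A raw string passes through unchanged.
    r1 := NewSearchRequest().Index("twitter").Source(`{"query":{"match_all":{}}}`)
    body1, _ := r1.Body() // {"query":{"match_all":{}}}

    // A SearchSource is serialized to its JSON body.
    r2 := NewSearchRequest().SearchSource(
        NewSearchSource().Query(NewTermQuery("user", "olivere")),
    )
    body2, _ := r2.Body() // {"query":{"term":{"user":"olivere"}}}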
diff --git a/vendor/gopkg.in/olivere/elastic.v5/search_test.go b/vendor/gopkg.in/olivere/elastic.v5/search_test.go
index 097c26525..586089aaa 100644
--- a/vendor/gopkg.in/olivere/elastic.v5/search_test.go
+++ b/vendor/gopkg.in/olivere/elastic.v5/search_test.go
@@ -607,6 +607,61 @@ func TestSearchSource(t *testing.T) {
}
}
+func TestSearchSourceWithString(t *testing.T) {
+ client := setupTestClientAndCreateIndex(t)
+
+ tweet1 := tweet{
+ User: "olivere", Retweets: 108,
+ Message: "Welcome to Golang and Elasticsearch.",
+ Created: time.Date(2012, 12, 12, 17, 38, 34, 0, time.UTC),
+ }
+ tweet2 := tweet{
+ User: "olivere", Retweets: 0,
+ Message: "Another unrelated topic.",
+ Created: time.Date(2012, 10, 10, 8, 12, 03, 0, time.UTC),
+ }
+ tweet3 := tweet{
+ User: "sandrae", Retweets: 12,
+ Message: "Cycling is fun.",
+ Created: time.Date(2011, 11, 11, 10, 58, 12, 0, time.UTC),
+ }
+
+ // Add all documents
+ _, err := client.Index().Index(testIndexName).Type("doc").Id("1").BodyJson(&tweet1).Do(context.TODO())
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ _, err = client.Index().Index(testIndexName).Type("doc").Id("2").BodyJson(&tweet2).Do(context.TODO())
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ _, err = client.Index().Index(testIndexName).Type("doc").Id("3").BodyJson(&tweet3).Do(context.TODO())
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ _, err = client.Flush().Index(testIndexName).Do(context.TODO())
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ searchResult, err := client.Search().
+ Index(testIndexName).
+ Source(`{"query":{"match_all":{}}}`). // sets the JSON request
+ Do(context.TODO())
+ if err != nil {
+ t.Fatal(err)
+ }
+ if searchResult.Hits == nil {
+ t.Errorf("expected SearchResult.Hits != nil; got nil")
+ }
+ if searchResult.Hits.TotalHits != 3 {
+ t.Errorf("expected SearchResult.Hits.TotalHits = %d; got %d", 3, searchResult.Hits.TotalHits)
+ }
+}
+
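[Editor's note] A json.RawMessage presumably works the same way as the raw string above, since it marshals to itself and is also covered by the new Body() type switch in search_request.go; a hedged sketch:

    // Equivalent to the string form used in the test above.
    raw := json.RawMessage(`{"query":{"match_all":{}}}`)
    searchResult, err := client.Search().
        Index(testIndexName).
        Source(raw).
        Do(context.TODO())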
func TestSearchRawString(t *testing.T) {
// client := setupTestClientAndCreateIndexAndLog(t, SetTraceLog(log.New(os.Stdout, "", 0)))
client := setupTestClientAndCreateIndex(t)