summaryrefslogtreecommitdiffstats
path: root/vendor/gopkg.in/olivere/elastic.v5/cluster-test/cluster-test.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/gopkg.in/olivere/elastic.v5/cluster-test/cluster-test.go')
-rw-r--r--vendor/gopkg.in/olivere/elastic.v5/cluster-test/cluster-test.go361
1 files changed, 361 insertions, 0 deletions
diff --git a/vendor/gopkg.in/olivere/elastic.v5/cluster-test/cluster-test.go b/vendor/gopkg.in/olivere/elastic.v5/cluster-test/cluster-test.go
new file mode 100644
index 000000000..249b35c04
--- /dev/null
+++ b/vendor/gopkg.in/olivere/elastic.v5/cluster-test/cluster-test.go
@@ -0,0 +1,361 @@
+// Copyright 2012-present Oliver Eilhard. All rights reserved.
+// Use of this source code is governed by a MIT-license.
+// See http://olivere.mit-license.org/license.txt for details.
+
+package main
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "flag"
+ "fmt"
+ "log"
+ "math/rand"
+ "os"
+ "runtime"
+ "strings"
+ "sync/atomic"
+ "time"
+
+ elastic "gopkg.in/olivere/elastic.v5"
+)
+
+type Tweet struct {
+ User string `json:"user"`
+ Message string `json:"message"`
+ Retweets int `json:"retweets"`
+ Image string `json:"image,omitempty"`
+ Created time.Time `json:"created,omitempty"`
+ Tags []string `json:"tags,omitempty"`
+ Location string `json:"location,omitempty"`
+ Suggest *elastic.SuggestField `json:"suggest_field,omitempty"`
+}
+
+var (
+ nodes = flag.String("nodes", "", "comma-separated list of ES URLs (e.g. 'http://192.168.2.10:9200,http://192.168.2.11:9200')")
+ n = flag.Int("n", 5, "number of goroutines that run searches")
+ index = flag.String("index", "twitter", "name of ES index to use")
+ errorlogfile = flag.String("errorlog", "", "error log file")
+ infologfile = flag.String("infolog", "", "info log file")
+ tracelogfile = flag.String("tracelog", "", "trace log file")
+ retries = flag.Int("retries", 0, "number of retries")
+ sniff = flag.Bool("sniff", elastic.DefaultSnifferEnabled, "enable or disable sniffer")
+ sniffer = flag.Duration("sniffer", elastic.DefaultSnifferInterval, "sniffer interval")
+ healthcheck = flag.Bool("healthcheck", elastic.DefaultHealthcheckEnabled, "enable or disable healthchecks")
+ healthchecker = flag.Duration("healthchecker", elastic.DefaultHealthcheckInterval, "healthcheck interval")
+)
+
+func main() {
+ flag.Parse()
+
+ runtime.GOMAXPROCS(runtime.NumCPU())
+
+ if *nodes == "" {
+ log.Fatal("no nodes specified")
+ }
+ urls := strings.SplitN(*nodes, ",", -1)
+
+ testcase, err := NewTestCase(*index, urls)
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ testcase.SetErrorLogFile(*errorlogfile)
+ testcase.SetInfoLogFile(*infologfile)
+ testcase.SetTraceLogFile(*tracelogfile)
+ testcase.SetMaxRetries(*retries)
+ testcase.SetHealthcheck(*healthcheck)
+ testcase.SetHealthcheckInterval(*healthchecker)
+ testcase.SetSniff(*sniff)
+ testcase.SetSnifferInterval(*sniffer)
+
+ if err := testcase.Run(*n); err != nil {
+ log.Fatal(err)
+ }
+
+ select {}
+}
+
+type RunInfo struct {
+ Success bool
+}
+
+type TestCase struct {
+ nodes []string
+ client *elastic.Client
+ runs int64
+ failures int64
+ runCh chan RunInfo
+ index string
+ errorlogfile string
+ infologfile string
+ tracelogfile string
+ maxRetries int
+ healthcheck bool
+ healthcheckInterval time.Duration
+ sniff bool
+ snifferInterval time.Duration
+}
+
+func NewTestCase(index string, nodes []string) (*TestCase, error) {
+ if index == "" {
+ return nil, errors.New("no index name specified")
+ }
+
+ return &TestCase{
+ index: index,
+ nodes: nodes,
+ runCh: make(chan RunInfo),
+ }, nil
+}
+
+func (t *TestCase) SetIndex(name string) {
+ t.index = name
+}
+
+func (t *TestCase) SetErrorLogFile(name string) {
+ t.errorlogfile = name
+}
+
+func (t *TestCase) SetInfoLogFile(name string) {
+ t.infologfile = name
+}
+
+func (t *TestCase) SetTraceLogFile(name string) {
+ t.tracelogfile = name
+}
+
+func (t *TestCase) SetMaxRetries(n int) {
+ t.maxRetries = n
+}
+
+func (t *TestCase) SetSniff(enabled bool) {
+ t.sniff = enabled
+}
+
+func (t *TestCase) SetSnifferInterval(d time.Duration) {
+ t.snifferInterval = d
+}
+
+func (t *TestCase) SetHealthcheck(enabled bool) {
+ t.healthcheck = enabled
+}
+
+func (t *TestCase) SetHealthcheckInterval(d time.Duration) {
+ t.healthcheckInterval = d
+}
+
+func (t *TestCase) Run(n int) error {
+ if err := t.setup(); err != nil {
+ return err
+ }
+
+ for i := 1; i < n; i++ {
+ go t.search()
+ }
+
+ go t.monitor()
+
+ return nil
+}
+
+func (t *TestCase) monitor() {
+ print := func() {
+ fmt.Printf("\033[32m%5d\033[0m; \033[31m%5d\033[0m: %s%s\r", t.runs, t.failures, t.client.String(), " ")
+ }
+
+ for {
+ select {
+ case run := <-t.runCh:
+ atomic.AddInt64(&t.runs, 1)
+ if !run.Success {
+ atomic.AddInt64(&t.failures, 1)
+ fmt.Println()
+ }
+ print()
+ case <-time.After(5 * time.Second):
+ // Print stats after some inactivity
+ print()
+ break
+ }
+ }
+}
+
+func (t *TestCase) setup() error {
+ var errorlogger *log.Logger
+ if t.errorlogfile != "" {
+ f, err := os.OpenFile(t.errorlogfile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0664)
+ if err != nil {
+ return err
+ }
+ errorlogger = log.New(f, "", log.Ltime|log.Lmicroseconds|log.Lshortfile)
+ }
+
+ var infologger *log.Logger
+ if t.infologfile != "" {
+ f, err := os.OpenFile(t.infologfile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0664)
+ if err != nil {
+ return err
+ }
+ infologger = log.New(f, "", log.LstdFlags)
+ }
+
+ // Trace request and response details like this
+ var tracelogger *log.Logger
+ if t.tracelogfile != "" {
+ f, err := os.OpenFile(t.tracelogfile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0664)
+ if err != nil {
+ return err
+ }
+ tracelogger = log.New(f, "", log.LstdFlags)
+ }
+
+ client, err := elastic.NewClient(
+ elastic.SetURL(t.nodes...),
+ elastic.SetErrorLog(errorlogger),
+ elastic.SetInfoLog(infologger),
+ elastic.SetTraceLog(tracelogger),
+ elastic.SetMaxRetries(t.maxRetries),
+ elastic.SetSniff(t.sniff),
+ elastic.SetSnifferInterval(t.snifferInterval),
+ elastic.SetHealthcheck(t.healthcheck),
+ elastic.SetHealthcheckInterval(t.healthcheckInterval))
+ if err != nil {
+ // Handle error
+ return err
+ }
+ t.client = client
+
+ ctx := context.Background()
+
+ // Use the IndexExists service to check if a specified index exists.
+ exists, err := t.client.IndexExists(t.index).Do(ctx)
+ if err != nil {
+ return err
+ }
+ if exists {
+ deleteIndex, err := t.client.DeleteIndex(t.index).Do(ctx)
+ if err != nil {
+ return err
+ }
+ if !deleteIndex.Acknowledged {
+ return errors.New("delete index not acknowledged")
+ }
+ }
+
+ // Create a new index.
+ createIndex, err := t.client.CreateIndex(t.index).Do(ctx)
+ if err != nil {
+ return err
+ }
+ if !createIndex.Acknowledged {
+ return errors.New("create index not acknowledged")
+ }
+
+ // Index a tweet (using JSON serialization)
+ tweet1 := Tweet{User: "olivere", Message: "Take Five", Retweets: 0}
+ _, err = t.client.Index().
+ Index(t.index).
+ Type("tweet").
+ Id("1").
+ BodyJson(tweet1).
+ Do(ctx)
+ if err != nil {
+ return err
+ }
+
+ // Index a second tweet (by string)
+ tweet2 := `{"user" : "olivere", "message" : "It's a Raggy Waltz"}`
+ _, err = t.client.Index().
+ Index(t.index).
+ Type("tweet").
+ Id("2").
+ BodyString(tweet2).
+ Do(ctx)
+ if err != nil {
+ return err
+ }
+
+ // Flush to make sure the documents got written.
+ _, err = t.client.Flush().Index(t.index).Do(ctx)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (t *TestCase) search() {
+ ctx := context.Background()
+
+ // Loop forever to check for connection issues
+ for {
+ // Get tweet with specified ID
+ get1, err := t.client.Get().
+ Index(t.index).
+ Type("tweet").
+ Id("1").
+ Do(ctx)
+ if err != nil {
+ //failf("Get failed: %v", err)
+ t.runCh <- RunInfo{Success: false}
+ continue
+ }
+ if !get1.Found {
+ //log.Printf("Document %s not found\n", "1")
+ //fmt.Printf("Got document %s in version %d from index %s, type %s\n", get1.Id, get1.Version, get1.Index, get1.Type)
+ t.runCh <- RunInfo{Success: false}
+ continue
+ }
+
+ // Search with a term query
+ searchResult, err := t.client.Search().
+ Index(t.index). // search in index t.index
+ Query(elastic.NewTermQuery("user", "olivere")). // specify the query
+ Sort("user", true). // sort by "user" field, ascending
+ From(0).Size(10). // take documents 0-9
+ Pretty(true). // pretty print request and response JSON
+ Do(ctx) // execute
+ if err != nil {
+ //failf("Search failed: %v\n", err)
+ t.runCh <- RunInfo{Success: false}
+ continue
+ }
+
+ // searchResult is of type SearchResult and returns hits, suggestions,
+ // and all kinds of other information from Elasticsearch.
+ //fmt.Printf("Query took %d milliseconds\n", searchResult.TookInMillis)
+
+ // Number of hits
+ if searchResult.Hits.TotalHits > 0 {
+ //fmt.Printf("Found a total of %d tweets\n", searchResult.Hits.TotalHits)
+
+ // Iterate through results
+ for _, hit := range searchResult.Hits.Hits {
+ // hit.Index contains the name of the index
+
+ // Deserialize hit.Source into a Tweet (could also be just a map[string]interface{}).
+ var tweet Tweet
+ err := json.Unmarshal(*hit.Source, &tweet)
+ if err != nil {
+ // Deserialization failed
+ //failf("Deserialize failed: %v\n", err)
+ t.runCh <- RunInfo{Success: false}
+ continue
+ }
+
+ // Work with tweet
+ //fmt.Printf("Tweet by %s: %s\n", t.User, t.Message)
+ }
+ } else {
+ // No hits
+ //fmt.Print("Found no tweets\n")
+ }
+
+ t.runCh <- RunInfo{Success: true}
+
+ // Sleep some time
+ time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
+ }
+}