summaryrefslogtreecommitdiffstats
path: root/vendor/gopkg.in/olivere/elastic.v5/reindex.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/gopkg.in/olivere/elastic.v5/reindex.go')
-rw-r--r--vendor/gopkg.in/olivere/elastic.v5/reindex.go125
1 files changed, 124 insertions, 1 deletions
diff --git a/vendor/gopkg.in/olivere/elastic.v5/reindex.go b/vendor/gopkg.in/olivere/elastic.v5/reindex.go
index 6ba638856..4650fb18b 100644
--- a/vendor/gopkg.in/olivere/elastic.v5/reindex.go
+++ b/vendor/gopkg.in/olivere/elastic.v5/reindex.go
@@ -280,6 +280,48 @@ func (s *ReindexService) Do(ctx context.Context) (*BulkIndexByScrollResponse, er
return ret, nil
}
+// DoAsync executes the reindexing operation asynchronously by starting a new task.
+// Callers need to use the Task Management API to watch the outcome of the reindexing
+// operation.
+func (s *ReindexService) DoAsync(ctx context.Context) (*StartTaskResult, error) {
+ // Check pre-conditions
+ if err := s.Validate(); err != nil {
+ return nil, err
+ }
+
+ // DoAsync only makes sense with WaitForCompletion set to true
+ if s.waitForCompletion != nil && *s.waitForCompletion {
+ return nil, fmt.Errorf("cannot start a task with WaitForCompletion set to true")
+ }
+ f := false
+ s.waitForCompletion = &f
+
+ // Get URL for request
+ path, params, err := s.buildURL()
+ if err != nil {
+ return nil, err
+ }
+
+ // Setup HTTP request body
+ body, err := s.getBody()
+ if err != nil {
+ return nil, err
+ }
+
+ // Get HTTP response
+ res, err := s.client.PerformRequest(ctx, "POST", path, params, body)
+ if err != nil {
+ return nil, err
+ }
+
+ // Return operation response
+ ret := new(StartTaskResult)
+ if err := s.client.decoder.Decode(res.Body, ret); err != nil {
+ return nil, err
+ }
+ return ret, nil
+}
+
// -- Source of Reindex --
// ReindexSource specifies the source of a Reindex process.
@@ -295,6 +337,7 @@ type ReindexSource struct {
sorts []SortInfo
sorters []Sorter
searchSource *SearchSource
+ remoteInfo *ReindexRemoteInfo
}
// NewReindexSource creates a new ReindexSource.
@@ -359,12 +402,18 @@ func (s *ReindexSource) SortWithInfo(info SortInfo) *ReindexSource {
return s
}
-// SortBy adds a sort order.
+// SortBy adds a sort order.
func (s *ReindexSource) SortBy(sorter ...Sorter) *ReindexSource {
s.sorters = append(s.sorters, sorter...)
return s
}
+// RemoteInfo sets up reindexing from a remote cluster.
+func (s *ReindexSource) RemoteInfo(ri *ReindexRemoteInfo) *ReindexSource {
+ s.remoteInfo = ri
+ return s
+}
+
// Source returns a serializable JSON request for the request.
func (r *ReindexSource) Source() (interface{}, error) {
source := make(map[string]interface{})
@@ -415,6 +464,14 @@ func (r *ReindexSource) Source() (interface{}, error) {
source["scroll"] = r.scroll
}
+ if r.remoteInfo != nil {
+ src, err := r.remoteInfo.Source()
+ if err != nil {
+ return nil, err
+ }
+ source["remote"] = src
+ }
+
if len(r.sorters) > 0 {
var sortarr []interface{}
for _, sorter := range r.sorters {
@@ -440,6 +497,72 @@ func (r *ReindexSource) Source() (interface{}, error) {
return source, nil
}
+// ReindexRemoteInfo contains information for reindexing from a remote cluster.
+type ReindexRemoteInfo struct {
+ host string
+ username string
+ password string
+ socketTimeout string // e.g. "1m" or "30s"
+ connectTimeout string // e.g. "1m" or "30s"
+}
+
+// NewReindexRemoteInfo creates a new ReindexRemoteInfo.
+func NewReindexRemoteInfo() *ReindexRemoteInfo {
+ return &ReindexRemoteInfo{}
+}
+
+// Host sets the host information of the remote cluster.
+// It must be of the form "http(s)://<hostname>:<port>"
+func (ri *ReindexRemoteInfo) Host(host string) *ReindexRemoteInfo {
+ ri.host = host
+ return ri
+}
+
+// Username sets the username to authenticate with the remote cluster.
+func (ri *ReindexRemoteInfo) Username(username string) *ReindexRemoteInfo {
+ ri.username = username
+ return ri
+}
+
+// Password sets the password to authenticate with the remote cluster.
+func (ri *ReindexRemoteInfo) Password(password string) *ReindexRemoteInfo {
+ ri.password = password
+ return ri
+}
+
+// SocketTimeout sets the socket timeout to connect with the remote cluster.
+// Use ES compatible values like e.g. "30s" or "1m".
+func (ri *ReindexRemoteInfo) SocketTimeout(timeout string) *ReindexRemoteInfo {
+ ri.socketTimeout = timeout
+ return ri
+}
+
+// ConnectTimeout sets the connection timeout to connect with the remote cluster.
+// Use ES compatible values like e.g. "30s" or "1m".
+func (ri *ReindexRemoteInfo) ConnectTimeout(timeout string) *ReindexRemoteInfo {
+ ri.connectTimeout = timeout
+ return ri
+}
+
+// Source returns the serializable JSON data for the request.
+func (ri *ReindexRemoteInfo) Source() (interface{}, error) {
+ res := make(map[string]interface{})
+ res["host"] = ri.host
+ if len(ri.username) > 0 {
+ res["username"] = ri.username
+ }
+ if len(ri.password) > 0 {
+ res["password"] = ri.password
+ }
+ if len(ri.socketTimeout) > 0 {
+ res["socket_timeout"] = ri.socketTimeout
+ }
+ if len(ri.connectTimeout) > 0 {
+ res["connect_timeout"] = ri.connectTimeout
+ }
+ return res, nil
+}
+
// -source Destination of Reindex --
// ReindexDestination is the destination of a Reindex API call.