diff options
Diffstat (limited to 'vendor/gopkg.in/olivere/elastic.v5/reindex.go')
-rw-r--r-- | vendor/gopkg.in/olivere/elastic.v5/reindex.go | 125 |
1 files changed, 124 insertions, 1 deletions
diff --git a/vendor/gopkg.in/olivere/elastic.v5/reindex.go b/vendor/gopkg.in/olivere/elastic.v5/reindex.go index 6ba638856..4650fb18b 100644 --- a/vendor/gopkg.in/olivere/elastic.v5/reindex.go +++ b/vendor/gopkg.in/olivere/elastic.v5/reindex.go @@ -280,6 +280,48 @@ func (s *ReindexService) Do(ctx context.Context) (*BulkIndexByScrollResponse, er return ret, nil } +// DoAsync executes the reindexing operation asynchronously by starting a new task. +// Callers need to use the Task Management API to watch the outcome of the reindexing +// operation. +func (s *ReindexService) DoAsync(ctx context.Context) (*StartTaskResult, error) { + // Check pre-conditions + if err := s.Validate(); err != nil { + return nil, err + } + + // DoAsync only makes sense with WaitForCompletion set to true + if s.waitForCompletion != nil && *s.waitForCompletion { + return nil, fmt.Errorf("cannot start a task with WaitForCompletion set to true") + } + f := false + s.waitForCompletion = &f + + // Get URL for request + path, params, err := s.buildURL() + if err != nil { + return nil, err + } + + // Setup HTTP request body + body, err := s.getBody() + if err != nil { + return nil, err + } + + // Get HTTP response + res, err := s.client.PerformRequest(ctx, "POST", path, params, body) + if err != nil { + return nil, err + } + + // Return operation response + ret := new(StartTaskResult) + if err := s.client.decoder.Decode(res.Body, ret); err != nil { + return nil, err + } + return ret, nil +} + // -- Source of Reindex -- // ReindexSource specifies the source of a Reindex process. @@ -295,6 +337,7 @@ type ReindexSource struct { sorts []SortInfo sorters []Sorter searchSource *SearchSource + remoteInfo *ReindexRemoteInfo } // NewReindexSource creates a new ReindexSource. @@ -359,12 +402,18 @@ func (s *ReindexSource) SortWithInfo(info SortInfo) *ReindexSource { return s } -// SortBy adds a sort order. +// SortBy adds a sort order. func (s *ReindexSource) SortBy(sorter ...Sorter) *ReindexSource { s.sorters = append(s.sorters, sorter...) return s } +// RemoteInfo sets up reindexing from a remote cluster. +func (s *ReindexSource) RemoteInfo(ri *ReindexRemoteInfo) *ReindexSource { + s.remoteInfo = ri + return s +} + // Source returns a serializable JSON request for the request. func (r *ReindexSource) Source() (interface{}, error) { source := make(map[string]interface{}) @@ -415,6 +464,14 @@ func (r *ReindexSource) Source() (interface{}, error) { source["scroll"] = r.scroll } + if r.remoteInfo != nil { + src, err := r.remoteInfo.Source() + if err != nil { + return nil, err + } + source["remote"] = src + } + if len(r.sorters) > 0 { var sortarr []interface{} for _, sorter := range r.sorters { @@ -440,6 +497,72 @@ func (r *ReindexSource) Source() (interface{}, error) { return source, nil } +// ReindexRemoteInfo contains information for reindexing from a remote cluster. +type ReindexRemoteInfo struct { + host string + username string + password string + socketTimeout string // e.g. "1m" or "30s" + connectTimeout string // e.g. "1m" or "30s" +} + +// NewReindexRemoteInfo creates a new ReindexRemoteInfo. +func NewReindexRemoteInfo() *ReindexRemoteInfo { + return &ReindexRemoteInfo{} +} + +// Host sets the host information of the remote cluster. +// It must be of the form "http(s)://<hostname>:<port>" +func (ri *ReindexRemoteInfo) Host(host string) *ReindexRemoteInfo { + ri.host = host + return ri +} + +// Username sets the username to authenticate with the remote cluster. +func (ri *ReindexRemoteInfo) Username(username string) *ReindexRemoteInfo { + ri.username = username + return ri +} + +// Password sets the password to authenticate with the remote cluster. +func (ri *ReindexRemoteInfo) Password(password string) *ReindexRemoteInfo { + ri.password = password + return ri +} + +// SocketTimeout sets the socket timeout to connect with the remote cluster. +// Use ES compatible values like e.g. "30s" or "1m". +func (ri *ReindexRemoteInfo) SocketTimeout(timeout string) *ReindexRemoteInfo { + ri.socketTimeout = timeout + return ri +} + +// ConnectTimeout sets the connection timeout to connect with the remote cluster. +// Use ES compatible values like e.g. "30s" or "1m". +func (ri *ReindexRemoteInfo) ConnectTimeout(timeout string) *ReindexRemoteInfo { + ri.connectTimeout = timeout + return ri +} + +// Source returns the serializable JSON data for the request. +func (ri *ReindexRemoteInfo) Source() (interface{}, error) { + res := make(map[string]interface{}) + res["host"] = ri.host + if len(ri.username) > 0 { + res["username"] = ri.username + } + if len(ri.password) > 0 { + res["password"] = ri.password + } + if len(ri.socketTimeout) > 0 { + res["socket_timeout"] = ri.socketTimeout + } + if len(ri.connectTimeout) > 0 { + res["connect_timeout"] = ri.connectTimeout + } + return res, nil +} + // -source Destination of Reindex -- // ReindexDestination is the destination of a Reindex API call. |