summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--app/import.go70
-rw-r--r--app/import_test.go6
-rw-r--r--cmd/platform/import.go8
3 files changed, 77 insertions, 7 deletions
diff --git a/app/import.go b/app/import.go
index f92c9b1cc..488cb22aa 100644
--- a/app/import.go
+++ b/app/import.go
@@ -11,6 +11,7 @@ import (
"net/http"
"regexp"
"strings"
+ "sync"
"unicode/utf8"
l4g "github.com/alecthomas/log4go"
@@ -95,15 +96,40 @@ type PostImportData struct {
CreateAt *int64 `json:"create_at"`
}
+type LineImportWorkerData struct {
+ LineImportData
+ LineNumber int
+}
+
+type LineImportWorkerError struct {
+ Error *model.AppError
+ LineNumber int
+}
+
//
// -- Bulk Import Functions --
// These functions import data directly into the database. Security and permission checks are bypassed but validity is
// still enforced.
//
-func BulkImport(fileReader io.Reader, dryRun bool) (*model.AppError, int) {
+func bulkImportWorker(dryRun bool, wg *sync.WaitGroup, lines <-chan LineImportWorkerData, errors chan<- LineImportWorkerError) {
+ for line := range lines {
+ if err := ImportLine(line.LineImportData, dryRun); err != nil {
+ errors <- LineImportWorkerError{err, line.LineNumber}
+ }
+ }
+ wg.Done()
+}
+
+func BulkImport(fileReader io.Reader, dryRun bool, workers int) (*model.AppError, int) {
scanner := bufio.NewScanner(fileReader)
lineNumber := 0
+
+ errorsChan := make(chan LineImportWorkerError, (2*workers)+1) // size chosen to ensure it never gets filled up completely.
+ var wg sync.WaitGroup
+ var linesChan chan LineImportWorkerData
+ lastLineType := ""
+
for scanner.Scan() {
decoder := json.NewDecoder(strings.NewReader(scanner.Text()))
lineNumber++
@@ -121,12 +147,50 @@ func BulkImport(fileReader io.Reader, dryRun bool) (*model.AppError, int) {
if importDataFileVersion != 1 {
return model.NewAppError("BulkImport", "app.import.bulk_import.unsupported_version.error", nil, "", http.StatusBadRequest), lineNumber
}
- } else if err := ImportLine(line, dryRun); err != nil {
- return err, lineNumber
+ } else {
+ if line.Type != lastLineType {
+ if lastLineType != "" {
+ // Changing type. Clear out the worker queue before continuing.
+ close(linesChan)
+ wg.Wait()
+
+ // Check no errors occurred while waiting for the queue to empty.
+ if len(errorsChan) != 0 {
+ err := <-errorsChan
+ return err.Error, err.LineNumber
+ }
+ }
+
+ // Set up the workers and channel for this type.
+ lastLineType = line.Type
+ linesChan = make(chan LineImportWorkerData, workers)
+ for i := 0; i < workers; i++ {
+ wg.Add(1)
+ go bulkImportWorker(dryRun, &wg, linesChan, errorsChan)
+ }
+ }
+
+ select {
+ case linesChan <- LineImportWorkerData{line, lineNumber}:
+ case err := <-errorsChan:
+ close(linesChan)
+ wg.Wait()
+ return err.Error, err.LineNumber
+ }
}
}
}
+ // No more lines. Clear out the worker queue before continuing.
+ close(linesChan)
+ wg.Wait()
+
+ // Check no errors occurred while waiting for the queue to empty.
+ if len(errorsChan) != 0 {
+ err := <-errorsChan
+ return err.Error, err.LineNumber
+ }
+
if err := scanner.Err(); err != nil {
return model.NewLocAppError("BulkImport", "app.import.bulk_import.file_scan.error", nil, err.Error()), 0
}
diff --git a/app/import_test.go b/app/import_test.go
index 5895314a3..0290bd53f 100644
--- a/app/import_test.go
+++ b/app/import_test.go
@@ -1747,13 +1747,13 @@ func TestImportBulkImport(t *testing.T) {
{"type": "user", "user": {"username": "` + username + `", "email": "` + username + `@example.com", "teams": [{"name": "` + teamName + `", "channels": [{"name": "` + channelName + `"}]}]}}
{"type": "post", "post": {"team": "` + teamName + `", "channel": "` + channelName + `", "user": "` + username + `", "message": "Hello World", "create_at": 123456789012}}`
- if err, line := BulkImport(strings.NewReader(data1), false); err != nil || line != 0 {
+ if err, line := BulkImport(strings.NewReader(data1), false, 2); err != nil || line != 0 {
t.Fatalf("BulkImport should have succeeded: %v, %v", err.Error(), line)
}
// Run bulk import using a string that contains a line with invalid json.
data2 := `{"type": "version", "version": 1`
- if err, line := BulkImport(strings.NewReader(data2), false); err == nil || line != 1 {
+ if err, line := BulkImport(strings.NewReader(data2), false, 2); err == nil || line != 1 {
t.Fatalf("Should have failed due to invalid JSON on line 1.")
}
@@ -1762,7 +1762,7 @@ func TestImportBulkImport(t *testing.T) {
{"type": "channel", "channel": {"type": "O", "display_name": "xr6m6udffngark2uekvr3hoeny", "team": "` + teamName + `", "name": "` + channelName + `"}}
{"type": "user", "user": {"username": "kufjgnkxkrhhfgbrip6qxkfsaa", "email": "kufjgnkxkrhhfgbrip6qxkfsaa@example.com"}}
{"type": "user", "user": {"username": "bwshaim6qnc2ne7oqkd5b2s2rq", "email": "bwshaim6qnc2ne7oqkd5b2s2rq@example.com", "teams": [{"name": "` + teamName + `", "channels": [{"name": "` + channelName + `"}]}]}}`
- if err, line := BulkImport(strings.NewReader(data3), false); err == nil || line != 1 {
+ if err, line := BulkImport(strings.NewReader(data3), false, 2); err == nil || line != 1 {
t.Fatalf("Should have failed due to missing version line on line 1.")
}
}
diff --git a/cmd/platform/import.go b/cmd/platform/import.go
index 9cee26a52..ea3e42ad2 100644
--- a/cmd/platform/import.go
+++ b/cmd/platform/import.go
@@ -35,6 +35,7 @@ var bulkImportCmd = &cobra.Command{
func init() {
bulkImportCmd.Flags().Bool("apply", false, "Save the import data to the database. Use with caution - this cannot be reverted.")
bulkImportCmd.Flags().Bool("validate", false, "Validate the import data without making any changes to the system.")
+ bulkImportCmd.Flags().Int("workers", 2, "How many workers to run whilst doing the import.")
importCmd.AddCommand(
bulkImportCmd,
@@ -87,6 +88,11 @@ func bulkImportCmdF(cmd *cobra.Command, args []string) error {
return errors.New("Validate flag error")
}
+ workers, err := cmd.Flags().GetInt("workers")
+ if err != nil {
+ return errors.New("Workers flag error")
+ }
+
if len(args) != 1 {
return errors.New("Incorrect number of arguments.")
}
@@ -110,7 +116,7 @@ func bulkImportCmdF(cmd *cobra.Command, args []string) error {
CommandPrettyPrintln("")
- if err, lineNumber := app.BulkImport(fileReader, !apply); err != nil {
+ if err, lineNumber := app.BulkImport(fileReader, !apply, workers); err != nil {
CommandPrettyPrintln(err.Error())
if lineNumber != 0 {
CommandPrettyPrintln(fmt.Sprintf("Error occurred on data file line %v", lineNumber))