From 302ec17beed9128101ef61d69b45d3ee29e16f1e Mon Sep 17 00:00:00 2001 From: George Goldberg Date: Fri, 28 Apr 2017 17:54:04 +0100 Subject: Parallelise Bulk Import. (#6267) * Parallelise Bulk Import. * Set worker count through command line flag. --- app/import.go | 70 +++++++++++++++++++++++++++++++++++++++++++++++--- app/import_test.go | 6 ++--- cmd/platform/import.go | 8 +++++- 3 files changed, 77 insertions(+), 7 deletions(-) diff --git a/app/import.go b/app/import.go index f92c9b1cc..488cb22aa 100644 --- a/app/import.go +++ b/app/import.go @@ -11,6 +11,7 @@ import ( "net/http" "regexp" "strings" + "sync" "unicode/utf8" l4g "github.com/alecthomas/log4go" @@ -95,15 +96,40 @@ type PostImportData struct { CreateAt *int64 `json:"create_at"` } +type LineImportWorkerData struct { + LineImportData + LineNumber int +} + +type LineImportWorkerError struct { + Error *model.AppError + LineNumber int +} + // // -- Bulk Import Functions -- // These functions import data directly into the database. Security and permission checks are bypassed but validity is // still enforced. // -func BulkImport(fileReader io.Reader, dryRun bool) (*model.AppError, int) { +func bulkImportWorker(dryRun bool, wg *sync.WaitGroup, lines <-chan LineImportWorkerData, errors chan<- LineImportWorkerError) { + for line := range lines { + if err := ImportLine(line.LineImportData, dryRun); err != nil { + errors <- LineImportWorkerError{err, line.LineNumber} + } + } + wg.Done() +} + +func BulkImport(fileReader io.Reader, dryRun bool, workers int) (*model.AppError, int) { scanner := bufio.NewScanner(fileReader) lineNumber := 0 + + errorsChan := make(chan LineImportWorkerError, (2*workers)+1) // size chosen to ensure it never gets filled up completely. + var wg sync.WaitGroup + var linesChan chan LineImportWorkerData + lastLineType := "" + for scanner.Scan() { decoder := json.NewDecoder(strings.NewReader(scanner.Text())) lineNumber++ @@ -121,12 +147,50 @@ func BulkImport(fileReader io.Reader, dryRun bool) (*model.AppError, int) { if importDataFileVersion != 1 { return model.NewAppError("BulkImport", "app.import.bulk_import.unsupported_version.error", nil, "", http.StatusBadRequest), lineNumber } - } else if err := ImportLine(line, dryRun); err != nil { - return err, lineNumber + } else { + if line.Type != lastLineType { + if lastLineType != "" { + // Changing type. Clear out the worker queue before continuing. + close(linesChan) + wg.Wait() + + // Check no errors occurred while waiting for the queue to empty. + if len(errorsChan) != 0 { + err := <-errorsChan + return err.Error, err.LineNumber + } + } + + // Set up the workers and channel for this type. + lastLineType = line.Type + linesChan = make(chan LineImportWorkerData, workers) + for i := 0; i < workers; i++ { + wg.Add(1) + go bulkImportWorker(dryRun, &wg, linesChan, errorsChan) + } + } + + select { + case linesChan <- LineImportWorkerData{line, lineNumber}: + case err := <-errorsChan: + close(linesChan) + wg.Wait() + return err.Error, err.LineNumber + } } } } + // No more lines. Clear out the worker queue before continuing. + close(linesChan) + wg.Wait() + + // Check no errors occurred while waiting for the queue to empty. + if len(errorsChan) != 0 { + err := <-errorsChan + return err.Error, err.LineNumber + } + if err := scanner.Err(); err != nil { return model.NewLocAppError("BulkImport", "app.import.bulk_import.file_scan.error", nil, err.Error()), 0 } diff --git a/app/import_test.go b/app/import_test.go index 5895314a3..0290bd53f 100644 --- a/app/import_test.go +++ b/app/import_test.go @@ -1747,13 +1747,13 @@ func TestImportBulkImport(t *testing.T) { {"type": "user", "user": {"username": "` + username + `", "email": "` + username + `@example.com", "teams": [{"name": "` + teamName + `", "channels": [{"name": "` + channelName + `"}]}]}} {"type": "post", "post": {"team": "` + teamName + `", "channel": "` + channelName + `", "user": "` + username + `", "message": "Hello World", "create_at": 123456789012}}` - if err, line := BulkImport(strings.NewReader(data1), false); err != nil || line != 0 { + if err, line := BulkImport(strings.NewReader(data1), false, 2); err != nil || line != 0 { t.Fatalf("BulkImport should have succeeded: %v, %v", err.Error(), line) } // Run bulk import using a string that contains a line with invalid json. data2 := `{"type": "version", "version": 1` - if err, line := BulkImport(strings.NewReader(data2), false); err == nil || line != 1 { + if err, line := BulkImport(strings.NewReader(data2), false, 2); err == nil || line != 1 { t.Fatalf("Should have failed due to invalid JSON on line 1.") } @@ -1762,7 +1762,7 @@ func TestImportBulkImport(t *testing.T) { {"type": "channel", "channel": {"type": "O", "display_name": "xr6m6udffngark2uekvr3hoeny", "team": "` + teamName + `", "name": "` + channelName + `"}} {"type": "user", "user": {"username": "kufjgnkxkrhhfgbrip6qxkfsaa", "email": "kufjgnkxkrhhfgbrip6qxkfsaa@example.com"}} {"type": "user", "user": {"username": "bwshaim6qnc2ne7oqkd5b2s2rq", "email": "bwshaim6qnc2ne7oqkd5b2s2rq@example.com", "teams": [{"name": "` + teamName + `", "channels": [{"name": "` + channelName + `"}]}]}}` - if err, line := BulkImport(strings.NewReader(data3), false); err == nil || line != 1 { + if err, line := BulkImport(strings.NewReader(data3), false, 2); err == nil || line != 1 { t.Fatalf("Should have failed due to missing version line on line 1.") } } diff --git a/cmd/platform/import.go b/cmd/platform/import.go index 9cee26a52..ea3e42ad2 100644 --- a/cmd/platform/import.go +++ b/cmd/platform/import.go @@ -35,6 +35,7 @@ var bulkImportCmd = &cobra.Command{ func init() { bulkImportCmd.Flags().Bool("apply", false, "Save the import data to the database. Use with caution - this cannot be reverted.") bulkImportCmd.Flags().Bool("validate", false, "Validate the import data without making any changes to the system.") + bulkImportCmd.Flags().Int("workers", 2, "How many workers to run whilst doing the import.") importCmd.AddCommand( bulkImportCmd, @@ -87,6 +88,11 @@ func bulkImportCmdF(cmd *cobra.Command, args []string) error { return errors.New("Validate flag error") } + workers, err := cmd.Flags().GetInt("workers") + if err != nil { + return errors.New("Workers flag error") + } + if len(args) != 1 { return errors.New("Incorrect number of arguments.") } @@ -110,7 +116,7 @@ func bulkImportCmdF(cmd *cobra.Command, args []string) error { CommandPrettyPrintln("") - if err, lineNumber := app.BulkImport(fileReader, !apply); err != nil { + if err, lineNumber := app.BulkImport(fileReader, !apply, workers); err != nil { CommandPrettyPrintln(err.Error()) if lineNumber != 0 { CommandPrettyPrintln(fmt.Sprintf("Error occurred on data file line %v", lineNumber)) -- cgit v1.2.3-1-g7c22