path: root/app/import.go
author    George Goldberg <george@gberg.me>    2017-04-28 17:54:04 +0100
committer GitHub <noreply@github.com>    2017-04-28 17:54:04 +0100
commit    302ec17beed9128101ef61d69b45d3ee29e16f1e (patch)
tree      02a5783ff0f7aaa488850273bdd485d595dd228c /app/import.go
parent    36a15925dc4d10ac0c0a14ddbc60855aa5331519 (diff)
Parallelise Bulk Import. (#6267)
* Parallelise Bulk Import.
* Set worker count through command line flag.
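The change fans each batch of import lines out to a pool of goroutines. Below is a minimal, self-contained sketch of that worker-pool pattern; the workItem type and processLine function are illustrative stand-ins, not Mattermost APIs:

package main

import (
	"fmt"
	"sync"
)

// workItem stands in for the real LineImportData payload plus its line number.
type workItem struct {
	value string
	line  int
}

// processLine stands in for ImportLine; here every line imports cleanly.
func processLine(s string) error {
	return nil
}

// worker drains the jobs channel, reporting any failure tagged with its line number.
func worker(wg *sync.WaitGroup, jobs <-chan workItem, errs chan<- error) {
	defer wg.Done()
	for job := range jobs {
		if err := processLine(job.value); err != nil {
			errs <- fmt.Errorf("line %d: %w", job.line, err)
		}
	}
}

func main() {
	const workers = 4
	jobs := make(chan workItem, workers)
	// Buffered so workers never block while reporting errors, mirroring
	// the (2*workers)+1 sizing used in the commit.
	errs := make(chan error, (2*workers)+1)

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go worker(&wg, jobs, errs)
	}

	for i, s := range []string{"a", "b", "c"} {
		jobs <- workItem{s, i + 1}
	}

	// Closing the channel and waiting drains the queue, as BulkImport
	// does at each type boundary and at end of input.
	close(jobs)
	wg.Wait()

	if len(errs) != 0 {
		fmt.Println("import failed:", <-errs)
	}
}

Flushing the pool whenever the line type changes preserves ordering between types (for example, teams finish importing before the channels that reference them), while lines of the same type import in parallel.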
Diffstat (limited to 'app/import.go')
-rw-r--r--  app/import.go | 70
1 file changed, 67 insertions, 3 deletions
diff --git a/app/import.go b/app/import.go
index f92c9b1cc..488cb22aa 100644
--- a/app/import.go
+++ b/app/import.go
@@ -11,6 +11,7 @@ import (
 	"net/http"
 	"regexp"
 	"strings"
+	"sync"
 	"unicode/utf8"
 
 	l4g "github.com/alecthomas/log4go"
@@ -95,15 +96,40 @@ type PostImportData struct {
 	CreateAt *int64 `json:"create_at"`
 }
 
+type LineImportWorkerData struct {
+	LineImportData
+	LineNumber int
+}
+
+type LineImportWorkerError struct {
+	Error      *model.AppError
+	LineNumber int
+}
+
 //
 // -- Bulk Import Functions --
 // These functions import data directly into the database. Security and permission checks are bypassed but validity is
 // still enforced.
 //
 
-func BulkImport(fileReader io.Reader, dryRun bool) (*model.AppError, int) {
+func bulkImportWorker(dryRun bool, wg *sync.WaitGroup, lines <-chan LineImportWorkerData, errors chan<- LineImportWorkerError) {
+	for line := range lines {
+		if err := ImportLine(line.LineImportData, dryRun); err != nil {
+			errors <- LineImportWorkerError{err, line.LineNumber}
+		}
+	}
+	wg.Done()
+}
+
+func BulkImport(fileReader io.Reader, dryRun bool, workers int) (*model.AppError, int) {
 	scanner := bufio.NewScanner(fileReader)
 	lineNumber := 0
+
+	errorsChan := make(chan LineImportWorkerError, (2*workers)+1) // size chosen to ensure it never gets filled up completely.
+	var wg sync.WaitGroup
+	var linesChan chan LineImportWorkerData
+	lastLineType := ""
+
 	for scanner.Scan() {
 		decoder := json.NewDecoder(strings.NewReader(scanner.Text()))
 		lineNumber++
@@ -121,12 +147,50 @@ func BulkImport(fileReader io.Reader, dryRun bool) (*model.AppError, int) {
 				if importDataFileVersion != 1 {
 					return model.NewAppError("BulkImport", "app.import.bulk_import.unsupported_version.error", nil, "", http.StatusBadRequest), lineNumber
 				}
-			} else if err := ImportLine(line, dryRun); err != nil {
-				return err, lineNumber
+			} else {
+				if line.Type != lastLineType {
+					if lastLineType != "" {
+						// Changing type. Clear out the worker queue before continuing.
+						close(linesChan)
+						wg.Wait()
+
+						// Check no errors occurred while waiting for the queue to empty.
+						if len(errorsChan) != 0 {
+							err := <-errorsChan
+							return err.Error, err.LineNumber
+						}
+					}
+
+					// Set up the workers and channel for this type.
+					lastLineType = line.Type
+					linesChan = make(chan LineImportWorkerData, workers)
+					for i := 0; i < workers; i++ {
+						wg.Add(1)
+						go bulkImportWorker(dryRun, &wg, linesChan, errorsChan)
+					}
+				}
+
+				select {
+				case linesChan <- LineImportWorkerData{line, lineNumber}:
+				case err := <-errorsChan:
+					close(linesChan)
+					wg.Wait()
+					return err.Error, err.LineNumber
+				}
 			}
 		}
 	}
 
+	// No more lines. Clear out the worker queue before continuing.
+	close(linesChan)
+	wg.Wait()
+
+	// Check no errors occurred while waiting for the queue to empty.
+	if len(errorsChan) != 0 {
+		err := <-errorsChan
+		return err.Error, err.LineNumber
+	}
+
 	if err := scanner.Err(); err != nil {
 		return model.NewLocAppError("BulkImport", "app.import.bulk_import.file_scan.error", nil, err.Error()), 0
 	}
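Callers must now supply a worker count alongside the reader and dry-run flag. A hedged sketch of how the new signature might be driven; the file name, worker count, and surrounding function are illustrative, and the actual command line wiring lives outside this file:

// A hypothetical caller inside package app; requires "os" among the imports.
func runBulkImport() {
	file, err := os.Open("import.jsonl") // illustrative path, not from this diff
	if err != nil {
		l4g.Error("Failed to open import file: %v", err)
		return
	}
	defer file.Close()

	// dryRun=false performs the import for real, with 4 parallel workers.
	if appErr, lineNumber := BulkImport(file, false, 4); appErr != nil {
		l4g.Error("Bulk import failed at line %v: %v", lineNumber, appErr.Error())
	}
}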