summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJoram Wilander <jwawilander@gmail.com>2018-06-27 05:08:40 -0400
committerGeorge Goldberg <george@gberg.me>2018-06-27 10:08:40 +0100
commit0d91bf323ec32f46453b2c4c1877e8eeb830ab4d (patch)
treed458825ac3f5c710e95fc171fa23217377e53b37
parentf17c15c9d83e42e46adb8e8c1fb9706b22fe6f50 (diff)
downloadchat-0d91bf323ec32f46453b2c4c1877e8eeb830ab4d.tar.gz
chat-0d91bf323ec32f46453b2c4c1877e8eeb830ab4d.tar.bz2
chat-0d91bf323ec32f46453b2c4c1877e8eeb830ab4d.zip
Lock bulk importing to master database node (#9012)
-rw-r--r--app/import.go3
-rw-r--r--store/layered_store.go8
-rw-r--r--store/sqlstore/store.go2
-rw-r--r--store/sqlstore/supplier.go11
-rw-r--r--store/store.go2
-rw-r--r--store/storetest/mocks/LayeredStoreDatabaseLayer.go10
-rw-r--r--store/storetest/mocks/SqlStore.go10
-rw-r--r--store/storetest/mocks/Store.go10
-rw-r--r--store/storetest/store.go2
9 files changed, 57 insertions, 1 deletions
diff --git a/app/import.go b/app/import.go
index 04e71b64c..64e53fe93 100644
--- a/app/import.go
+++ b/app/import.go
@@ -209,6 +209,9 @@ func (a *App) BulkImport(fileReader io.Reader, dryRun bool, workers int) (*model
scanner := bufio.NewScanner(fileReader)
lineNumber := 0
+ a.Srv.Store.LockToMaster()
+ defer a.Srv.Store.UnlockFromMaster()
+
errorsChan := make(chan LineImportWorkerError, (2*workers)+1) // size chosen to ensure it never gets filled up completely.
var wg sync.WaitGroup
var linesChan chan LineImportWorkerData
diff --git a/store/layered_store.go b/store/layered_store.go
index 851d7536c..6587868d6 100644
--- a/store/layered_store.go
+++ b/store/layered_store.go
@@ -181,6 +181,14 @@ func (s *LayeredStore) Close() {
s.DatabaseLayer.Close()
}
+func (s *LayeredStore) LockToMaster() {
+ s.DatabaseLayer.LockToMaster()
+}
+
+func (s *LayeredStore) UnlockFromMaster() {
+ s.DatabaseLayer.UnlockFromMaster()
+}
+
func (s *LayeredStore) DropAllTables() {
s.DatabaseLayer.DropAllTables()
}
diff --git a/store/sqlstore/store.go b/store/sqlstore/store.go
index fc7b3be18..500f98235 100644
--- a/store/sqlstore/store.go
+++ b/store/sqlstore/store.go
@@ -65,6 +65,8 @@ type SqlStore interface {
RemoveIndexIfExists(indexName string, tableName string) bool
GetAllConns() []*gorp.DbMap
Close()
+ LockToMaster()
+ UnlockFromMaster()
Team() store.TeamStore
Channel() store.ChannelStore
Post() store.PostStore
diff --git a/store/sqlstore/supplier.go b/store/sqlstore/supplier.go
index 02a3cef7f..1e6bdcba1 100644
--- a/store/sqlstore/supplier.go
+++ b/store/sqlstore/supplier.go
@@ -105,6 +105,7 @@ type SqlSupplier struct {
searchReplicas []*gorp.DbMap
oldStores SqlSupplierOldStores
settings *model.SqlSettings
+ lockedToMaster bool
}
func NewSqlSupplier(settings model.SqlSettings, metrics einterfaces.MetricsInterface) *SqlSupplier {
@@ -283,7 +284,7 @@ func (ss *SqlSupplier) GetSearchReplica() *gorp.DbMap {
}
func (ss *SqlSupplier) GetReplica() *gorp.DbMap {
- if len(ss.settings.DataSourceReplicas) == 0 {
+ if len(ss.settings.DataSourceReplicas) == 0 || ss.lockedToMaster {
return ss.GetMaster()
}
@@ -801,6 +802,14 @@ func (ss *SqlSupplier) Close() {
}
}
+func (ss *SqlSupplier) LockToMaster() {
+ ss.lockedToMaster = true
+}
+
+func (ss *SqlSupplier) UnlockFromMaster() {
+ ss.lockedToMaster = false
+}
+
func (ss *SqlSupplier) Team() store.TeamStore {
return ss.oldStores.team
}
diff --git a/store/store.go b/store/store.go
index bb967e614..203c637ff 100644
--- a/store/store.go
+++ b/store/store.go
@@ -67,6 +67,8 @@ type Store interface {
Plugin() PluginStore
MarkSystemRanUnitTests()
Close()
+ LockToMaster()
+ UnlockFromMaster()
DropAllTables()
TotalMasterDbConnections() int
TotalReadDbConnections() int
diff --git a/store/storetest/mocks/LayeredStoreDatabaseLayer.go b/store/storetest/mocks/LayeredStoreDatabaseLayer.go
index adbe1068c..47b594a81 100644
--- a/store/storetest/mocks/LayeredStoreDatabaseLayer.go
+++ b/store/storetest/mocks/LayeredStoreDatabaseLayer.go
@@ -200,6 +200,11 @@ func (_m *LayeredStoreDatabaseLayer) License() store.LicenseStore {
return r0
}
+// LockToMaster provides a mock function with given fields:
+func (_m *LayeredStoreDatabaseLayer) LockToMaster() {
+ _m.Called()
+}
+
// MarkSystemRanUnitTests provides a mock function with given fields:
func (_m *LayeredStoreDatabaseLayer) MarkSystemRanUnitTests() {
_m.Called()
@@ -851,6 +856,11 @@ func (_m *LayeredStoreDatabaseLayer) TotalSearchDbConnections() int {
return r0
}
+// UnlockFromMaster provides a mock function with given fields:
+func (_m *LayeredStoreDatabaseLayer) UnlockFromMaster() {
+ _m.Called()
+}
+
// User provides a mock function with given fields:
func (_m *LayeredStoreDatabaseLayer) User() store.UserStore {
ret := _m.Called()
diff --git a/store/storetest/mocks/SqlStore.go b/store/storetest/mocks/SqlStore.go
index 021baa7d3..a0cc3f0cc 100644
--- a/store/storetest/mocks/SqlStore.go
+++ b/store/storetest/mocks/SqlStore.go
@@ -411,6 +411,11 @@ func (_m *SqlStore) License() store.LicenseStore {
return r0
}
+// LockToMaster provides a mock function with given fields:
+func (_m *SqlStore) LockToMaster() {
+ _m.Called()
+}
+
// MarkSystemRanUnitTests provides a mock function with given fields:
func (_m *SqlStore) MarkSystemRanUnitTests() {
_m.Called()
@@ -706,6 +711,11 @@ func (_m *SqlStore) TotalSearchDbConnections() int {
return r0
}
+// UnlockFromMaster provides a mock function with given fields:
+func (_m *SqlStore) UnlockFromMaster() {
+ _m.Called()
+}
+
// User provides a mock function with given fields:
func (_m *SqlStore) User() store.UserStore {
ret := _m.Called()
diff --git a/store/storetest/mocks/Store.go b/store/storetest/mocks/Store.go
index dd1967cd5..3a5b7726c 100644
--- a/store/storetest/mocks/Store.go
+++ b/store/storetest/mocks/Store.go
@@ -198,6 +198,11 @@ func (_m *Store) License() store.LicenseStore {
return r0
}
+// LockToMaster provides a mock function with given fields:
+func (_m *Store) LockToMaster() {
+ _m.Called()
+}
+
// MarkSystemRanUnitTests provides a mock function with given fields:
func (_m *Store) MarkSystemRanUnitTests() {
_m.Called()
@@ -437,6 +442,11 @@ func (_m *Store) TotalSearchDbConnections() int {
return r0
}
+// UnlockFromMaster provides a mock function with given fields:
+func (_m *Store) UnlockFromMaster() {
+ _m.Called()
+}
+
// User provides a mock function with given fields:
func (_m *Store) User() store.UserStore {
ret := _m.Called()
diff --git a/store/storetest/store.go b/store/storetest/store.go
index 677a63101..e73596ec4 100644
--- a/store/storetest/store.go
+++ b/store/storetest/store.go
@@ -77,6 +77,8 @@ func (s *Store) ChannelMemberHistory() store.ChannelMemberHistoryStore {
}
func (s *Store) MarkSystemRanUnitTests() { /* do nothing */ }
func (s *Store) Close() { /* do nothing */ }
+func (s *Store) LockToMaster() { /* do nothing */ }
+func (s *Store) UnlockFromMaster() { /* do nothing */ }
func (s *Store) DropAllTables() { /* do nothing */ }
func (s *Store) TotalMasterDbConnections() int { return 1 }
func (s *Store) TotalReadDbConnections() int { return 1 }