summaryrefslogtreecommitdiffstats
path: root/app
diff options
context:
space:
mode:
Diffstat (limited to 'app')
-rw-r--r--app/admin.go10
-rw-r--r--app/app.go69
-rw-r--r--app/app_test.go56
-rw-r--r--app/apptestlib.go72
-rw-r--r--app/diagnostics.go2
-rw-r--r--app/import.go39
-rw-r--r--app/notification.go4
-rw-r--r--app/oauth.go12
-rw-r--r--app/options.go59
-rw-r--r--app/plugins.go10
-rw-r--r--app/post.go11
-rw-r--r--app/security_update_check.go3
-rw-r--r--app/server.go124
-rw-r--r--app/user.go6
-rw-r--r--app/web_conn.go5
-rw-r--r--app/webhook.go2
-rw-r--r--app/webrtc.go42
-rw-r--r--app/websocket_router.go7
18 files changed, 407 insertions, 126 deletions
diff --git a/app/admin.go b/app/admin.go
index dab7e9759..5994fc826 100644
--- a/app/admin.go
+++ b/app/admin.go
@@ -15,7 +15,6 @@ import (
l4g "github.com/alecthomas/log4go"
"github.com/mattermost/mattermost-server/model"
- "github.com/mattermost/mattermost-server/store"
"github.com/mattermost/mattermost-server/store/sqlstore"
"github.com/mattermost/mattermost-server/utils"
)
@@ -187,12 +186,13 @@ func (a *App) RecycleDatabaseConnection() {
oldStore := a.Srv.Store
l4g.Warn(utils.T("api.admin.recycle_db_start.warn"))
- a.Srv.Store = store.NewLayeredStore(sqlstore.NewSqlSupplier(a.Metrics), a.Metrics, a.Cluster)
-
+ a.Srv.Store = a.newStore()
a.Jobs.Store = a.Srv.Store
- time.Sleep(20 * time.Second)
- oldStore.Close()
+ if a.Srv.Store != oldStore {
+ time.Sleep(20 * time.Second)
+ oldStore.Close()
+ }
l4g.Warn(utils.T("api.admin.recycle_db_end.warn"))
}
diff --git a/app/app.go b/app/app.go
index a250efe5c..34c0721a0 100644
--- a/app/app.go
+++ b/app/app.go
@@ -4,17 +4,19 @@
package app
import (
- "io/ioutil"
"net/http"
"sync/atomic"
l4g "github.com/alecthomas/log4go"
+ "github.com/gorilla/mux"
"github.com/mattermost/mattermost-server/einterfaces"
ejobs "github.com/mattermost/mattermost-server/einterfaces/jobs"
"github.com/mattermost/mattermost-server/jobs"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/plugin/pluginenv"
+ "github.com/mattermost/mattermost-server/store"
+ "github.com/mattermost/mattermost-server/store/sqlstore"
"github.com/mattermost/mattermost-server/utils"
)
@@ -44,44 +46,70 @@ type App struct {
Metrics einterfaces.MetricsInterface
Mfa einterfaces.MfaInterface
Saml einterfaces.SamlInterface
+
+ newStore func() store.Store
+ configOverride func(*model.Config) *model.Config
}
var appCount = 0
// New creates a new App. You must call Shutdown when you're done with it.
// XXX: For now, only one at a time is allowed as some resources are still shared.
-func New() *App {
+func New(options ...Option) *App {
appCount++
if appCount > 1 {
panic("Only one App should exist at a time. Did you forget to call Shutdown()?")
}
+ l4g.Info(utils.T("api.server.new_server.init.info"))
+
app := &App{
goroutineExitSignal: make(chan struct{}, 1),
Jobs: &jobs.JobServer{},
+ Srv: &Server{
+ Router: mux.NewRouter(),
+ },
}
app.initEnterprise()
+
+ for _, option := range options {
+ option(app)
+ }
+
+ if app.newStore == nil {
+ app.newStore = func() store.Store {
+ return store.NewLayeredStore(sqlstore.NewSqlSupplier(app.Config().SqlSettings, app.Metrics), app.Metrics, app.Cluster)
+ }
+ }
+
+ app.Srv.Store = app.newStore()
+ app.Jobs.Store = app.Srv.Store
+
+ app.Srv.Router.NotFoundHandler = http.HandlerFunc(app.Handle404)
+
+ app.Srv.WebSocketRouter = &WebSocketRouter{
+ app: app,
+ handlers: make(map[string]webSocketHandler),
+ }
+
return app
}
func (a *App) Shutdown() {
appCount--
- if a.Srv != nil {
- l4g.Info(utils.T("api.server.stop_server.stopping.info"))
+ l4g.Info(utils.T("api.server.stop_server.stopping.info"))
- a.Srv.GracefulServer.Stop(TIME_TO_WAIT_FOR_CONNECTIONS_TO_CLOSE_ON_SERVER_SHUTDOWN)
- <-a.Srv.GracefulServer.StopChan()
- a.HubStop()
+ a.StopServer()
+ a.HubStop()
- a.ShutDownPlugins()
- a.WaitForGoroutines()
+ a.ShutDownPlugins()
+ a.WaitForGoroutines()
- a.Srv.Store.Close()
- a.Srv = nil
+ a.Srv.Store.Close()
+ a.Srv = nil
- l4g.Info(utils.T("api.server.stop_server.stopped.info"))
- }
+ l4g.Info(utils.T("api.server.stop_server.stopped.info"))
}
var accountMigrationInterface func(*App) einterfaces.AccountMigrationInterface
@@ -206,6 +234,9 @@ func (a *App) initEnterprise() {
}
func (a *App) Config() *model.Config {
+ if a.configOverride != nil {
+ return a.configOverride(utils.Cfg)
+ }
return utils.Cfg
}
@@ -232,9 +263,11 @@ func (a *App) WaitForGoroutines() {
}
}
-func CloseBody(r *http.Response) {
- if r.Body != nil {
- ioutil.ReadAll(r.Body)
- r.Body.Close()
- }
+func (a *App) Handle404(w http.ResponseWriter, r *http.Request) {
+ err := model.NewAppError("Handle404", "api.context.404.app_error", nil, "", http.StatusNotFound)
+ err.Translate(utils.T)
+
+ l4g.Debug("%v: code=404 ip=%v", r.URL.Path, utils.GetIpAddress(r))
+
+ utils.RenderWebError(err, w, r)
}
diff --git a/app/app_test.go b/app/app_test.go
new file mode 100644
index 000000000..6f2a3a23a
--- /dev/null
+++ b/app/app_test.go
@@ -0,0 +1,56 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package app
+
+import (
+ "flag"
+ "os"
+ "testing"
+
+ l4g "github.com/alecthomas/log4go"
+
+ "github.com/mattermost/mattermost-server/store/storetest"
+ "github.com/mattermost/mattermost-server/utils"
+)
+
+func TestMain(m *testing.M) {
+ flag.Parse()
+
+ // In the case where a dev just wants to run a single test, it's faster to just use the default
+ // store.
+ if filter := flag.Lookup("test.run").Value.String(); filter != "" && filter != "." {
+ utils.TranslationsPreInit()
+ utils.LoadConfig("config.json")
+ l4g.Info("-test.run used, not creating temporary containers")
+ os.Exit(m.Run())
+ }
+
+ utils.TranslationsPreInit()
+ utils.LoadConfig("config.json")
+ utils.InitTranslations(utils.Cfg.LocalizationSettings)
+
+ status := 0
+
+ container, settings, err := storetest.NewMySQLContainer()
+ if err != nil {
+ panic(err)
+ }
+
+ UseTestStore(container, settings)
+
+ defer func() {
+ StopTestStore()
+ os.Exit(status)
+ }()
+
+ status = m.Run()
+}
+
+func TestAppRace(t *testing.T) {
+ for i := 0; i < 10; i++ {
+ a := New()
+ a.StartServer()
+ a.Shutdown()
+ }
+}
diff --git a/app/apptestlib.go b/app/apptestlib.go
index 29139ac39..9c26e0bbb 100644
--- a/app/apptestlib.go
+++ b/app/apptestlib.go
@@ -7,6 +7,9 @@ import (
"time"
"github.com/mattermost/mattermost-server/model"
+ "github.com/mattermost/mattermost-server/store"
+ "github.com/mattermost/mattermost-server/store/sqlstore"
+ "github.com/mattermost/mattermost-server/store/storetest"
"github.com/mattermost/mattermost-server/utils"
l4g "github.com/alecthomas/log4go"
@@ -21,27 +24,58 @@ type TestHelper struct {
BasicPost *model.Post
}
-func setupTestHelper(enterprise bool) *TestHelper {
- th := &TestHelper{
- App: New(),
+type persistentTestStore struct {
+ store.Store
+}
+
+func (*persistentTestStore) Close() {}
+
+var testStoreContainer *storetest.RunningContainer
+var testStore *persistentTestStore
+
+// UseTestStore sets the container and corresponding settings to use for tests. Once the tests are
+// complete (e.g. at the end of your TestMain implementation), you should call StopTestStore.
+func UseTestStore(container *storetest.RunningContainer, settings *model.SqlSettings) {
+ testStoreContainer = container
+ testStore = &persistentTestStore{store.NewLayeredStore(sqlstore.NewSqlSupplier(*settings, nil), nil, nil)}
+}
+
+func StopTestStore() {
+ if testStoreContainer != nil {
+ testStoreContainer.Stop()
+ testStoreContainer = nil
}
+}
- if th.App.Srv == nil {
+func setupTestHelper(enterprise bool) *TestHelper {
+ if utils.T == nil {
utils.TranslationsPreInit()
- utils.LoadConfig("config.json")
- utils.InitTranslations(utils.Cfg.LocalizationSettings)
- *utils.Cfg.TeamSettings.MaxUsersPerTeam = 50
- *utils.Cfg.RateLimitSettings.Enable = false
- utils.DisableDebugLogForTest()
- th.App.NewServer()
- th.App.InitStores()
- th.App.StartServer()
- utils.InitHTML()
- utils.EnableDebugLogForTest()
- th.App.Srv.Store.MarkSystemRanUnitTests()
-
- *utils.Cfg.TeamSettings.EnableOpenServer = true
}
+ utils.LoadConfig("config.json")
+ utils.InitTranslations(utils.Cfg.LocalizationSettings)
+
+ var options []Option
+ if testStore != nil {
+ options = append(options, StoreOverride(testStore))
+ options = append(options, ConfigOverride(func(cfg *model.Config) {
+ cfg.ServiceSettings.ListenAddress = new(string)
+ *cfg.ServiceSettings.ListenAddress = ":0"
+ }))
+ }
+
+ th := &TestHelper{
+ App: New(options...),
+ }
+
+ *utils.Cfg.TeamSettings.MaxUsersPerTeam = 50
+ *utils.Cfg.RateLimitSettings.Enable = false
+ utils.DisableDebugLogForTest()
+ th.App.StartServer()
+ utils.InitHTML()
+ utils.EnableDebugLogForTest()
+ th.App.Srv.Store.MarkSystemRanUnitTests()
+
+ *utils.Cfg.TeamSettings.EnableOpenServer = true
utils.SetIsLicensed(enterprise)
if enterprise {
@@ -191,4 +225,8 @@ func (me *TestHelper) LinkUserToTeam(user *model.User, team *model.Team) {
func (me *TestHelper) TearDown() {
me.App.Shutdown()
+ if err := recover(); err != nil {
+ StopTestStore()
+ panic(err)
+ }
}
diff --git a/app/diagnostics.go b/app/diagnostics.go
index 9e5742111..5f5ef35b2 100644
--- a/app/diagnostics.go
+++ b/app/diagnostics.go
@@ -204,6 +204,7 @@ func trackConfig() {
"session_length_mobile_in_days": *utils.Cfg.ServiceSettings.SessionLengthMobileInDays,
"session_length_sso_in_days": *utils.Cfg.ServiceSettings.SessionLengthSSOInDays,
"session_cache_in_minutes": *utils.Cfg.ServiceSettings.SessionCacheInMinutes,
+ "session_idle_timeout_in_minutes": *utils.Cfg.ServiceSettings.SessionIdleTimeoutInMinutes,
"isdefault_site_url": isDefault(*utils.Cfg.ServiceSettings.SiteURL, model.SERVICE_SETTINGS_DEFAULT_SITE_URL),
"isdefault_tls_cert_file": isDefault(*utils.Cfg.ServiceSettings.TLSCertFile, model.SERVICE_SETTINGS_DEFAULT_TLS_CERT_FILE),
"isdefault_tls_key_file": isDefault(*utils.Cfg.ServiceSettings.TLSKeyFile, model.SERVICE_SETTINGS_DEFAULT_TLS_KEY_FILE),
@@ -442,6 +443,7 @@ func trackConfig() {
"sniff": *utils.Cfg.ElasticsearchSettings.Sniff,
"post_index_replicas": *utils.Cfg.ElasticsearchSettings.PostIndexReplicas,
"post_index_shards": *utils.Cfg.ElasticsearchSettings.PostIndexShards,
+ "isdefault_index_prefix": isDefault(*utils.Cfg.ElasticsearchSettings.IndexPrefix, model.ELASTICSEARCH_SETTINGS_DEFAULT_INDEX_PREFIX),
})
SendDiagnostic(TRACK_CONFIG_PLUGIN, map[string]interface{}{
diff --git a/app/import.go b/app/import.go
index 6a309ad3e..f7f9cf144 100644
--- a/app/import.go
+++ b/app/import.go
@@ -644,28 +644,30 @@ func (a *App) ImportUser(data *UserImportData, dryRun bool) *model.AppError {
}
}
+ var err *model.AppError
+ var savedUser *model.User
if user.Id == "" {
- if _, err := a.createUser(user); err != nil {
+ if savedUser, err = a.createUser(user); err != nil {
return err
}
} else {
if hasUserChanged {
- if _, err := a.UpdateUser(user, false); err != nil {
+ if savedUser, err = a.UpdateUser(user, false); err != nil {
return err
}
}
if hasUserRolesChanged {
- if _, err := a.UpdateUserRoles(user.Id, roles); err != nil {
+ if savedUser, err = a.UpdateUserRoles(user.Id, roles); err != nil {
return err
}
}
if hasNotifyPropsChanged {
- if _, err := a.UpdateUserNotifyProps(user.Id, user.NotifyProps); err != nil {
+ if savedUser, err = a.UpdateUserNotifyProps(user.Id, user.NotifyProps); err != nil {
return err
}
}
if len(password) > 0 {
- if err := a.UpdatePassword(user, password); err != nil {
+ if err = a.UpdatePassword(user, password); err != nil {
return err
}
} else {
@@ -684,12 +686,16 @@ func (a *App) ImportUser(data *UserImportData, dryRun bool) *model.AppError {
}
}
+ if savedUser == nil {
+ savedUser = user
+ }
+
// Preferences.
var preferences model.Preferences
if data.Theme != nil {
preferences = append(preferences, model.Preference{
- UserId: user.Id,
+ UserId: savedUser.Id,
Category: model.PREFERENCE_CATEGORY_THEME,
Name: "",
Value: *data.Theme,
@@ -698,7 +704,7 @@ func (a *App) ImportUser(data *UserImportData, dryRun bool) *model.AppError {
if data.UseMilitaryTime != nil {
preferences = append(preferences, model.Preference{
- UserId: user.Id,
+ UserId: savedUser.Id,
Category: model.PREFERENCE_CATEGORY_DISPLAY_SETTINGS,
Name: "use_military_time",
Value: *data.UseMilitaryTime,
@@ -707,7 +713,7 @@ func (a *App) ImportUser(data *UserImportData, dryRun bool) *model.AppError {
if data.CollapsePreviews != nil {
preferences = append(preferences, model.Preference{
- UserId: user.Id,
+ UserId: savedUser.Id,
Category: model.PREFERENCE_CATEGORY_DISPLAY_SETTINGS,
Name: "collapse_previews",
Value: *data.CollapsePreviews,
@@ -716,7 +722,7 @@ func (a *App) ImportUser(data *UserImportData, dryRun bool) *model.AppError {
if data.MessageDisplay != nil {
preferences = append(preferences, model.Preference{
- UserId: user.Id,
+ UserId: savedUser.Id,
Category: model.PREFERENCE_CATEGORY_DISPLAY_SETTINGS,
Name: "message_display",
Value: *data.MessageDisplay,
@@ -725,7 +731,7 @@ func (a *App) ImportUser(data *UserImportData, dryRun bool) *model.AppError {
if data.ChannelDisplayMode != nil {
preferences = append(preferences, model.Preference{
- UserId: user.Id,
+ UserId: savedUser.Id,
Category: model.PREFERENCE_CATEGORY_DISPLAY_SETTINGS,
Name: "channel_display_mode",
Value: *data.ChannelDisplayMode,
@@ -734,9 +740,9 @@ func (a *App) ImportUser(data *UserImportData, dryRun bool) *model.AppError {
if data.TutorialStep != nil {
preferences = append(preferences, model.Preference{
- UserId: user.Id,
+ UserId: savedUser.Id,
Category: model.PREFERENCE_CATEGORY_TUTORIAL_STEPS,
- Name: user.Id,
+ Name: savedUser.Id,
Value: *data.TutorialStep,
})
}
@@ -747,19 +753,14 @@ func (a *App) ImportUser(data *UserImportData, dryRun bool) *model.AppError {
}
}
- return a.ImportUserTeams(*data.Username, data.Teams)
+ return a.ImportUserTeams(savedUser, data.Teams)
}
-func (a *App) ImportUserTeams(username string, data *[]UserTeamImportData) *model.AppError {
+func (a *App) ImportUserTeams(user *model.User, data *[]UserTeamImportData) *model.AppError {
if data == nil {
return nil
}
- user, err := a.GetUserByUsername(username)
- if err != nil {
- return err
- }
-
for _, tdata := range *data {
team, err := a.GetTeamByName(*tdata.Name)
if err != nil {
diff --git a/app/notification.go b/app/notification.go
index 3df4a789f..2a8f9ff2e 100644
--- a/app/notification.go
+++ b/app/notification.go
@@ -7,7 +7,6 @@ import (
"fmt"
"html"
"html/template"
- "io/ioutil"
"net/http"
"net/url"
"path/filepath"
@@ -701,8 +700,7 @@ func (a *App) sendToPushProxy(msg model.PushNotification, session *model.Session
} else {
pushResponse := model.PushResponseFromJson(resp.Body)
if resp.Body != nil {
- ioutil.ReadAll(resp.Body)
- resp.Body.Close()
+ consumeAndClose(resp)
}
if pushResponse[model.PUSH_STATUS] == model.PUSH_STATUS_REMOVE {
diff --git a/app/oauth.go b/app/oauth.go
index 6e411138b..5a02f6238 100644
--- a/app/oauth.go
+++ b/app/oauth.go
@@ -7,7 +7,6 @@ import (
"bytes"
b64 "encoding/base64"
"io"
- "io/ioutil"
"net/http"
"net/url"
"strings"
@@ -428,10 +427,7 @@ func (a *App) RevokeAccessToken(token string) *model.AppError {
}
func (a *App) CompleteOAuth(service string, body io.ReadCloser, teamId string, props map[string]string) (*model.User, *model.AppError) {
- defer func() {
- ioutil.ReadAll(body)
- body.Close()
- }()
+ defer body.Close()
action := props["action"]
@@ -688,11 +684,9 @@ func (a *App) AuthorizeOAuthUser(w http.ResponseWriter, r *http.Request, service
if resp, err := utils.HttpClient(true).Do(req); err != nil {
return nil, "", stateProps, model.NewAppError("AuthorizeOAuthUser", "api.user.authorize_oauth_user.token_failed.app_error", nil, err.Error(), http.StatusInternalServerError)
} else {
- bodyBytes, _ = ioutil.ReadAll(resp.Body)
- resp.Body = ioutil.NopCloser(bytes.NewBuffer(bodyBytes))
-
ar = model.AccessResponseFromJson(resp.Body)
- defer CloseBody(resp)
+ consumeAndClose(resp)
+
if ar == nil {
return nil, "", stateProps, model.NewAppError("AuthorizeOAuthUser", "api.user.authorize_oauth_user.bad_response.app_error", nil, "response_body="+string(bodyBytes), http.StatusInternalServerError)
}
diff --git a/app/options.go b/app/options.go
new file mode 100644
index 000000000..121bbbf80
--- /dev/null
+++ b/app/options.go
@@ -0,0 +1,59 @@
+// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
+// See License.txt for license information.
+
+package app
+
+import (
+ "github.com/mattermost/mattermost-server/model"
+ "github.com/mattermost/mattermost-server/store"
+)
+
+type Option func(a *App)
+
+// By default, the app will use a global configuration file. This allows you to override all or part
+// of that configuration.
+//
+// The override parameter must be a *model.Config, func(*model.Config), or func(*model.Config) *model.Config.
+//
+// XXX: Most code will not respect this at the moment. (We need to eliminate utils.Cfg first.)
+func ConfigOverride(override interface{}) Option {
+ return func(a *App) {
+ switch o := override.(type) {
+ case *model.Config:
+ a.configOverride = func(*model.Config) *model.Config {
+ return o
+ }
+ case func(*model.Config):
+ a.configOverride = func(cfg *model.Config) *model.Config {
+ ret := *cfg
+ o(&ret)
+ return &ret
+ }
+ case func(*model.Config) *model.Config:
+ a.configOverride = o
+ default:
+ panic("invalid ConfigOverride")
+ }
+ }
+}
+
+// By default, the app will use the store specified by the configuration. This allows you to
+// construct an app with a different store.
+//
+// The override parameter must be either a store.Store or func(App) store.Store.
+func StoreOverride(override interface{}) Option {
+ return func(a *App) {
+ switch o := override.(type) {
+ case store.Store:
+ a.newStore = func() store.Store {
+ return o
+ }
+ case func(*App) store.Store:
+ a.newStore = func() store.Store {
+ return o(a)
+ }
+ default:
+ panic("invalid StoreOverride")
+ }
+ }
+}
diff --git a/app/plugins.go b/app/plugins.go
index 2c87cee19..9826674f1 100644
--- a/app/plugins.go
+++ b/app/plugins.go
@@ -394,12 +394,14 @@ func (a *App) RemovePlugin(id string) *model.AppError {
}
func (a *App) InitPlugins(pluginPath, webappPath string) {
- a.InitBuiltInPlugins()
-
if !utils.IsLicensed() || !*utils.License().Features.FutureFeatures || !*utils.Cfg.PluginSettings.Enable {
return
}
+ if a.PluginEnv != nil {
+ return
+ }
+
l4g.Info("Starting up plugins")
err := os.Mkdir(pluginPath, 0744)
@@ -485,9 +487,13 @@ func (a *App) ShutDownPlugins() {
if a.PluginEnv == nil {
return
}
+
+ l4g.Info("Shutting down plugins")
+
for _, err := range a.PluginEnv.Shutdown() {
l4g.Error(err.Error())
}
utils.RemoveConfigListener(a.PluginConfigListenerId)
a.PluginConfigListenerId = ""
+ a.PluginEnv = nil
}
diff --git a/app/post.go b/app/post.go
index fa929b844..da5661ae2 100644
--- a/app/post.go
+++ b/app/post.go
@@ -362,9 +362,6 @@ func (a *App) PatchPost(postId string, patch *model.PostPatch) (*model.Post, *mo
return nil, err
}
- a.sendUpdatedPostEvent(updatedPost)
- a.InvalidateCacheForChannelPosts(updatedPost.ChannelId)
-
return updatedPost, nil
}
@@ -620,6 +617,10 @@ func (a *App) SearchPostsInTeam(terms string, userId string, teamId string, isOr
return postList, nil
} else {
+ if !*utils.Cfg.ServiceSettings.EnablePostSearch {
+ return nil, model.NewAppError("SearchPostsInTeam", "store.sql_post.search.disabled", nil, fmt.Sprintf("teamId=%v userId=%v", teamId, userId), http.StatusNotImplemented)
+ }
+
channels := []store.StoreChannel{}
for _, params := range paramsList {
@@ -682,7 +683,7 @@ func GetOpenGraphMetadata(url string) *opengraph.OpenGraph {
l4g.Error("GetOpenGraphMetadata request failed for url=%v with err=%v", url, err.Error())
return og
}
- defer CloseBody(res)
+ defer consumeAndClose(res)
if err := og.ProcessHTML(res.Body); err != nil {
l4g.Error("GetOpenGraphMetadata processing failed for url=%v with err=%v", url, err.Error())
@@ -718,7 +719,7 @@ func (a *App) DoPostAction(postId string, actionId string, userId string) *model
if err != nil {
return model.NewAppError("DoPostAction", "api.post.do_action.action_integration.app_error", nil, "err="+err.Error(), http.StatusBadRequest)
}
- defer resp.Body.Close()
+ defer consumeAndClose(resp)
if resp.StatusCode != http.StatusOK {
return model.NewAppError("DoPostAction", "api.post.do_action.action_integration.app_error", nil, fmt.Sprintf("status=%v", resp.StatusCode), http.StatusBadRequest)
diff --git a/app/security_update_check.go b/app/security_update_check.go
index 773556f5e..32d1f4d31 100644
--- a/app/security_update_check.go
+++ b/app/security_update_check.go
@@ -80,8 +80,7 @@ func (a *App) DoSecurityUpdateCheck() {
}
bulletins := model.SecurityBulletinsFromJson(res.Body)
- ioutil.ReadAll(res.Body)
- res.Body.Close()
+ consumeAndClose(res)
for _, bulletin := range bulletins {
if bulletin.AppliesToVersion == model.CurrentVersion {
diff --git a/app/server.go b/app/server.go
index 5f955dd65..d686c1f24 100644
--- a/app/server.go
+++ b/app/server.go
@@ -4,7 +4,10 @@
package app
import (
+ "context"
"crypto/tls"
+ "io"
+ "io/ioutil"
"net"
"net/http"
"strings"
@@ -14,13 +17,11 @@ import (
"github.com/gorilla/handlers"
"github.com/gorilla/mux"
"github.com/rsc/letsencrypt"
- "github.com/tylerb/graceful"
"gopkg.in/throttled/throttled.v2"
"gopkg.in/throttled/throttled.v2/store/memstore"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/store"
- "github.com/mattermost/mattermost-server/store/sqlstore"
"github.com/mattermost/mattermost-server/utils"
)
@@ -28,7 +29,10 @@ type Server struct {
Store store.Store
WebSocketRouter *WebSocketRouter
Router *mux.Router
- GracefulServer *graceful.Server
+ Server *http.Server
+ ListenAddr *net.TCPAddr
+
+ didFinishListen chan struct{}
}
var allowedMethods []string = []string{
@@ -78,16 +82,6 @@ func (cw *CorsWrapper) ServeHTTP(w http.ResponseWriter, r *http.Request) {
const TIME_TO_WAIT_FOR_CONNECTIONS_TO_CLOSE_ON_SERVER_SHUTDOWN = time.Second
-func (a *App) NewServer() {
- l4g.Info(utils.T("api.server.new_server.init.info"))
-
- a.Srv = &Server{}
-}
-
-func (a *App) InitStores() {
- a.Srv.Store = store.NewLayeredStore(sqlstore.NewSqlSupplier(a.Metrics), a.Metrics, a.Cluster)
-}
-
type VaryBy struct{}
func (m *VaryBy) Key(r *http.Request) string {
@@ -161,30 +155,45 @@ func (a *App) StartServer() {
handler = httpRateLimiter.RateLimit(handler)
}
- a.Srv.GracefulServer = &graceful.Server{
- Timeout: TIME_TO_WAIT_FOR_CONNECTIONS_TO_CLOSE_ON_SERVER_SHUTDOWN,
- Server: &http.Server{
- Addr: *utils.Cfg.ServiceSettings.ListenAddress,
- Handler: handlers.RecoveryHandler(handlers.RecoveryLogger(&RecoveryLogger{}), handlers.PrintRecoveryStack(true))(handler),
- ReadTimeout: time.Duration(*utils.Cfg.ServiceSettings.ReadTimeout) * time.Second,
- WriteTimeout: time.Duration(*utils.Cfg.ServiceSettings.WriteTimeout) * time.Second,
- },
+ a.Srv.Server = &http.Server{
+ Handler: handlers.RecoveryHandler(handlers.RecoveryLogger(&RecoveryLogger{}), handlers.PrintRecoveryStack(true))(handler),
+ ReadTimeout: time.Duration(*utils.Cfg.ServiceSettings.ReadTimeout) * time.Second,
+ WriteTimeout: time.Duration(*utils.Cfg.ServiceSettings.WriteTimeout) * time.Second,
+ }
+
+ addr := *a.Config().ServiceSettings.ListenAddress
+ if addr == "" {
+ if *utils.Cfg.ServiceSettings.ConnectionSecurity == model.CONN_SECURITY_TLS {
+ addr = ":https"
+ } else {
+ addr = ":http"
+ }
+ }
+
+ listener, err := net.Listen("tcp", addr)
+ if err != nil {
+ l4g.Critical(utils.T("api.server.start_server.starting.critical"), err)
+ return
}
- l4g.Info(utils.T("api.server.start_server.listening.info"), *utils.Cfg.ServiceSettings.ListenAddress)
+ a.Srv.ListenAddr = listener.Addr().(*net.TCPAddr)
+
+ l4g.Info(utils.T("api.server.start_server.listening.info"), listener.Addr().String())
if *utils.Cfg.ServiceSettings.Forward80To443 {
go func() {
- listener, err := net.Listen("tcp", ":80")
+ redirectListener, err := net.Listen("tcp", ":80")
if err != nil {
+ listener.Close()
l4g.Error("Unable to setup forwarding")
return
}
- defer listener.Close()
+ defer redirectListener.Close()
- http.Serve(listener, http.HandlerFunc(redirectHTTPToHTTPS))
+ http.Serve(redirectListener, http.HandlerFunc(redirectHTTPToHTTPS))
}()
}
+ a.Srv.didFinishListen = make(chan struct{})
go func() {
var err error
if *utils.Cfg.ServiceSettings.ConnectionSecurity == model.CONN_SECURITY_TLS {
@@ -198,16 +207,73 @@ func (a *App) StartServer() {
tlsConfig.NextProtos = append(tlsConfig.NextProtos, "h2")
- err = a.Srv.GracefulServer.ListenAndServeTLSConfig(tlsConfig)
+ a.Srv.Server.TLSConfig = tlsConfig
+ err = a.Srv.Server.ServeTLS(listener, "", "")
} else {
- err = a.Srv.GracefulServer.ListenAndServeTLS(*utils.Cfg.ServiceSettings.TLSCertFile, *utils.Cfg.ServiceSettings.TLSKeyFile)
+ err = a.Srv.Server.ServeTLS(listener, *utils.Cfg.ServiceSettings.TLSCertFile, *utils.Cfg.ServiceSettings.TLSKeyFile)
}
} else {
- err = a.Srv.GracefulServer.ListenAndServe()
+ err = a.Srv.Server.Serve(listener)
}
- if err != nil {
+ if err != nil && err != http.ErrServerClosed {
l4g.Critical(utils.T("api.server.start_server.starting.critical"), err)
time.Sleep(time.Second)
}
+ close(a.Srv.didFinishListen)
}()
}
+
+type tcpKeepAliveListener struct {
+ *net.TCPListener
+}
+
+func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) {
+ tc, err := ln.AcceptTCP()
+ if err != nil {
+ return
+ }
+ tc.SetKeepAlive(true)
+ tc.SetKeepAlivePeriod(3 * time.Minute)
+ return tc, nil
+}
+
+func (a *App) Listen(addr string) (net.Listener, error) {
+ if addr == "" {
+ addr = ":http"
+ }
+ ln, err := net.Listen("tcp", addr)
+ if err != nil {
+ return nil, err
+ }
+ return tcpKeepAliveListener{ln.(*net.TCPListener)}, nil
+}
+
+func (a *App) StopServer() {
+ if a.Srv.Server != nil {
+ ctx, cancel := context.WithTimeout(context.Background(), TIME_TO_WAIT_FOR_CONNECTIONS_TO_CLOSE_ON_SERVER_SHUTDOWN)
+ defer cancel()
+ didShutdown := false
+ for a.Srv.didFinishListen != nil && !didShutdown {
+ if err := a.Srv.Server.Shutdown(ctx); err != nil {
+ l4g.Warn(err.Error())
+ }
+ timer := time.NewTimer(time.Millisecond * 50)
+ select {
+ case <-a.Srv.didFinishListen:
+ didShutdown = true
+ case <-timer.C:
+ }
+ timer.Stop()
+ }
+ a.Srv.Server.Close()
+ a.Srv.Server = nil
+ }
+}
+
+// This is required to re-use the underlying connection and not take up file descriptors
+func consumeAndClose(r *http.Response) {
+ if r.Body != nil {
+ io.Copy(ioutil.Discard, r.Body)
+ r.Body.Close()
+ }
+}
diff --git a/app/user.go b/app/user.go
index b98583f80..edb4961fc 100644
--- a/app/user.go
+++ b/app/user.go
@@ -438,7 +438,7 @@ func (a *App) GetUsersPage(page int, perPage int, asAdmin bool) ([]*model.User,
}
func (a *App) GetUsersEtag() string {
- return (<-a.Srv.Store.User().GetEtagForAllProfiles()).Data.(string)
+ return fmt.Sprintf("%v.%v.%v", (<-a.Srv.Store.User().GetEtagForAllProfiles()).Data.(string), utils.Cfg.PrivacySettings.ShowFullName, utils.Cfg.PrivacySettings.ShowEmailAddress)
}
func (a *App) GetUsersInTeam(teamId string, offset int, limit int) ([]*model.User, *model.AppError) {
@@ -492,11 +492,11 @@ func (a *App) GetUsersNotInTeamPage(teamId string, page int, perPage int, asAdmi
}
func (a *App) GetUsersInTeamEtag(teamId string) string {
- return (<-a.Srv.Store.User().GetEtagForProfiles(teamId)).Data.(string)
+ return fmt.Sprintf("%v.%v.%v", (<-a.Srv.Store.User().GetEtagForProfiles(teamId)).Data.(string), utils.Cfg.PrivacySettings.ShowFullName, utils.Cfg.PrivacySettings.ShowEmailAddress)
}
func (a *App) GetUsersNotInTeamEtag(teamId string) string {
- return (<-a.Srv.Store.User().GetEtagForProfilesNotInTeam(teamId)).Data.(string)
+ return fmt.Sprintf("%v.%v.%v", (<-a.Srv.Store.User().GetEtagForProfilesNotInTeam(teamId)).Data.(string), utils.Cfg.PrivacySettings.ShowFullName, utils.Cfg.PrivacySettings.ShowEmailAddress)
}
func (a *App) GetUsersInChannel(channelId string, offset int, limit int) ([]*model.User, *model.AppError) {
diff --git a/app/web_conn.go b/app/web_conn.go
index 92b54723a..1c74e65a5 100644
--- a/app/web_conn.go
+++ b/app/web_conn.go
@@ -59,7 +59,7 @@ func (a *App) NewWebConn(ws *websocket.Conn, session model.Session, t goi18n.Tra
UserId: session.UserId,
T: t,
Locale: locale,
- endWritePump: make(chan struct{}, 1),
+ endWritePump: make(chan struct{}, 2),
pumpFinished: make(chan struct{}, 1),
}
@@ -111,13 +111,14 @@ func (c *WebConn) Pump() {
ch <- struct{}{}
}()
c.readPump()
+ c.endWritePump <- struct{}{}
<-ch
+ c.App.HubUnregister(c)
c.pumpFinished <- struct{}{}
}
func (c *WebConn) readPump() {
defer func() {
- c.App.HubUnregister(c)
c.WebSocket.Close()
}()
c.WebSocket.SetReadLimit(model.SOCKET_MAX_MESSAGE_SIZE_KB)
diff --git a/app/webhook.go b/app/webhook.go
index 1530ba94a..231fe1529 100644
--- a/app/webhook.go
+++ b/app/webhook.go
@@ -109,7 +109,7 @@ func (a *App) TriggerWebhook(payload *model.OutgoingWebhookPayload, hook *model.
if resp, err := utils.HttpClient(false).Do(req); err != nil {
l4g.Error(utils.T("api.post.handle_webhook_events_and_forget.event_post.error"), err.Error())
} else {
- defer CloseBody(resp)
+ defer consumeAndClose(resp)
webhookResp := model.OutgoingWebhookResponseFromJson(resp.Body)
if webhookResp != nil && webhookResp.Text != nil {
diff --git a/app/webrtc.go b/app/webrtc.go
index d2bfffbe0..d8684f1fd 100644
--- a/app/webrtc.go
+++ b/app/webrtc.go
@@ -23,8 +23,9 @@ func GetWebrtcInfoForSession(sessionId string) (*model.WebrtcInfoResponse, *mode
}
result := &model.WebrtcInfoResponse{
- Token: token,
- GatewayUrl: *utils.Cfg.WebrtcSettings.GatewayWebsocketUrl,
+ Token: token,
+ GatewayUrl: *utils.Cfg.WebrtcSettings.GatewayWebsocketUrl,
+ GatewayType: *utils.Cfg.WebrtcSettings.GatewayType,
}
if len(*utils.Cfg.WebrtcSettings.StunURI) > 0 {
@@ -48,6 +49,16 @@ func GetWebrtcToken(sessionId string) (string, *model.AppError) {
return "", model.NewAppError("WebRTC.getWebrtcToken", "api.webrtc.disabled.app_error", nil, "", http.StatusNotImplemented)
}
+ switch strings.ToLower(*utils.Cfg.WebrtcSettings.GatewayType) {
+ case "kopano-webmeetings":
+ return GetKopanoWebmeetingsWebrtcToken(sessionId)
+ default:
+ // Default to Janus.
+ return GetJanusWebrtcToken(sessionId)
+ }
+}
+
+func GetJanusWebrtcToken(sessionId string) (string, *model.AppError) {
token := base64.StdEncoding.EncodeToString([]byte(sessionId))
data := make(map[string]string)
@@ -62,10 +73,10 @@ func GetWebrtcToken(sessionId string) (string, *model.AppError) {
if rp, err := utils.HttpClient(true).Do(rq); err != nil {
return "", model.NewAppError("WebRTC.Token", "model.client.connecting.app_error", nil, err.Error(), http.StatusInternalServerError)
} else if rp.StatusCode >= 300 {
- defer CloseBody(rp)
+ defer consumeAndClose(rp)
return "", model.AppErrorFromJson(rp.Body)
} else {
- janusResponse := model.GatewayResponseFromJson(rp.Body)
+ janusResponse := model.JanusGatewayResponseFromJson(rp.Body)
if janusResponse.Status != "success" {
return "", model.NewAppError("getWebrtcToken", "api.webrtc.register_token.app_error", nil, "", http.StatusInternalServerError)
}
@@ -74,6 +85,29 @@ func GetWebrtcToken(sessionId string) (string, *model.AppError) {
return token, nil
}
+func GetKopanoWebmeetingsWebrtcToken(sessionId string) (string, *model.AppError) {
+ data := make(map[string]string)
+ data["type"] = "Token"
+ data["id"] = sessionId
+
+ rq, _ := http.NewRequest("POST", *utils.Cfg.WebrtcSettings.GatewayAdminUrl+"/auth/tokens", strings.NewReader(model.MapToJson(data)))
+ rq.Header.Set("Content-Type", "application/json")
+ rq.Header.Set("Authorization", "Bearer "+*utils.Cfg.WebrtcSettings.GatewayAdminSecret)
+
+ if rp, err := utils.HttpClient(true).Do(rq); err != nil {
+ return "", model.NewAppError("WebRTC.Token", "model.client.connecting.app_error", nil, err.Error(), http.StatusInternalServerError)
+ } else if rp.StatusCode >= 300 {
+ defer consumeAndClose(rp)
+ return "", model.AppErrorFromJson(rp.Body)
+ } else {
+ kwmResponse := model.KopanoWebmeetingsResponseFromJson(rp.Body)
+ if kwmResponse.Value == "" {
+ return "", model.NewAppError("getWebrtcToken", "api.webrtc.register_token.app_error", nil, "", http.StatusInternalServerError)
+ }
+ return kwmResponse.Value, nil
+ }
+}
+
func GenerateTurnPassword(username string, secret string) string {
key := []byte(secret)
h := hmac.New(sha1.New, key)
diff --git a/app/websocket_router.go b/app/websocket_router.go
index cad53ade7..6bc3a6ff7 100644
--- a/app/websocket_router.go
+++ b/app/websocket_router.go
@@ -21,13 +21,6 @@ type WebSocketRouter struct {
handlers map[string]webSocketHandler
}
-func (a *App) NewWebSocketRouter() *WebSocketRouter {
- return &WebSocketRouter{
- app: a,
- handlers: make(map[string]webSocketHandler),
- }
-}
-
func (wr *WebSocketRouter) Handle(action string, handler webSocketHandler) {
wr.handlers[action] = handler
}