diff options
Diffstat (limited to 'api')
-rw-r--r-- | api/api.go | 1 | ||||
-rw-r--r-- | api/context.go | 7 | ||||
-rw-r--r-- | api/general.go | 11 | ||||
-rw-r--r-- | api/post.go | 8 | ||||
-rw-r--r-- | api/status.go | 155 | ||||
-rw-r--r-- | api/status_test.go | 153 | ||||
-rw-r--r-- | api/user.go | 38 | ||||
-rw-r--r-- | api/user_test.go | 60 | ||||
-rw-r--r-- | api/web_conn.go | 23 | ||||
-rw-r--r-- | api/web_hub.go | 13 | ||||
-rw-r--r-- | api/websocket_handler.go | 9 |
11 files changed, 351 insertions, 127 deletions
diff --git a/api/api.go b/api/api.go index 4cc11168c..9e73bd125 100644 --- a/api/api.go +++ b/api/api.go @@ -94,6 +94,7 @@ func InitApi() { InitPreference() InitLicense() InitEmoji() + InitStatus() // 404 on any api route before web.go has a chance to serve it Srv.Router.Handle("/api/{anything:.*}", http.HandlerFunc(Handle404)) diff --git a/api/context.go b/api/context.go index 93ff83247..2132ce0e7 100644 --- a/api/context.go +++ b/api/context.go @@ -20,6 +20,7 @@ import ( ) var sessionCache *utils.Cache = utils.NewLru(model.SESSION_CACHE_SIZE) +var statusCache *utils.Cache = utils.NewLru(model.STATUS_CACHE_SIZE) var allowedMethods []string = []string{ "POST", @@ -196,11 +197,7 @@ func (h handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } if c.Err == nil && h.isUserActivity && token != "" && len(c.Session.UserId) > 0 { - go func() { - if err := (<-Srv.Store.User().UpdateUserAndSessionActivity(c.Session.UserId, c.Session.Id, model.GetMillis())).Err; err != nil { - l4g.Error(utils.T("api.context.last_activity_at.error"), c.Session.UserId, c.Session.Id, err) - } - }() + SetStatusOnline(c.Session.UserId, c.Session.Id) } if c.Err == nil { diff --git a/api/general.go b/api/general.go index 4124d2e95..233484e43 100644 --- a/api/general.go +++ b/api/general.go @@ -73,7 +73,12 @@ func ping(c *Context, w http.ResponseWriter, r *http.Request) { w.Write([]byte(model.MapToJson(m))) } -func webSocketPing(req *model.WebSocketRequest, responseData map[string]interface{}) *model.AppError { - responseData["text"] = "pong" - return nil +func webSocketPing(req *model.WebSocketRequest) (map[string]interface{}, *model.AppError) { + data := map[string]interface{}{} + data["text"] = "pong" + data["version"] = model.CurrentVersion + data["server_time"] = model.GetMillis() + data["node_id"] = "" + + return data, nil } diff --git a/api/post.go b/api/post.go index 60ac11a2b..4533823f6 100644 --- a/api/post.go +++ b/api/post.go @@ -597,7 +597,13 @@ func sendNotifications(c *Context, post *model.Post, team *model.Team, channel * for _, id := range mentionedUsersList { userAllowsEmails := profileMap[id].NotifyProps["email"] != "false" - if userAllowsEmails && (profileMap[id].IsAway() || profileMap[id].IsOffline()) { + var status *model.Status + var err *model.AppError + if status, err = GetStatus(id); err != nil { + status = &model.Status{id, model.STATUS_OFFLINE, 0} + } + + if userAllowsEmails && status.Status != model.STATUS_ONLINE { sendNotificationEmail(c, post, profileMap[id], channel, team, senderName) } } diff --git a/api/status.go b/api/status.go new file mode 100644 index 000000000..88f024f4e --- /dev/null +++ b/api/status.go @@ -0,0 +1,155 @@ +// Copyright (c) 2016 Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package api + +import ( + "net/http" + + l4g "github.com/alecthomas/log4go" + "github.com/mattermost/platform/model" + "github.com/mattermost/platform/store" + "github.com/mattermost/platform/utils" +) + +func InitStatus() { + l4g.Debug(utils.T("api.status.init.debug")) + + BaseRoutes.Users.Handle("/status", ApiUserRequiredActivity(getStatusesHttp, false)).Methods("GET") + BaseRoutes.WebSocket.Handle("get_statuses", ApiWebSocketHandler(getStatusesWebSocket)) +} + +func getStatusesHttp(c *Context, w http.ResponseWriter, r *http.Request) { + statusMap, err := GetAllStatuses() + if err != nil { + c.Err = err + return + } + + w.Write([]byte(model.StringInterfaceToJson(statusMap))) +} + +func getStatusesWebSocket(req *model.WebSocketRequest) (map[string]interface{}, *model.AppError) { + statusMap, err := GetAllStatuses() + if err != nil { + return nil, err + } + + return statusMap, nil +} + +func GetAllStatuses() (map[string]interface{}, *model.AppError) { + if result := <-Srv.Store.Status().GetOnlineAway(); result.Err != nil { + return nil, result.Err + } else { + statuses := result.Data.([]*model.Status) + + statusMap := map[string]interface{}{} + for _, s := range statuses { + statusMap[s.UserId] = s.Status + } + + return statusMap, nil + } +} + +func SetStatusOnline(userId string, sessionId string) { + broadcast := false + saveStatus := false + + var status *model.Status + var err *model.AppError + if status, err = GetStatus(userId); err != nil { + status = &model.Status{userId, model.STATUS_ONLINE, model.GetMillis()} + broadcast = true + saveStatus = true + } else { + if status.Status != model.STATUS_ONLINE { + broadcast = true + } + status.Status = model.STATUS_ONLINE + status.LastActivityAt = model.GetMillis() + } + + statusCache.Add(status.UserId, status) + + achan := Srv.Store.Session().UpdateLastActivityAt(sessionId, model.GetMillis()) + + var schan store.StoreChannel + if saveStatus { + schan = Srv.Store.Status().SaveOrUpdate(status) + } else { + schan = Srv.Store.Status().UpdateLastActivityAt(status.UserId, status.LastActivityAt) + } + + if result := <-achan; result.Err != nil { + l4g.Error(utils.T("api.status.last_activity.error"), userId, sessionId, result.Err) + } + + if result := <-schan; result.Err != nil { + l4g.Error(utils.T("api.status.save_status.error"), userId, result.Err) + } + + if broadcast { + event := model.NewWebSocketEvent("", "", status.UserId, model.WEBSOCKET_EVENT_STATUS_CHANGE) + event.Add("status", model.STATUS_ONLINE) + go Publish(event) + } +} + +func SetStatusOffline(userId string) { + status := &model.Status{userId, model.STATUS_OFFLINE, model.GetMillis()} + + statusCache.Add(status.UserId, status) + + if result := <-Srv.Store.Status().SaveOrUpdate(status); result.Err != nil { + l4g.Error(utils.T("api.status.save_status.error"), userId, result.Err) + } + + event := model.NewWebSocketEvent("", "", status.UserId, model.WEBSOCKET_EVENT_STATUS_CHANGE) + event.Add("status", model.STATUS_OFFLINE) + go Publish(event) +} + +func SetStatusAwayIfNeeded(userId string) { + status, err := GetStatus(userId) + if err != nil { + status = &model.Status{userId, model.STATUS_OFFLINE, 0} + } + + if status.Status == model.STATUS_AWAY { + return + } + + if !IsUserAway(status.LastActivityAt) { + return + } + + status.Status = model.STATUS_AWAY + + statusCache.Add(status.UserId, status) + + if result := <-Srv.Store.Status().SaveOrUpdate(status); result.Err != nil { + l4g.Error(utils.T("api.status.save_status.error"), userId, result.Err) + } + + event := model.NewWebSocketEvent("", "", status.UserId, model.WEBSOCKET_EVENT_STATUS_CHANGE) + event.Add("status", model.STATUS_AWAY) + go Publish(event) +} + +func GetStatus(userId string) (*model.Status, *model.AppError) { + if status, ok := statusCache.Get(userId); ok { + return status.(*model.Status), nil + } + + if result := <-Srv.Store.Status().Get(userId); result.Err != nil { + return nil, result.Err + } else { + return result.Data.(*model.Status), nil + } +} + +func IsUserAway(lastActivityAt int64) bool { + return model.GetMillis()-lastActivityAt >= *utils.Cfg.TeamSettings.UserStatusAwayTimeout*1000 +} diff --git a/api/status_test.go b/api/status_test.go new file mode 100644 index 000000000..a035cf8bf --- /dev/null +++ b/api/status_test.go @@ -0,0 +1,153 @@ +// Copyright (c) 2016 Mattermost, Inc. All Rights Reserved. +// See License.txt for license information. + +package api + +import ( + "strings" + "testing" + "time" + + "github.com/mattermost/platform/model" + "github.com/mattermost/platform/store" + "github.com/mattermost/platform/utils" +) + +func TestStatuses(t *testing.T) { + th := Setup().InitBasic() + Client := th.BasicClient + WebSocketClient, err := th.CreateWebSocketClient() + if err != nil { + t.Fatal(err) + } + defer WebSocketClient.Close() + WebSocketClient.Listen() + + team := model.Team{DisplayName: "Name", Name: "z-z-" + model.NewId() + "a", Email: "test@nowhere.com", Type: model.TEAM_OPEN} + rteam, _ := Client.CreateTeam(&team) + + user := model.User{Email: strings.ToLower(model.NewId()) + "success+test@simulator.amazonses.com", Nickname: "Corey Hulen", Password: "passwd1"} + ruser := Client.Must(Client.CreateUser(&user, "")).Data.(*model.User) + LinkUserToTeam(ruser, rteam.Data.(*model.Team)) + store.Must(Srv.Store.User().VerifyEmail(ruser.Id)) + + user2 := model.User{Email: strings.ToLower(model.NewId()) + "success+test@simulator.amazonses.com", Nickname: "Corey Hulen", Password: "passwd1"} + ruser2 := Client.Must(Client.CreateUser(&user2, "")).Data.(*model.User) + LinkUserToTeam(ruser2, rteam.Data.(*model.Team)) + store.Must(Srv.Store.User().VerifyEmail(ruser2.Id)) + + Client.Login(user.Email, user.Password) + Client.SetTeamId(team.Id) + + r1, err := Client.GetStatuses() + if err != nil { + t.Fatal(err) + } + + statuses := r1.Data.(map[string]string) + + for _, status := range statuses { + if status != model.STATUS_OFFLINE && status != model.STATUS_AWAY && status != model.STATUS_ONLINE { + t.Fatal("one of the statuses had an invalid value") + } + } + + th.LoginBasic2() + + WebSocketClient2, err2 := th.CreateWebSocketClient() + if err2 != nil { + t.Fatal(err2) + } + + time.Sleep(300 * time.Millisecond) + + WebSocketClient.GetStatuses() + if resp := <-WebSocketClient.ResponseChannel; resp.Error != nil { + t.Fatal(resp.Error) + } else { + if resp.SeqReply != WebSocketClient.Sequence-1 { + t.Fatal("bad sequence number") + } + + for _, status := range resp.Data { + if status != model.STATUS_OFFLINE && status != model.STATUS_AWAY && status != model.STATUS_ONLINE { + t.Fatal("one of the statuses had an invalid value") + } + } + + if status, ok := resp.Data[th.BasicUser2.Id]; !ok { + t.Fatal("should have had user status") + } else if status != model.STATUS_ONLINE { + t.Log(status) + t.Fatal("status should have been online") + } + } + + SetStatusAwayIfNeeded(th.BasicUser2.Id) + + awayTimeout := *utils.Cfg.TeamSettings.UserStatusAwayTimeout + defer func() { + *utils.Cfg.TeamSettings.UserStatusAwayTimeout = awayTimeout + }() + *utils.Cfg.TeamSettings.UserStatusAwayTimeout = 1 + + time.Sleep(1500 * time.Millisecond) + + SetStatusAwayIfNeeded(th.BasicUser2.Id) + SetStatusAwayIfNeeded(th.BasicUser2.Id) + + WebSocketClient2.Close() + time.Sleep(300 * time.Millisecond) + + WebSocketClient.GetStatuses() + if resp := <-WebSocketClient.ResponseChannel; resp.Error != nil { + t.Fatal(resp.Error) + } else { + if resp.SeqReply != WebSocketClient.Sequence-1 { + t.Fatal("bad sequence number") + } + + if _, ok := resp.Data[th.BasicUser2.Id]; ok { + t.Fatal("should not have had user status") + } + } + + stop := make(chan bool) + onlineHit := false + awayHit := false + offlineHit := false + + go func() { + for { + select { + case resp := <-WebSocketClient.EventChannel: + if resp.Event == model.WEBSOCKET_EVENT_STATUS_CHANGE && resp.UserId == th.BasicUser2.Id { + status := resp.Data["status"].(string) + if status == model.STATUS_ONLINE { + onlineHit = true + } else if status == model.STATUS_AWAY { + awayHit = true + } else if status == model.STATUS_OFFLINE { + offlineHit = true + } + } + case <-stop: + return + } + } + }() + + time.Sleep(500 * time.Millisecond) + + stop <- true + + if !onlineHit { + t.Fatal("didn't get online event") + } + if !awayHit { + t.Fatal("didn't get away event") + } + if !offlineHit { + t.Fatal("didn't get offline event") + } +} diff --git a/api/user.go b/api/user.go index 7d2eb85bf..652da14ad 100644 --- a/api/user.go +++ b/api/user.go @@ -54,7 +54,6 @@ func InitUser() { BaseRoutes.Users.Handle("/newimage", ApiUserRequired(uploadProfileImage)).Methods("POST") BaseRoutes.Users.Handle("/me", ApiAppHandler(getMe)).Methods("GET") BaseRoutes.Users.Handle("/initial_load", ApiAppHandler(getInitialLoad)).Methods("GET") - BaseRoutes.Users.Handle("/status", ApiUserRequiredActivity(getStatuses, false)).Methods("POST") BaseRoutes.Users.Handle("/direct_profiles", ApiUserRequired(getDirectProfiles)).Methods("GET") BaseRoutes.Users.Handle("/profiles/{id:[A-Za-z0-9]+}", ApiUserRequired(getProfiles)).Methods("GET") BaseRoutes.Users.Handle("/profiles_for_dm_list/{id:[A-Za-z0-9]+}", ApiUserRequired(getProfilesForDirectMessageList)).Methods("GET") @@ -1955,35 +1954,6 @@ func updateUserNotify(c *Context, w http.ResponseWriter, r *http.Request) { } } -func getStatuses(c *Context, w http.ResponseWriter, r *http.Request) { - userIds := model.ArrayFromJson(r.Body) - if len(userIds) == 0 { - c.SetInvalidParam("getStatuses", "userIds") - return - } - - if result := <-Srv.Store.User().GetProfileByIds(userIds); result.Err != nil { - c.Err = result.Err - return - } else { - profiles := result.Data.(map[string]*model.User) - - statuses := map[string]string{} - for _, profile := range profiles { - if profile.IsOffline() { - statuses[profile.Id] = model.USER_OFFLINE - } else if profile.IsAway() { - statuses[profile.Id] = model.USER_AWAY - } else { - statuses[profile.Id] = model.USER_ONLINE - } - } - - w.Write([]byte(model.MapToJson(statuses))) - return - } -} - func IsUsernameTaken(name string) bool { if !model.IsValidUsername(name) { @@ -2312,7 +2282,7 @@ func resendVerification(c *Context, w http.ResponseWriter, r *http.Request) { c.Err = error return } else { - if user.LastActivityAt > 0 { + if _, err := GetStatus(user.Id); err != nil { go SendEmailChangeVerifyEmail(c, user.Id, user.Email, c.GetSiteURL()) } else { go SendVerifyEmail(c, user.Id, user.Email, c.GetSiteURL()) @@ -2551,11 +2521,11 @@ func completeSaml(c *Context, w http.ResponseWriter, r *http.Request) { } } -func userTyping(req *model.WebSocketRequest, responseData map[string]interface{}) *model.AppError { +func userTyping(req *model.WebSocketRequest) (map[string]interface{}, *model.AppError) { var ok bool var channelId string if channelId, ok = req.Data["channel_id"].(string); !ok || len(channelId) != 26 { - return NewInvalidWebSocketParamError(req.Action, "channel_id") + return nil, NewInvalidWebSocketParamError(req.Action, "channel_id") } var parentId string @@ -2567,5 +2537,5 @@ func userTyping(req *model.WebSocketRequest, responseData map[string]interface{} event.Add("parent_id", parentId) go Publish(event) - return nil + return nil, nil } diff --git a/api/user_test.go b/api/user_test.go index fcb2c4f00..6f3f616e7 100644 --- a/api/user_test.go +++ b/api/user_test.go @@ -669,9 +669,7 @@ func TestUserUpdate(t *testing.T) { team := &model.Team{DisplayName: "Name", Name: "z-z-" + model.NewId() + "a", Email: "test@nowhere.com", Type: model.TEAM_OPEN} team = Client.Must(Client.CreateTeam(team)).Data.(*model.Team) - time1 := model.GetMillis() - - user := &model.User{Email: strings.ToLower(model.NewId()) + "success+test@simulator.amazonses.com", Nickname: "Corey Hulen", Password: "passwd1", LastActivityAt: time1, LastPingAt: time1, Roles: ""} + user := &model.User{Email: strings.ToLower(model.NewId()) + "success+test@simulator.amazonses.com", Nickname: "Corey Hulen", Password: "passwd1", Roles: ""} user = Client.Must(Client.CreateUser(user, "")).Data.(*model.User) LinkUserToTeam(user, team) store.Must(Srv.Store.User().VerifyEmail(user.Id)) @@ -683,15 +681,7 @@ func TestUserUpdate(t *testing.T) { Client.Login(user.Email, "passwd1") Client.SetTeamId(team.Id) - time.Sleep(100 * time.Millisecond) - - time2 := model.GetMillis() - - time.Sleep(100 * time.Millisecond) - user.Nickname = "Jim Jimmy" - user.LastActivityAt = time2 - user.LastPingAt = time2 user.Roles = model.ROLE_TEAM_ADMIN user.LastPasswordUpdate = 123 @@ -701,12 +691,6 @@ func TestUserUpdate(t *testing.T) { if result.Data.(*model.User).Nickname != "Jim Jimmy" { t.Fatal("Nickname did not update properly") } - if result.Data.(*model.User).LastActivityAt == time2 { - t.Fatal("LastActivityAt should not have updated") - } - if result.Data.(*model.User).LastPingAt == time2 { - t.Fatal("LastPingAt should not have updated") - } if result.Data.(*model.User).Roles != "" { t.Fatal("Roles should not have updated") } @@ -1347,48 +1331,6 @@ func TestFuzzyUserCreate(t *testing.T) { } } -func TestStatuses(t *testing.T) { - th := Setup() - Client := th.CreateClient() - - team := model.Team{DisplayName: "Name", Name: "z-z-" + model.NewId() + "a", Email: "test@nowhere.com", Type: model.TEAM_OPEN} - rteam, _ := Client.CreateTeam(&team) - - user := model.User{Email: strings.ToLower(model.NewId()) + "success+test@simulator.amazonses.com", Nickname: "Corey Hulen", Password: "passwd1"} - ruser := Client.Must(Client.CreateUser(&user, "")).Data.(*model.User) - LinkUserToTeam(ruser, rteam.Data.(*model.Team)) - store.Must(Srv.Store.User().VerifyEmail(ruser.Id)) - - user2 := model.User{Email: strings.ToLower(model.NewId()) + "success+test@simulator.amazonses.com", Nickname: "Corey Hulen", Password: "passwd1"} - ruser2 := Client.Must(Client.CreateUser(&user2, "")).Data.(*model.User) - LinkUserToTeam(ruser2, rteam.Data.(*model.Team)) - store.Must(Srv.Store.User().VerifyEmail(ruser2.Id)) - - Client.Login(user.Email, user.Password) - Client.SetTeamId(team.Id) - - userIds := []string{ruser2.Id} - - r1, err := Client.GetStatuses(userIds) - if err != nil { - t.Fatal(err) - } - - statuses := r1.Data.(map[string]string) - - if len(statuses) != 1 { - t.Log(statuses) - t.Fatal("invalid number of statuses") - } - - for _, status := range statuses { - if status != model.USER_OFFLINE && status != model.USER_AWAY && status != model.USER_ONLINE { - t.Fatal("one of the statuses had an invalid value") - } - } - -} - func TestEmailToOAuth(t *testing.T) { th := Setup() Client := th.CreateClient() diff --git a/api/web_conn.go b/api/web_conn.go index 3f4414c5e..8741873fd 100644 --- a/api/web_conn.go +++ b/api/web_conn.go @@ -7,9 +7,7 @@ import ( "time" "github.com/mattermost/platform/model" - "github.com/mattermost/platform/utils" - l4g "github.com/alecthomas/log4go" "github.com/gorilla/websocket" goi18n "github.com/nicksnyder/go-i18n/i18n" ) @@ -34,18 +32,7 @@ type WebConn struct { } func NewWebConn(c *Context, ws *websocket.Conn) *WebConn { - go func() { - achan := Srv.Store.User().UpdateUserAndSessionActivity(c.Session.UserId, c.Session.Token, model.GetMillis()) - pchan := Srv.Store.User().UpdateLastPingAt(c.Session.UserId, model.GetMillis()) - - if result := <-achan; result.Err != nil { - l4g.Error(utils.T("api.web_conn.new_web_conn.last_activity.error"), c.Session.UserId, c.Session.Token, result.Err) - } - - if result := <-pchan; result.Err != nil { - l4g.Error(utils.T("api.web_conn.new_web_conn.last_ping.error"), c.Session.UserId, result.Err) - } - }() + go SetStatusOnline(c.Session.UserId, c.Session.Id) return &WebConn{ Send: make(chan model.WebSocketMessage, 64), @@ -68,13 +55,7 @@ func (c *WebConn) readPump() { c.WebSocket.SetReadDeadline(time.Now().Add(PONG_WAIT)) c.WebSocket.SetPongHandler(func(string) error { c.WebSocket.SetReadDeadline(time.Now().Add(PONG_WAIT)) - - go func() { - if result := <-Srv.Store.User().UpdateLastPingAt(c.UserId, model.GetMillis()); result.Err != nil { - l4g.Error(utils.T("api.web_conn.new_web_conn.last_ping.error"), c.UserId, result.Err) - } - }() - + go SetStatusAwayIfNeeded(c.UserId) return nil }) diff --git a/api/web_hub.go b/api/web_hub.go index db0f31bb7..455189f70 100644 --- a/api/web_hub.go +++ b/api/web_hub.go @@ -67,10 +67,23 @@ func (h *Hub) Start() { h.connections[webCon] = true case webCon := <-h.unregister: + userId := webCon.UserId if _, ok := h.connections[webCon]; ok { delete(h.connections, webCon) close(webCon.Send) } + + found := false + for webCon := range h.connections { + if userId == webCon.UserId { + found = true + break + } + } + + if !found { + go SetStatusOffline(userId) + } case userId := <-h.invalidateUser: for webCon := range h.connections { if webCon.UserId == userId { diff --git a/api/websocket_handler.go b/api/websocket_handler.go index 8abec6715..5a313fe13 100644 --- a/api/websocket_handler.go +++ b/api/websocket_handler.go @@ -10,12 +10,12 @@ import ( "github.com/mattermost/platform/utils" ) -func ApiWebSocketHandler(wh func(*model.WebSocketRequest, map[string]interface{}) *model.AppError) *webSocketHandler { +func ApiWebSocketHandler(wh func(*model.WebSocketRequest) (map[string]interface{}, *model.AppError)) *webSocketHandler { return &webSocketHandler{wh} } type webSocketHandler struct { - handlerFunc func(*model.WebSocketRequest, map[string]interface{}) *model.AppError + handlerFunc func(*model.WebSocketRequest) (map[string]interface{}, *model.AppError) } func (wh *webSocketHandler) ServeWebSocket(conn *WebConn, r *model.WebSocketRequest) { @@ -25,9 +25,10 @@ func (wh *webSocketHandler) ServeWebSocket(conn *WebConn, r *model.WebSocketRequ r.T = conn.T r.Locale = conn.Locale - data := make(map[string]interface{}) + var data map[string]interface{} + var err *model.AppError - if err := wh.handlerFunc(r, data); err != nil { + if data, err = wh.handlerFunc(r); err != nil { l4g.Error(utils.T("api.web_socket_handler.log.error"), "/api/v3/users/websocket", r.Action, r.Seq, r.Session.UserId, err.SystemMessage(utils.T), err.DetailedError) err.DetailedError = "" conn.Send <- model.NewWebSocketError(r.Seq, err) |