From 2114e96f0045376f41d5c318f7cf45b6b50141dc Mon Sep 17 00:00:00 2001 From: Joram Wilander Date: Wed, 27 Jun 2018 17:19:06 -0400 Subject: Reset status to away after web conn disconnect if necessary (#8988) --- app/session.go | 3 +++ app/status.go | 16 ++++++++++++++++ app/web_conn.go | 18 ++++++++++-------- app/web_hub.go | 40 +++++++++++++++++++++++++++++++++++++++- 4 files changed, 68 insertions(+), 9 deletions(-) diff --git a/app/session.go b/app/session.go index 53170cec0..5289aefaa 100644 --- a/app/session.go +++ b/app/session.go @@ -227,6 +227,9 @@ func (a *App) AttachDeviceId(sessionId string, deviceId string, expiresAt int64) func (a *App) UpdateLastActivityAtIfNeeded(session model.Session) { now := model.GetMillis() + + a.UpdateWebConnUserActivity(session, now) + if now-session.LastActivityAt < model.SESSION_ACTIVITY_TIMEOUT { return } diff --git a/app/status.go b/app/status.go index e2367a396..460cbbbd0 100644 --- a/app/status.go +++ b/app/status.go @@ -161,6 +161,22 @@ func (a *App) GetUserStatusesByIds(userIds []string) ([]*model.Status, *model.Ap return statusMap, nil } +// SetStatusLastActivityAt sets the last activity at for a user on the local app server and updates +// status to away if needed. Used by the WS to set status to away if an 'online' device disconnects +// while an 'away' device is still connected +func (a *App) SetStatusLastActivityAt(userId string, activityAt int64) { + var status *model.Status + var err *model.AppError + if status, err = a.GetStatus(userId); err != nil { + return + } + + status.LastActivityAt = activityAt + + a.AddStatusCacheSkipClusterSend(status) + a.SetStatusAwayIfNeeded(userId, false) +} + func (a *App) SetStatusOnline(userId string, sessionId string, manual bool) { if !*a.Config().ServiceSettings.EnableUserStatuses { return diff --git a/app/web_conn.go b/app/web_conn.go index 9d8134f34..dd01a8e31 100644 --- a/app/web_conn.go +++ b/app/web_conn.go @@ -33,6 +33,7 @@ type WebConn struct { Send chan model.WebSocketMessage sessionToken atomic.Value session atomic.Value + LastUserActivityAt int64 UserId string T goi18n.TranslateFunc Locale string @@ -52,14 +53,15 @@ func (a *App) NewWebConn(ws *websocket.Conn, session model.Session, t goi18n.Tra } wc := &WebConn{ - App: a, - Send: make(chan model.WebSocketMessage, SEND_QUEUE_SIZE), - WebSocket: ws, - UserId: session.UserId, - T: t, - Locale: locale, - endWritePump: make(chan struct{}, 2), - pumpFinished: make(chan struct{}, 1), + App: a, + Send: make(chan model.WebSocketMessage, SEND_QUEUE_SIZE), + WebSocket: ws, + LastUserActivityAt: model.GetMillis(), + UserId: session.UserId, + T: t, + Locale: locale, + endWritePump: make(chan struct{}, 2), + pumpFinished: make(chan struct{}, 1), } wc.SetSession(&session) diff --git a/app/web_hub.go b/app/web_hub.go index 2ce78b5ef..5bb86ee38 100644 --- a/app/web_hub.go +++ b/app/web_hub.go @@ -23,6 +23,12 @@ const ( DEADLOCK_WARN = (BROADCAST_QUEUE_SIZE * 99) / 100 // number of buffered messages before printing stack trace ) +type WebConnActivityMessage struct { + UserId string + SessionToken string + ActivityAt int64 +} + type Hub struct { // connectionCount should be kept first. // See https://github.com/mattermost/mattermost-server/pull/7281 @@ -35,6 +41,7 @@ type Hub struct { stop chan struct{} didStop chan struct{} invalidateUser chan string + activity chan *WebConnActivityMessage ExplicitStop bool goroutineId int } @@ -48,6 +55,7 @@ func (a *App) NewWebHub() *Hub { stop: make(chan struct{}), didStop: make(chan struct{}), invalidateUser: make(chan string), + activity: make(chan *WebConnActivityMessage), ExplicitStop: false, } } @@ -330,6 +338,13 @@ func (a *App) InvalidateWebConnSessionCacheForUser(userId string) { } } +func (a *App) UpdateWebConnUserActivity(session model.Session, activityAt int64) { + hub := a.GetHubForUserId(session.UserId) + if hub != nil { + hub.UpdateActivity(session.UserId, session.Token, activityAt) + } +} + func (h *Hub) Register(webConn *WebConn) { h.register <- webConn @@ -355,6 +370,10 @@ func (h *Hub) InvalidateUser(userId string) { h.invalidateUser <- userId } +func (h *Hub) UpdateActivity(userId, sessionToken string, activityAt int64) { + h.activity <- &WebConnActivityMessage{UserId: userId, SessionToken: sessionToken, ActivityAt: activityAt} +} + func getGoroutineId() int { var buf [64]byte n := runtime.Stack(buf[:], false) @@ -395,15 +414,34 @@ func (h *Hub) Start() { continue } - if len(connections.ForUser(webCon.UserId)) == 0 { + conns := connections.ForUser(webCon.UserId) + if len(conns) == 0 { h.app.Go(func() { h.app.SetStatusOffline(webCon.UserId, false) }) + } else { + var latestActivity int64 = 0 + for _, conn := range conns { + if conn.LastUserActivityAt > latestActivity { + latestActivity = conn.LastUserActivityAt + } + } + if h.app.IsUserAway(latestActivity) { + h.app.Go(func() { + h.app.SetStatusLastActivityAt(webCon.UserId, latestActivity) + }) + } } case userId := <-h.invalidateUser: for _, webCon := range connections.ForUser(userId) { webCon.InvalidateCache() } + case activity := <-h.activity: + for _, webCon := range connections.ForUser(activity.UserId) { + if webCon.GetSessionToken() == activity.SessionToken { + webCon.LastUserActivityAt = activity.ActivityAt + } + } case msg := <-h.broadcast: candidates := connections.All() if msg.Broadcast.UserId != "" { -- cgit v1.2.3-1-g7c22