From 07777f5ff9e0bde26abd0288164e5f73b6da992a Mon Sep 17 00:00:00 2001 From: Chris Date: Wed, 4 Oct 2017 13:09:41 -0700 Subject: Fix races / finally remove global app for good (#7570) * finally remove global app for good * test compilation fixes * fix races * fix deadlock * wake up write pump so it doesn't take forever to clean up --- app/web_conn.go | 34 +++++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 11 deletions(-) (limited to 'app/web_conn.go') diff --git a/app/web_conn.go b/app/web_conn.go index 5f66d9a51..92b54723a 100644 --- a/app/web_conn.go +++ b/app/web_conn.go @@ -40,6 +40,8 @@ type WebConn struct { AllChannelMembers map[string]string LastAllChannelMembersTime int64 Sequence int64 + endWritePump chan struct{} + pumpFinished chan struct{} } func (a *App) NewWebConn(ws *websocket.Conn, session model.Session, t goi18n.TranslateFunc, locale string) *WebConn { @@ -51,12 +53,14 @@ func (a *App) NewWebConn(ws *websocket.Conn, session model.Session, t goi18n.Tra } wc := &WebConn{ - App: a, - Send: make(chan model.WebSocketMessage, SEND_QUEUE_SIZE), - WebSocket: ws, - UserId: session.UserId, - T: t, - Locale: locale, + App: a, + Send: make(chan model.WebSocketMessage, SEND_QUEUE_SIZE), + WebSocket: ws, + UserId: session.UserId, + T: t, + Locale: locale, + endWritePump: make(chan struct{}, 1), + pumpFinished: make(chan struct{}, 1), } wc.SetSession(&session) @@ -66,6 +70,12 @@ func (a *App) NewWebConn(ws *websocket.Conn, session model.Session, t goi18n.Tra return wc } +func (wc *WebConn) Close() { + wc.WebSocket.Close() + wc.endWritePump <- struct{}{} + <-wc.pumpFinished +} + func (c *WebConn) GetSessionExpiresAt() int64 { return atomic.LoadInt64(&c.sessionExpiresAt) } @@ -97,14 +107,15 @@ func (c *WebConn) SetSession(v *model.Session) { func (c *WebConn) Pump() { ch := make(chan struct{}, 1) go func() { - c.WritePump() + c.writePump() ch <- struct{}{} }() - c.ReadPump() + c.readPump() <-ch + c.pumpFinished <- struct{}{} } -func (c *WebConn) ReadPump() { +func (c *WebConn) readPump() { defer func() { c.App.HubUnregister(c) c.WebSocket.Close() @@ -138,7 +149,7 @@ func (c *WebConn) ReadPump() { } } -func (c *WebConn) WritePump() { +func (c *WebConn) writePump() { ticker := time.NewTicker(PING_PERIOD) authTicker := time.NewTicker(AUTH_TIMEOUT) @@ -221,7 +232,8 @@ func (c *WebConn) WritePump() { return } - + case <-c.endWritePump: + return case <-authTicker.C: if c.GetSessionToken() == "" { l4g.Debug(fmt.Sprintf("websocket.authTicker: did not authenticate ip=%v", c.WebSocket.RemoteAddr())) -- cgit v1.2.3-1-g7c22