diff options
author | JoramWilander <jwawilander@gmail.com> | 2018-07-10 15:01:43 -0400 |
---|---|---|
committer | JoramWilander <jwawilander@gmail.com> | 2018-07-10 15:01:43 -0400 |
commit | 6c7dc2d29ccac5f9925402f6be1a4c2a3c46c005 (patch) | |
tree | 5564a47257bf6e85aaef711980c5b0fcb4d07dcc /vendor/github.com/hashicorp/yamux/session.go | |
parent | c042ffa460296587579aff54b157a5109e022f7e (diff) | |
parent | 1e1a5e5e85240f25c4faddcb24c5a29a915fe6e4 (diff) | |
download | chat-6c7dc2d29ccac5f9925402f6be1a4c2a3c46c005.tar.gz chat-6c7dc2d29ccac5f9925402f6be1a4c2a3c46c005.tar.bz2 chat-6c7dc2d29ccac5f9925402f6be1a4c2a3c46c005.zip |
Merge branch 'plugins-2'
Diffstat (limited to 'vendor/github.com/hashicorp/yamux/session.go')
-rw-r--r-- | vendor/github.com/hashicorp/yamux/session.go | 646 |
1 files changed, 646 insertions, 0 deletions
diff --git a/vendor/github.com/hashicorp/yamux/session.go b/vendor/github.com/hashicorp/yamux/session.go new file mode 100644 index 000000000..d8446fa65 --- /dev/null +++ b/vendor/github.com/hashicorp/yamux/session.go @@ -0,0 +1,646 @@ +package yamux + +import ( + "bufio" + "fmt" + "io" + "io/ioutil" + "log" + "math" + "net" + "strings" + "sync" + "sync/atomic" + "time" +) + +// Session is used to wrap a reliable ordered connection and to +// multiplex it into multiple streams. +type Session struct { + // remoteGoAway indicates the remote side does + // not want futher connections. Must be first for alignment. + remoteGoAway int32 + + // localGoAway indicates that we should stop + // accepting futher connections. Must be first for alignment. + localGoAway int32 + + // nextStreamID is the next stream we should + // send. This depends if we are a client/server. + nextStreamID uint32 + + // config holds our configuration + config *Config + + // logger is used for our logs + logger *log.Logger + + // conn is the underlying connection + conn io.ReadWriteCloser + + // bufRead is a buffered reader + bufRead *bufio.Reader + + // pings is used to track inflight pings + pings map[uint32]chan struct{} + pingID uint32 + pingLock sync.Mutex + + // streams maps a stream id to a stream, and inflight has an entry + // for any outgoing stream that has not yet been established. Both are + // protected by streamLock. + streams map[uint32]*Stream + inflight map[uint32]struct{} + streamLock sync.Mutex + + // synCh acts like a semaphore. It is sized to the AcceptBacklog which + // is assumed to be symmetric between the client and server. This allows + // the client to avoid exceeding the backlog and instead blocks the open. + synCh chan struct{} + + // acceptCh is used to pass ready streams to the client + acceptCh chan *Stream + + // sendCh is used to mark a stream as ready to send, + // or to send a header out directly. + sendCh chan sendReady + + // recvDoneCh is closed when recv() exits to avoid a race + // between stream registration and stream shutdown + recvDoneCh chan struct{} + + // shutdown is used to safely close a session + shutdown bool + shutdownErr error + shutdownCh chan struct{} + shutdownLock sync.Mutex +} + +// sendReady is used to either mark a stream as ready +// or to directly send a header +type sendReady struct { + Hdr []byte + Body io.Reader + Err chan error +} + +// newSession is used to construct a new session +func newSession(config *Config, conn io.ReadWriteCloser, client bool) *Session { + s := &Session{ + config: config, + logger: log.New(config.LogOutput, "", log.LstdFlags), + conn: conn, + bufRead: bufio.NewReader(conn), + pings: make(map[uint32]chan struct{}), + streams: make(map[uint32]*Stream), + inflight: make(map[uint32]struct{}), + synCh: make(chan struct{}, config.AcceptBacklog), + acceptCh: make(chan *Stream, config.AcceptBacklog), + sendCh: make(chan sendReady, 64), + recvDoneCh: make(chan struct{}), + shutdownCh: make(chan struct{}), + } + if client { + s.nextStreamID = 1 + } else { + s.nextStreamID = 2 + } + go s.recv() + go s.send() + if config.EnableKeepAlive { + go s.keepalive() + } + return s +} + +// IsClosed does a safe check to see if we have shutdown +func (s *Session) IsClosed() bool { + select { + case <-s.shutdownCh: + return true + default: + return false + } +} + +// CloseChan returns a read-only channel which is closed as +// soon as the session is closed. +func (s *Session) CloseChan() <-chan struct{} { + return s.shutdownCh +} + +// NumStreams returns the number of currently open streams +func (s *Session) NumStreams() int { + s.streamLock.Lock() + num := len(s.streams) + s.streamLock.Unlock() + return num +} + +// Open is used to create a new stream as a net.Conn +func (s *Session) Open() (net.Conn, error) { + conn, err := s.OpenStream() + if err != nil { + return nil, err + } + return conn, nil +} + +// OpenStream is used to create a new stream +func (s *Session) OpenStream() (*Stream, error) { + if s.IsClosed() { + return nil, ErrSessionShutdown + } + if atomic.LoadInt32(&s.remoteGoAway) == 1 { + return nil, ErrRemoteGoAway + } + + // Block if we have too many inflight SYNs + select { + case s.synCh <- struct{}{}: + case <-s.shutdownCh: + return nil, ErrSessionShutdown + } + +GET_ID: + // Get an ID, and check for stream exhaustion + id := atomic.LoadUint32(&s.nextStreamID) + if id >= math.MaxUint32-1 { + return nil, ErrStreamsExhausted + } + if !atomic.CompareAndSwapUint32(&s.nextStreamID, id, id+2) { + goto GET_ID + } + + // Register the stream + stream := newStream(s, id, streamInit) + s.streamLock.Lock() + s.streams[id] = stream + s.inflight[id] = struct{}{} + s.streamLock.Unlock() + + // Send the window update to create + if err := stream.sendWindowUpdate(); err != nil { + select { + case <-s.synCh: + default: + s.logger.Printf("[ERR] yamux: aborted stream open without inflight syn semaphore") + } + return nil, err + } + return stream, nil +} + +// Accept is used to block until the next available stream +// is ready to be accepted. +func (s *Session) Accept() (net.Conn, error) { + conn, err := s.AcceptStream() + if err != nil { + return nil, err + } + return conn, err +} + +// AcceptStream is used to block until the next available stream +// is ready to be accepted. +func (s *Session) AcceptStream() (*Stream, error) { + select { + case stream := <-s.acceptCh: + if err := stream.sendWindowUpdate(); err != nil { + return nil, err + } + return stream, nil + case <-s.shutdownCh: + return nil, s.shutdownErr + } +} + +// Close is used to close the session and all streams. +// Attempts to send a GoAway before closing the connection. +func (s *Session) Close() error { + s.shutdownLock.Lock() + defer s.shutdownLock.Unlock() + + if s.shutdown { + return nil + } + s.shutdown = true + if s.shutdownErr == nil { + s.shutdownErr = ErrSessionShutdown + } + close(s.shutdownCh) + s.conn.Close() + <-s.recvDoneCh + + s.streamLock.Lock() + defer s.streamLock.Unlock() + for _, stream := range s.streams { + stream.forceClose() + } + return nil +} + +// exitErr is used to handle an error that is causing the +// session to terminate. +func (s *Session) exitErr(err error) { + s.shutdownLock.Lock() + if s.shutdownErr == nil { + s.shutdownErr = err + } + s.shutdownLock.Unlock() + s.Close() +} + +// GoAway can be used to prevent accepting further +// connections. It does not close the underlying conn. +func (s *Session) GoAway() error { + return s.waitForSend(s.goAway(goAwayNormal), nil) +} + +// goAway is used to send a goAway message +func (s *Session) goAway(reason uint32) header { + atomic.SwapInt32(&s.localGoAway, 1) + hdr := header(make([]byte, headerSize)) + hdr.encode(typeGoAway, 0, 0, reason) + return hdr +} + +// Ping is used to measure the RTT response time +func (s *Session) Ping() (time.Duration, error) { + // Get a channel for the ping + ch := make(chan struct{}) + + // Get a new ping id, mark as pending + s.pingLock.Lock() + id := s.pingID + s.pingID++ + s.pings[id] = ch + s.pingLock.Unlock() + + // Send the ping request + hdr := header(make([]byte, headerSize)) + hdr.encode(typePing, flagSYN, 0, id) + if err := s.waitForSend(hdr, nil); err != nil { + return 0, err + } + + // Wait for a response + start := time.Now() + select { + case <-ch: + case <-time.After(s.config.ConnectionWriteTimeout): + s.pingLock.Lock() + delete(s.pings, id) // Ignore it if a response comes later. + s.pingLock.Unlock() + return 0, ErrTimeout + case <-s.shutdownCh: + return 0, ErrSessionShutdown + } + + // Compute the RTT + return time.Now().Sub(start), nil +} + +// keepalive is a long running goroutine that periodically does +// a ping to keep the connection alive. +func (s *Session) keepalive() { + for { + select { + case <-time.After(s.config.KeepAliveInterval): + _, err := s.Ping() + if err != nil { + s.logger.Printf("[ERR] yamux: keepalive failed: %v", err) + s.exitErr(ErrKeepAliveTimeout) + return + } + case <-s.shutdownCh: + return + } + } +} + +// waitForSendErr waits to send a header, checking for a potential shutdown +func (s *Session) waitForSend(hdr header, body io.Reader) error { + errCh := make(chan error, 1) + return s.waitForSendErr(hdr, body, errCh) +} + +// waitForSendErr waits to send a header with optional data, checking for a +// potential shutdown. Since there's the expectation that sends can happen +// in a timely manner, we enforce the connection write timeout here. +func (s *Session) waitForSendErr(hdr header, body io.Reader, errCh chan error) error { + t := timerPool.Get() + timer := t.(*time.Timer) + timer.Reset(s.config.ConnectionWriteTimeout) + defer func() { + timer.Stop() + select { + case <-timer.C: + default: + } + timerPool.Put(t) + }() + + ready := sendReady{Hdr: hdr, Body: body, Err: errCh} + select { + case s.sendCh <- ready: + case <-s.shutdownCh: + return ErrSessionShutdown + case <-timer.C: + return ErrConnectionWriteTimeout + } + + select { + case err := <-errCh: + return err + case <-s.shutdownCh: + return ErrSessionShutdown + case <-timer.C: + return ErrConnectionWriteTimeout + } +} + +// sendNoWait does a send without waiting. Since there's the expectation that +// the send happens right here, we enforce the connection write timeout if we +// can't queue the header to be sent. +func (s *Session) sendNoWait(hdr header) error { + t := timerPool.Get() + timer := t.(*time.Timer) + timer.Reset(s.config.ConnectionWriteTimeout) + defer func() { + timer.Stop() + select { + case <-timer.C: + default: + } + timerPool.Put(t) + }() + + select { + case s.sendCh <- sendReady{Hdr: hdr}: + return nil + case <-s.shutdownCh: + return ErrSessionShutdown + case <-timer.C: + return ErrConnectionWriteTimeout + } +} + +// send is a long running goroutine that sends data +func (s *Session) send() { + for { + select { + case ready := <-s.sendCh: + // Send a header if ready + if ready.Hdr != nil { + sent := 0 + for sent < len(ready.Hdr) { + n, err := s.conn.Write(ready.Hdr[sent:]) + if err != nil { + s.logger.Printf("[ERR] yamux: Failed to write header: %v", err) + asyncSendErr(ready.Err, err) + s.exitErr(err) + return + } + sent += n + } + } + + // Send data from a body if given + if ready.Body != nil { + _, err := io.Copy(s.conn, ready.Body) + if err != nil { + s.logger.Printf("[ERR] yamux: Failed to write body: %v", err) + asyncSendErr(ready.Err, err) + s.exitErr(err) + return + } + } + + // No error, successful send + asyncSendErr(ready.Err, nil) + case <-s.shutdownCh: + return + } + } +} + +// recv is a long running goroutine that accepts new data +func (s *Session) recv() { + if err := s.recvLoop(); err != nil { + s.exitErr(err) + } +} + +// Ensure that the index of the handler (typeData/typeWindowUpdate/etc) matches the message type +var ( + handlers = []func(*Session, header) error{ + typeData: (*Session).handleStreamMessage, + typeWindowUpdate: (*Session).handleStreamMessage, + typePing: (*Session).handlePing, + typeGoAway: (*Session).handleGoAway, + } +) + +// recvLoop continues to receive data until a fatal error is encountered +func (s *Session) recvLoop() error { + defer close(s.recvDoneCh) + hdr := header(make([]byte, headerSize)) + for { + // Read the header + if _, err := io.ReadFull(s.bufRead, hdr); err != nil { + if err != io.EOF && !strings.Contains(err.Error(), "closed") && !strings.Contains(err.Error(), "reset by peer") { + s.logger.Printf("[ERR] yamux: Failed to read header: %v", err) + } + return err + } + + // Verify the version + if hdr.Version() != protoVersion { + s.logger.Printf("[ERR] yamux: Invalid protocol version: %d", hdr.Version()) + return ErrInvalidVersion + } + + mt := hdr.MsgType() + if mt < typeData || mt > typeGoAway { + return ErrInvalidMsgType + } + + if err := handlers[mt](s, hdr); err != nil { + return err + } + } +} + +// handleStreamMessage handles either a data or window update frame +func (s *Session) handleStreamMessage(hdr header) error { + // Check for a new stream creation + id := hdr.StreamID() + flags := hdr.Flags() + if flags&flagSYN == flagSYN { + if err := s.incomingStream(id); err != nil { + return err + } + } + + // Get the stream + s.streamLock.Lock() + stream := s.streams[id] + s.streamLock.Unlock() + + // If we do not have a stream, likely we sent a RST + if stream == nil { + // Drain any data on the wire + if hdr.MsgType() == typeData && hdr.Length() > 0 { + s.logger.Printf("[WARN] yamux: Discarding data for stream: %d", id) + if _, err := io.CopyN(ioutil.Discard, s.bufRead, int64(hdr.Length())); err != nil { + s.logger.Printf("[ERR] yamux: Failed to discard data: %v", err) + return nil + } + } else { + s.logger.Printf("[WARN] yamux: frame for missing stream: %v", hdr) + } + return nil + } + + // Check if this is a window update + if hdr.MsgType() == typeWindowUpdate { + if err := stream.incrSendWindow(hdr, flags); err != nil { + if sendErr := s.sendNoWait(s.goAway(goAwayProtoErr)); sendErr != nil { + s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr) + } + return err + } + return nil + } + + // Read the new data + if err := stream.readData(hdr, flags, s.bufRead); err != nil { + if sendErr := s.sendNoWait(s.goAway(goAwayProtoErr)); sendErr != nil { + s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr) + } + return err + } + return nil +} + +// handlePing is invokde for a typePing frame +func (s *Session) handlePing(hdr header) error { + flags := hdr.Flags() + pingID := hdr.Length() + + // Check if this is a query, respond back in a separate context so we + // don't interfere with the receiving thread blocking for the write. + if flags&flagSYN == flagSYN { + go func() { + hdr := header(make([]byte, headerSize)) + hdr.encode(typePing, flagACK, 0, pingID) + if err := s.sendNoWait(hdr); err != nil { + s.logger.Printf("[WARN] yamux: failed to send ping reply: %v", err) + } + }() + return nil + } + + // Handle a response + s.pingLock.Lock() + ch := s.pings[pingID] + if ch != nil { + delete(s.pings, pingID) + close(ch) + } + s.pingLock.Unlock() + return nil +} + +// handleGoAway is invokde for a typeGoAway frame +func (s *Session) handleGoAway(hdr header) error { + code := hdr.Length() + switch code { + case goAwayNormal: + atomic.SwapInt32(&s.remoteGoAway, 1) + case goAwayProtoErr: + s.logger.Printf("[ERR] yamux: received protocol error go away") + return fmt.Errorf("yamux protocol error") + case goAwayInternalErr: + s.logger.Printf("[ERR] yamux: received internal error go away") + return fmt.Errorf("remote yamux internal error") + default: + s.logger.Printf("[ERR] yamux: received unexpected go away") + return fmt.Errorf("unexpected go away received") + } + return nil +} + +// incomingStream is used to create a new incoming stream +func (s *Session) incomingStream(id uint32) error { + // Reject immediately if we are doing a go away + if atomic.LoadInt32(&s.localGoAway) == 1 { + hdr := header(make([]byte, headerSize)) + hdr.encode(typeWindowUpdate, flagRST, id, 0) + return s.sendNoWait(hdr) + } + + // Allocate a new stream + stream := newStream(s, id, streamSYNReceived) + + s.streamLock.Lock() + defer s.streamLock.Unlock() + + // Check if stream already exists + if _, ok := s.streams[id]; ok { + s.logger.Printf("[ERR] yamux: duplicate stream declared") + if sendErr := s.sendNoWait(s.goAway(goAwayProtoErr)); sendErr != nil { + s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr) + } + return ErrDuplicateStream + } + + // Register the stream + s.streams[id] = stream + + // Check if we've exceeded the backlog + select { + case s.acceptCh <- stream: + return nil + default: + // Backlog exceeded! RST the stream + s.logger.Printf("[WARN] yamux: backlog exceeded, forcing connection reset") + delete(s.streams, id) + stream.sendHdr.encode(typeWindowUpdate, flagRST, id, 0) + return s.sendNoWait(stream.sendHdr) + } +} + +// closeStream is used to close a stream once both sides have +// issued a close. If there was an in-flight SYN and the stream +// was not yet established, then this will give the credit back. +func (s *Session) closeStream(id uint32) { + s.streamLock.Lock() + if _, ok := s.inflight[id]; ok { + select { + case <-s.synCh: + default: + s.logger.Printf("[ERR] yamux: SYN tracking out of sync") + } + } + delete(s.streams, id) + s.streamLock.Unlock() +} + +// establishStream is used to mark a stream that was in the +// SYN Sent state as established. +func (s *Session) establishStream(id uint32) { + s.streamLock.Lock() + if _, ok := s.inflight[id]; ok { + delete(s.inflight, id) + } else { + s.logger.Printf("[ERR] yamux: established stream without inflight SYN (no tracking entry)") + } + select { + case <-s.synCh: + default: + s.logger.Printf("[ERR] yamux: established stream without inflight SYN (didn't have semaphore)") + } + s.streamLock.Unlock() +} |