summaryrefslogtreecommitdiffstats
path: root/vendor/google.golang.org/grpc/transport/controlbuf.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/transport/controlbuf.go')
-rw-r--r--vendor/google.golang.org/grpc/transport/controlbuf.go101
1 files changed, 64 insertions, 37 deletions
diff --git a/vendor/google.golang.org/grpc/transport/controlbuf.go b/vendor/google.golang.org/grpc/transport/controlbuf.go
index e147cd51b..5c5891a11 100644
--- a/vendor/google.golang.org/grpc/transport/controlbuf.go
+++ b/vendor/google.golang.org/grpc/transport/controlbuf.go
@@ -28,6 +28,10 @@ import (
"golang.org/x/net/http2/hpack"
)
+var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) {
+ e.SetMaxDynamicTableSizeLimit(v)
+}
+
type itemNode struct {
it interface{}
next *itemNode
@@ -80,6 +84,13 @@ func (il *itemList) isEmpty() bool {
// the control buffer of transport. They represent different aspects of
// control tasks, e.g., flow control, settings, streaming resetting, etc.
+// registerStream is used to register an incoming stream with loopy writer.
+type registerStream struct {
+ streamID uint32
+ wq *writeQuota
+}
+
+// headerFrame is also used to register stream on the client-side.
type headerFrame struct {
streamID uint32
hf []hpack.HeaderField
@@ -361,44 +372,47 @@ func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimato
const minBatchSize = 1000
// run should be run in a separate goroutine.
-func (l *loopyWriter) run() {
- var (
- it interface{}
- err error
- isEmpty bool
- )
+func (l *loopyWriter) run() (err error) {
defer func() {
- errorf("transport: loopyWriter.run returning. Err: %v", err)
+ if err == ErrConnClosing {
+ // Don't log ErrConnClosing as error since it happens
+ // 1. When the connection is closed by some other known issue.
+ // 2. User closed the connection.
+ // 3. A graceful close of connection.
+ infof("transport: loopyWriter.run returning. %v", err)
+ err = nil
+ }
}()
for {
- it, err = l.cbuf.get(true)
+ it, err := l.cbuf.get(true)
if err != nil {
- return
+ return err
}
if err = l.handle(it); err != nil {
- return
+ return err
}
if _, err = l.processData(); err != nil {
- return
+ return err
}
gosched := true
hasdata:
for {
- it, err = l.cbuf.get(false)
+ it, err := l.cbuf.get(false)
if err != nil {
- return
+ return err
}
if it != nil {
if err = l.handle(it); err != nil {
- return
+ return err
}
if _, err = l.processData(); err != nil {
- return
+ return err
}
continue hasdata
}
- if isEmpty, err = l.processData(); err != nil {
- return
+ isEmpty, err := l.processData()
+ if err != nil {
+ return err
}
if !isEmpty {
continue hasdata
@@ -450,30 +464,39 @@ func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
return l.framer.fr.WriteSettingsAck()
}
+func (l *loopyWriter) registerStreamHandler(h *registerStream) error {
+ str := &outStream{
+ id: h.streamID,
+ state: empty,
+ itl: &itemList{},
+ wq: h.wq,
+ }
+ l.estdStreams[h.streamID] = str
+ return nil
+}
+
func (l *loopyWriter) headerHandler(h *headerFrame) error {
if l.side == serverSide {
- if h.endStream { // Case 1.A: Server wants to close stream.
- // Make sure it's not a trailers only response.
- if str, ok := l.estdStreams[h.streamID]; ok {
- if str.state != empty { // either active or waiting on stream quota.
- // add it str's list of items.
- str.itl.enqueue(h)
- return nil
- }
- }
- if err := l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite); err != nil {
- return err
- }
- return l.cleanupStreamHandler(h.cleanup)
+ str, ok := l.estdStreams[h.streamID]
+ if !ok {
+ warningf("transport: loopy doesn't recognize the stream: %d", h.streamID)
+ return nil
+ }
+ // Case 1.A: Server is responding back with headers.
+ if !h.endStream {
+ return l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite)
}
- // Case 1.B: Server is responding back with headers.
- str := &outStream{
- state: empty,
- itl: &itemList{},
- wq: h.wq,
+ // else: Case 1.B: Server wants to close stream.
+
+ if str.state != empty { // either active or waiting on stream quota.
+ // add it str's list of items.
+ str.itl.enqueue(h)
+ return nil
+ }
+ if err := l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite); err != nil {
+ return err
}
- l.estdStreams[h.streamID] = str
- return l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite)
+ return l.cleanupStreamHandler(h.cleanup)
}
// Case 2: Client wants to originate stream.
str := &outStream{
@@ -632,6 +655,8 @@ func (l *loopyWriter) handle(i interface{}) error {
return l.outgoingSettingsHandler(i)
case *headerFrame:
return l.headerHandler(i)
+ case *registerStream:
+ return l.registerStreamHandler(i)
case *cleanupStream:
return l.cleanupStreamHandler(i)
case *incomingGoAway:
@@ -664,6 +689,8 @@ func (l *loopyWriter) applySettings(ss []http2.Setting) error {
}
}
}
+ case http2.SettingHeaderTableSize:
+ updateHeaderTblSize(l.hEnc, s.Val)
}
}
return nil