summaryrefslogtreecommitdiffstats
path: root/vendor/golang.org/x/net/http2/transport.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/golang.org/x/net/http2/transport.go')
-rw-r--r--vendor/golang.org/x/net/http2/transport.go147
1 files changed, 110 insertions, 37 deletions
diff --git a/vendor/golang.org/x/net/http2/transport.go b/vendor/golang.org/x/net/http2/transport.go
index 42c73bd1e..8f5f84412 100644
--- a/vendor/golang.org/x/net/http2/transport.go
+++ b/vendor/golang.org/x/net/http2/transport.go
@@ -10,6 +10,7 @@ import (
"bufio"
"bytes"
"compress/gzip"
+ "crypto/rand"
"crypto/tls"
"errors"
"fmt"
@@ -150,6 +151,9 @@ type ClientConn struct {
readerDone chan struct{} // closed on error
readerErr error // set before readerDone is closed
+ idleTimeout time.Duration // or 0 for never
+ idleTimer *time.Timer
+
mu sync.Mutex // guards following
cond *sync.Cond // hold mu; broadcast on flow/closed changes
flow flow // our conn-level flow control quota (cs.flow is per stream)
@@ -160,6 +164,7 @@ type ClientConn struct {
goAwayDebug string // goAway frame's debug data, retained as a string
streams map[uint32]*clientStream // client-initiated
nextStreamID uint32
+ pings map[[8]byte]chan struct{} // in flight ping data to notification channel
bw *bufio.Writer
br *bufio.Reader
fr *Framer
@@ -194,6 +199,7 @@ type clientStream struct {
bytesRemain int64 // -1 means unknown; owned by transportResponseBody.Read
readErr error // sticky read error; owned by transportResponseBody.Read
stopReqBody error // if non-nil, stop writing req body; guarded by cc.mu
+ didReset bool // whether we sent a RST_STREAM to the server; guarded by cc.mu
peerReset chan struct{} // closed on peer reset
resetErr error // populated before peerReset is closed
@@ -221,15 +227,26 @@ func (cs *clientStream) awaitRequestCancel(req *http.Request) {
}
select {
case <-req.Cancel:
+ cs.cancelStream()
cs.bufPipe.CloseWithError(errRequestCanceled)
- cs.cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
case <-ctx.Done():
+ cs.cancelStream()
cs.bufPipe.CloseWithError(ctx.Err())
- cs.cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
case <-cs.done:
}
}
+func (cs *clientStream) cancelStream() {
+ cs.cc.mu.Lock()
+ didReset := cs.didReset
+ cs.didReset = true
+ cs.cc.mu.Unlock()
+
+ if !didReset {
+ cs.cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
+ }
+}
+
// checkResetOrDone reports any error sent in a RST_STREAM frame by the
// server, or errStreamClosed if the stream is complete.
func (cs *clientStream) checkResetOrDone() error {
@@ -431,6 +448,11 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
streams: make(map[uint32]*clientStream),
singleUse: singleUse,
wantSettingsAck: true,
+ pings: make(map[[8]byte]chan struct{}),
+ }
+ if d := t.idleConnTimeout(); d != 0 {
+ cc.idleTimeout = d
+ cc.idleTimer = time.AfterFunc(d, cc.onIdleTimeout)
}
if VerboseLogs {
t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr())
@@ -508,6 +530,16 @@ func (cc *ClientConn) canTakeNewRequestLocked() bool {
cc.nextStreamID < math.MaxInt32
}
+// onIdleTimeout is called from a time.AfterFunc goroutine. It will
+// only be called when we're idle, but because we're coming from a new
+// goroutine, there could be a new request coming in at the same time,
+// so this simply calls the synchronized closeIfIdle to shut down this
+// connection. The timer could just call closeIfIdle, but this is more
+// clear.
+func (cc *ClientConn) onIdleTimeout() {
+ cc.closeIfIdle()
+}
+
func (cc *ClientConn) closeIfIdle() {
cc.mu.Lock()
if len(cc.streams) > 0 {
@@ -604,51 +636,37 @@ func (cc *ClientConn) responseHeaderTimeout() time.Duration {
// Certain headers are special-cased as okay but not transmitted later.
func checkConnHeaders(req *http.Request) error {
if v := req.Header.Get("Upgrade"); v != "" {
- return errors.New("http2: invalid Upgrade request header")
+ return fmt.Errorf("http2: invalid Upgrade request header: %q", req.Header["Upgrade"])
}
- if v := req.Header.Get("Transfer-Encoding"); (v != "" && v != "chunked") || len(req.Header["Transfer-Encoding"]) > 1 {
- return errors.New("http2: invalid Transfer-Encoding request header")
+ if vv := req.Header["Transfer-Encoding"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && vv[0] != "chunked") {
+ return fmt.Errorf("http2: invalid Transfer-Encoding request header: %q", vv)
}
- if v := req.Header.Get("Connection"); (v != "" && v != "close" && v != "keep-alive") || len(req.Header["Connection"]) > 1 {
- return errors.New("http2: invalid Connection request header")
+ if vv := req.Header["Connection"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && vv[0] != "close" && vv[0] != "keep-alive") {
+ return fmt.Errorf("http2: invalid Connection request header: %q", vv)
}
return nil
}
-func bodyAndLength(req *http.Request) (body io.Reader, contentLen int64) {
- body = req.Body
- if body == nil {
- return nil, 0
+// actualContentLength returns a sanitized version of
+// req.ContentLength, where 0 actually means zero (not unknown) and -1
+// means unknown.
+func actualContentLength(req *http.Request) int64 {
+ if req.Body == nil {
+ return 0
}
if req.ContentLength != 0 {
- return req.Body, req.ContentLength
- }
-
- // We have a body but a zero content length. Test to see if
- // it's actually zero or just unset.
- var buf [1]byte
- n, rerr := body.Read(buf[:])
- if rerr != nil && rerr != io.EOF {
- return errorReader{rerr}, -1
+ return req.ContentLength
}
- if n == 1 {
- // Oh, guess there is data in this Body Reader after all.
- // The ContentLength field just wasn't set.
- // Stitch the Body back together again, re-attaching our
- // consumed byte.
- if rerr == io.EOF {
- return bytes.NewReader(buf[:]), 1
- }
- return io.MultiReader(bytes.NewReader(buf[:]), body), -1
- }
- // Body is actually zero bytes.
- return nil, 0
+ return -1
}
func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
if err := checkConnHeaders(req); err != nil {
return nil, err
}
+ if cc.idleTimer != nil {
+ cc.idleTimer.Stop()
+ }
trailers, err := commaSeparatedTrailers(req)
if err != nil {
@@ -663,8 +681,9 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
return nil, errClientConnUnusable
}
- body, contentLen := bodyAndLength(req)
+ body := req.Body
hasBody := body != nil
+ contentLen := actualContentLength(req)
// TODO(bradfitz): this is a copy of the logic in net/http. Unify somewhere?
var requestedGzip bool
@@ -1046,7 +1065,7 @@ func (cc *ClientConn) encodeHeaders(req *http.Request, addGzipHeader bool, trail
cc.writeHeader(":method", req.Method)
if req.Method != "CONNECT" {
cc.writeHeader(":path", path)
- cc.writeHeader(":scheme", "https")
+ cc.writeHeader(":scheme", req.URL.Scheme)
}
if trailers != "" {
cc.writeHeader("trailer", trailers)
@@ -1173,6 +1192,9 @@ func (cc *ClientConn) streamByID(id uint32, andRemove bool) *clientStream {
if andRemove && cs != nil && !cc.closed {
cc.lastActive = time.Now()
delete(cc.streams, id)
+ if len(cc.streams) == 0 && cc.idleTimer != nil {
+ cc.idleTimer.Reset(cc.idleTimeout)
+ }
close(cs.done)
cc.cond.Broadcast() // wake up checkResetOrDone via clientStream.awaitFlowControl
}
@@ -1229,6 +1251,10 @@ func (rl *clientConnReadLoop) cleanup() {
defer cc.t.connPool().MarkDead(cc)
defer close(cc.readerDone)
+ if cc.idleTimer != nil {
+ cc.idleTimer.Stop()
+ }
+
// Close any response bodies if the server closes prematurely.
// TODO: also do this if we've written the headers but not
// gotten a response yet.
@@ -1652,9 +1678,10 @@ func (rl *clientConnReadLoop) processData(f *DataFrame) error {
cc.bw.Flush()
cc.wmu.Unlock()
}
+ didReset := cs.didReset
cc.mu.Unlock()
- if len(data) > 0 {
+ if len(data) > 0 && !didReset {
if _, err := cs.bufPipe.Write(data); err != nil {
rl.endStreamError(cs, err)
return err
@@ -1815,10 +1842,56 @@ func (rl *clientConnReadLoop) processResetStream(f *RSTStreamFrame) error {
return nil
}
+// Ping sends a PING frame to the server and waits for the ack.
+// Public implementation is in go17.go and not_go17.go
+func (cc *ClientConn) ping(ctx contextContext) error {
+ c := make(chan struct{})
+ // Generate a random payload
+ var p [8]byte
+ for {
+ if _, err := rand.Read(p[:]); err != nil {
+ return err
+ }
+ cc.mu.Lock()
+ // check for dup before insert
+ if _, found := cc.pings[p]; !found {
+ cc.pings[p] = c
+ cc.mu.Unlock()
+ break
+ }
+ cc.mu.Unlock()
+ }
+ cc.wmu.Lock()
+ if err := cc.fr.WritePing(false, p); err != nil {
+ cc.wmu.Unlock()
+ return err
+ }
+ if err := cc.bw.Flush(); err != nil {
+ cc.wmu.Unlock()
+ return err
+ }
+ cc.wmu.Unlock()
+ select {
+ case <-c:
+ return nil
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-cc.readerDone:
+ // connection closed
+ return cc.readerErr
+ }
+}
+
func (rl *clientConnReadLoop) processPing(f *PingFrame) error {
if f.IsAck() {
- // 6.7 PING: " An endpoint MUST NOT respond to PING frames
- // containing this flag."
+ cc := rl.cc
+ cc.mu.Lock()
+ defer cc.mu.Unlock()
+ // If ack, notify listener if any
+ if c, ok := cc.pings[f.Data]; ok {
+ close(c)
+ delete(cc.pings, f.Data)
+ }
return nil
}
cc := rl.cc