From 6215c9159acb85033616d2937edf3d87ef7ca79b Mon Sep 17 00:00:00 2001 From: Chris Date: Mon, 28 Aug 2017 11:27:18 -0500 Subject: add plugin http handler (#7289) --- plugin/rpcplugin/hooks.go | 72 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) (limited to 'plugin/rpcplugin/hooks.go') diff --git a/plugin/rpcplugin/hooks.go b/plugin/rpcplugin/hooks.go index 5b97742aa..995f4ae1a 100644 --- a/plugin/rpcplugin/hooks.go +++ b/plugin/rpcplugin/hooks.go @@ -1,7 +1,10 @@ package rpcplugin import ( + "bytes" "io" + "io/ioutil" + "net/http" "net/rpc" "reflect" @@ -83,6 +86,33 @@ func (h *LocalHooks) OnDeactivate(args, reply *struct{}) (err error) { return } +type ServeHTTPArgs struct { + ResponseWriterStream int64 + Request *http.Request + RequestBodyStream int64 +} + +func (h *LocalHooks) ServeHTTP(args ServeHTTPArgs, reply *struct{}) error { + w := ConnectHTTPResponseWriter(h.muxer.Connect(args.ResponseWriterStream)) + defer w.Close() + + r := args.Request + if args.RequestBodyStream != 0 { + r.Body = ConnectIOReader(h.muxer.Connect(args.RequestBodyStream)) + } else { + r.Body = ioutil.NopCloser(&bytes.Buffer{}) + } + defer r.Body.Close() + + if hook, ok := h.hooks.(http.Handler); ok { + hook.ServeHTTP(w, r) + } else { + http.NotFound(w, r) + } + + return nil +} + func ServeHooks(hooks interface{}, conn io.ReadWriteCloser, muxer *Muxer) { server := rpc.NewServer() server.Register(&LocalHooks{ @@ -95,6 +125,7 @@ func ServeHooks(hooks interface{}, conn io.ReadWriteCloser, muxer *Muxer) { const ( remoteOnActivate = iota remoteOnDeactivate + remoteServeHTTP maxRemoteHookCount ) @@ -133,6 +164,45 @@ func (h *RemoteHooks) OnDeactivate() error { return h.client.Call("LocalHooks.OnDeactivate", struct{}{}, nil) } +func (h *RemoteHooks) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if !h.implemented[remoteServeHTTP] { + http.NotFound(w, r) + return + } + + responseWriterStream, stream := h.muxer.Serve() + defer stream.Close() + go ServeHTTPResponseWriter(w, stream) + + requestBodyStream := int64(0) + if r.Body != nil { + rid, rstream := h.muxer.Serve() + defer rstream.Close() + go ServeIOReader(r.Body, rstream) + requestBodyStream = rid + } + + forwardedRequest := &http.Request{ + Method: r.Method, + URL: r.URL, + Proto: r.Proto, + ProtoMajor: r.ProtoMajor, + ProtoMinor: r.ProtoMinor, + Header: r.Header, + Host: r.Host, + RemoteAddr: r.RemoteAddr, + RequestURI: r.RequestURI, + } + + if err := h.client.Call("LocalHooks.ServeHTTP", ServeHTTPArgs{ + ResponseWriterStream: responseWriterStream, + Request: forwardedRequest, + RequestBodyStream: requestBodyStream, + }, nil); err != nil { + http.Error(w, "500 internal server error", http.StatusInternalServerError) + } +} + func (h *RemoteHooks) Close() error { if h.apiCloser != nil { h.apiCloser.Close() @@ -157,6 +227,8 @@ func ConnectHooks(conn io.ReadWriteCloser, muxer *Muxer) (*RemoteHooks, error) { remote.implemented[remoteOnActivate] = true case "OnDeactivate": remote.implemented[remoteOnDeactivate] = true + case "ServeHTTP": + remote.implemented[remoteServeHTTP] = true } } return remote, nil -- cgit v1.2.3-1-g7c22