package plugin import ( "errors" "fmt" "io" "log" "net" "net/rpc" "sync" "github.com/hashicorp/yamux" ) // RPCServer listens for network connections and then dispenses interface // implementations over net/rpc. // // After setting the fields below, they shouldn't be read again directly // from the structure which may be reading/writing them concurrently. type RPCServer struct { Plugins map[string]Plugin // Stdout, Stderr are what this server will use instead of the // normal stdin/out/err. This is because due to the multi-process nature // of our plugin system, we can't use the normal process values so we // make our own custom one we pipe across. Stdout io.Reader Stderr io.Reader // DoneCh should be set to a non-nil channel that will be closed // when the control requests the RPC server to end. DoneCh chan<- struct{} lock sync.Mutex } // ServerProtocol impl. func (s *RPCServer) Init() error { return nil } // ServerProtocol impl. func (s *RPCServer) Config() string { return "" } // ServerProtocol impl. func (s *RPCServer) Serve(lis net.Listener) { for { conn, err := lis.Accept() if err != nil { log.Printf("[ERR] plugin: plugin server: %s", err) return } go s.ServeConn(conn) } } // ServeConn runs a single connection. // // ServeConn blocks, serving the connection until the client hangs up. func (s *RPCServer) ServeConn(conn io.ReadWriteCloser) { // First create the yamux server to wrap this connection mux, err := yamux.Server(conn, nil) if err != nil { conn.Close() log.Printf("[ERR] plugin: error creating yamux server: %s", err) return } // Accept the control connection control, err := mux.Accept() if err != nil { mux.Close() if err != io.EOF { log.Printf("[ERR] plugin: error accepting control connection: %s", err) } return } // Connect the stdstreams (in, out, err) stdstream := make([]net.Conn, 2) for i, _ := range stdstream { stdstream[i], err = mux.Accept() if err != nil { mux.Close() log.Printf("[ERR] plugin: accepting stream %d: %s", i, err) return } } // Copy std streams out to the proper place go copyStream("stdout", stdstream[0], s.Stdout) go copyStream("stderr", stdstream[1], s.Stderr) // Create the broker and start it up broker := newMuxBroker(mux) go broker.Run() // Use the control connection to build the dispenser and serve the // connection. server := rpc.NewServer() server.RegisterName("Control", &controlServer{ server: s, }) server.RegisterName("Dispenser", &dispenseServer{ broker: broker, plugins: s.Plugins, }) server.ServeConn(control) } // done is called internally by the control server to trigger the // doneCh to close which is listened to by the main process to cleanly // exit. func (s *RPCServer) done() { s.lock.Lock() defer s.lock.Unlock() if s.DoneCh != nil { close(s.DoneCh) s.DoneCh = nil } } // dispenseServer dispenses variousinterface implementations for Terraform. type controlServer struct { server *RPCServer } // Ping can be called to verify the connection (and likely the binary) // is still alive to a plugin. func (c *controlServer) Ping( null bool, response *struct{}) error { *response = struct{}{} return nil } func (c *controlServer) Quit( null bool, response *struct{}) error { // End the server c.server.done() // Always return true *response = struct{}{} return nil } // dispenseServer dispenses variousinterface implementations for Terraform. type dispenseServer struct { broker *MuxBroker plugins map[string]Plugin } func (d *dispenseServer) Dispense( name string, response *uint32) error { // Find the function to create this implementation p, ok := d.plugins[name] if !ok { return fmt.Errorf("unknown plugin type: %s", name) } // Create the implementation first so we know if there is an error. impl, err := p.Server(d.broker) if err != nil { // We turn the error into an errors error so that it works across RPC return errors.New(err.Error()) } // Reserve an ID for our implementation id := d.broker.NextId() *response = id // Run the rest in a goroutine since it can only happen once this RPC // call returns. We wait for a connection for the plugin implementation // and serve it. go func() { conn, err := d.broker.Accept(id) if err != nil { log.Printf("[ERR] go-plugin: plugin dispense error: %s: %s", name, err) return } serve(conn, "Plugin", impl) }() return nil } func serve(conn io.ReadWriteCloser, name string, v interface{}) { server := rpc.NewServer() if err := server.RegisterName(name, v); err != nil { log.Printf("[ERR] go-plugin: plugin dispense error: %s", err) return } server.ServeConn(conn) }