summaryrefslogtreecommitdiffstats
path: root/Godeps/_workspace/src/github.com/lib/pq/notify.go
diff options
context:
space:
mode:
Diffstat (limited to 'Godeps/_workspace/src/github.com/lib/pq/notify.go')
-rw-r--r--Godeps/_workspace/src/github.com/lib/pq/notify.go766
1 files changed, 0 insertions, 766 deletions
diff --git a/Godeps/_workspace/src/github.com/lib/pq/notify.go b/Godeps/_workspace/src/github.com/lib/pq/notify.go
deleted file mode 100644
index 8cad57815..000000000
--- a/Godeps/_workspace/src/github.com/lib/pq/notify.go
+++ /dev/null
@@ -1,766 +0,0 @@
-package pq
-
-// Package pq is a pure Go Postgres driver for the database/sql package.
-// This module contains support for Postgres LISTEN/NOTIFY.
-
-import (
- "errors"
- "fmt"
- "sync"
- "sync/atomic"
- "time"
-)
-
-// Notification represents a single notification from the database.
-type Notification struct {
- // Process ID (PID) of the notifying postgres backend.
- BePid int
- // Name of the channel the notification was sent on.
- Channel string
- // Payload, or the empty string if unspecified.
- Extra string
-}
-
-func recvNotification(r *readBuf) *Notification {
- bePid := r.int32()
- channel := r.string()
- extra := r.string()
-
- return &Notification{bePid, channel, extra}
-}
-
-const (
- connStateIdle int32 = iota
- connStateExpectResponse
- connStateExpectReadyForQuery
-)
-
-type message struct {
- typ byte
- err error
-}
-
-var errListenerConnClosed = errors.New("pq: ListenerConn has been closed")
-
-// ListenerConn is a low-level interface for waiting for notifications. You
-// should use Listener instead.
-type ListenerConn struct {
- // guards cn and err
- connectionLock sync.Mutex
- cn *conn
- err error
-
- connState int32
-
- // the sending goroutine will be holding this lock
- senderLock sync.Mutex
-
- notificationChan chan<- *Notification
-
- replyChan chan message
-}
-
-// Creates a new ListenerConn. Use NewListener instead.
-func NewListenerConn(name string, notificationChan chan<- *Notification) (*ListenerConn, error) {
- cn, err := Open(name)
- if err != nil {
- return nil, err
- }
-
- l := &ListenerConn{
- cn: cn.(*conn),
- notificationChan: notificationChan,
- connState: connStateIdle,
- replyChan: make(chan message, 2),
- }
-
- go l.listenerConnMain()
-
- return l, nil
-}
-
-// We can only allow one goroutine at a time to be running a query on the
-// connection for various reasons, so the goroutine sending on the connection
-// must be holding senderLock.
-//
-// Returns an error if an unrecoverable error has occurred and the ListenerConn
-// should be abandoned.
-func (l *ListenerConn) acquireSenderLock() error {
- // we must acquire senderLock first to avoid deadlocks; see ExecSimpleQuery
- l.senderLock.Lock()
-
- l.connectionLock.Lock()
- err := l.err
- l.connectionLock.Unlock()
- if err != nil {
- l.senderLock.Unlock()
- return err
- }
- return nil
-}
-
-func (l *ListenerConn) releaseSenderLock() {
- l.senderLock.Unlock()
-}
-
-// setState advances the protocol state to newState. Returns false if moving
-// to that state from the current state is not allowed.
-func (l *ListenerConn) setState(newState int32) bool {
- var expectedState int32
-
- switch newState {
- case connStateIdle:
- expectedState = connStateExpectReadyForQuery
- case connStateExpectResponse:
- expectedState = connStateIdle
- case connStateExpectReadyForQuery:
- expectedState = connStateExpectResponse
- default:
- panic(fmt.Sprintf("unexpected listenerConnState %d", newState))
- }
-
- return atomic.CompareAndSwapInt32(&l.connState, expectedState, newState)
-}
-
-// Main logic is here: receive messages from the postgres backend, forward
-// notifications and query replies and keep the internal state in sync with the
-// protocol state. Returns when the connection has been lost, is about to go
-// away or should be discarded because we couldn't agree on the state with the
-// server backend.
-func (l *ListenerConn) listenerConnLoop() (err error) {
- defer errRecoverNoErrBadConn(&err)
-
- r := &readBuf{}
- for {
- t, err := l.cn.recvMessage(r)
- if err != nil {
- return err
- }
-
- switch t {
- case 'A':
- // recvNotification copies all the data so we don't need to worry
- // about the scratch buffer being overwritten.
- l.notificationChan <- recvNotification(r)
-
- case 'T', 'D':
- // only used by tests; ignore
-
- case 'E':
- // We might receive an ErrorResponse even when not in a query; it
- // is expected that the server will close the connection after
- // that, but we should make sure that the error we display is the
- // one from the stray ErrorResponse, not io.ErrUnexpectedEOF.
- if !l.setState(connStateExpectReadyForQuery) {
- return parseError(r)
- }
- l.replyChan <- message{t, parseError(r)}
-
- case 'C', 'I':
- if !l.setState(connStateExpectReadyForQuery) {
- // protocol out of sync
- return fmt.Errorf("unexpected CommandComplete")
- }
- // ExecSimpleQuery doesn't need to know about this message
-
- case 'Z':
- if !l.setState(connStateIdle) {
- // protocol out of sync
- return fmt.Errorf("unexpected ReadyForQuery")
- }
- l.replyChan <- message{t, nil}
-
- case 'N', 'S':
- // ignore
- default:
- return fmt.Errorf("unexpected message %q from server in listenerConnLoop", t)
- }
- }
-}
-
-// This is the main routine for the goroutine receiving on the database
-// connection. Most of the main logic is in listenerConnLoop.
-func (l *ListenerConn) listenerConnMain() {
- err := l.listenerConnLoop()
-
- // listenerConnLoop terminated; we're done, but we still have to clean up.
- // Make sure nobody tries to start any new queries by making sure the err
- // pointer is set. It is important that we do not overwrite its value; a
- // connection could be closed by either this goroutine or one sending on
- // the connection -- whoever closes the connection is assumed to have the
- // more meaningful error message (as the other one will probably get
- // net.errClosed), so that goroutine sets the error we expose while the
- // other error is discarded. If the connection is lost while two
- // goroutines are operating on the socket, it probably doesn't matter which
- // error we expose so we don't try to do anything more complex.
- l.connectionLock.Lock()
- if l.err == nil {
- l.err = err
- }
- l.cn.Close()
- l.connectionLock.Unlock()
-
- // There might be a query in-flight; make sure nobody's waiting for a
- // response to it, since there's not going to be one.
- close(l.replyChan)
-
- // let the listener know we're done
- close(l.notificationChan)
-
- // this ListenerConn is done
-}
-
-// Send a LISTEN query to the server. See ExecSimpleQuery.
-func (l *ListenerConn) Listen(channel string) (bool, error) {
- return l.ExecSimpleQuery("LISTEN " + QuoteIdentifier(channel))
-}
-
-// Send an UNLISTEN query to the server. See ExecSimpleQuery.
-func (l *ListenerConn) Unlisten(channel string) (bool, error) {
- return l.ExecSimpleQuery("UNLISTEN " + QuoteIdentifier(channel))
-}
-
-// Send `UNLISTEN *` to the server. See ExecSimpleQuery.
-func (l *ListenerConn) UnlistenAll() (bool, error) {
- return l.ExecSimpleQuery("UNLISTEN *")
-}
-
-// Ping the remote server to make sure it's alive. Non-nil error means the
-// connection has failed and should be abandoned.
-func (l *ListenerConn) Ping() error {
- sent, err := l.ExecSimpleQuery("")
- if !sent {
- return err
- }
- if err != nil {
- // shouldn't happen
- panic(err)
- }
- return nil
-}
-
-// Attempt to send a query on the connection. Returns an error if sending the
-// query failed, and the caller should initiate closure of this connection.
-// The caller must be holding senderLock (see acquireSenderLock and
-// releaseSenderLock).
-func (l *ListenerConn) sendSimpleQuery(q string) (err error) {
- defer errRecoverNoErrBadConn(&err)
-
- // must set connection state before sending the query
- if !l.setState(connStateExpectResponse) {
- panic("two queries running at the same time")
- }
-
- // Can't use l.cn.writeBuf here because it uses the scratch buffer which
- // might get overwritten by listenerConnLoop.
- b := &writeBuf{
- buf: []byte("Q\x00\x00\x00\x00"),
- pos: 1,
- }
- b.string(q)
- l.cn.send(b)
-
- return nil
-}
-
-// Execute a "simple query" (i.e. one with no bindable parameters) on the
-// connection. The possible return values are:
-// 1) "executed" is true; the query was executed to completion on the
-// database server. If the query failed, err will be set to the error
-// returned by the database, otherwise err will be nil.
-// 2) If "executed" is false, the query could not be executed on the remote
-// server. err will be non-nil.
-//
-// After a call to ExecSimpleQuery has returned an executed=false value, the
-// connection has either been closed or will be closed shortly thereafter, and
-// all subsequently executed queries will return an error.
-func (l *ListenerConn) ExecSimpleQuery(q string) (executed bool, err error) {
- if err = l.acquireSenderLock(); err != nil {
- return false, err
- }
- defer l.releaseSenderLock()
-
- err = l.sendSimpleQuery(q)
- if err != nil {
- // We can't know what state the protocol is in, so we need to abandon
- // this connection.
- l.connectionLock.Lock()
- // Set the error pointer if it hasn't been set already; see
- // listenerConnMain.
- if l.err == nil {
- l.err = err
- }
- l.connectionLock.Unlock()
- l.cn.c.Close()
- return false, err
- }
-
- // now we just wait for a reply..
- for {
- m, ok := <-l.replyChan
- if !ok {
- // We lost the connection to server, don't bother waiting for a
- // a response. err should have been set already.
- l.connectionLock.Lock()
- err := l.err
- l.connectionLock.Unlock()
- return false, err
- }
- switch m.typ {
- case 'Z':
- // sanity check
- if m.err != nil {
- panic("m.err != nil")
- }
- // done; err might or might not be set
- return true, err
-
- case 'E':
- // sanity check
- if m.err == nil {
- panic("m.err == nil")
- }
- // server responded with an error; ReadyForQuery to follow
- err = m.err
-
- default:
- return false, fmt.Errorf("unknown response for simple query: %q", m.typ)
- }
- }
-}
-
-func (l *ListenerConn) Close() error {
- l.connectionLock.Lock()
- if l.err != nil {
- l.connectionLock.Unlock()
- return errListenerConnClosed
- }
- l.err = errListenerConnClosed
- l.connectionLock.Unlock()
- // We can't send anything on the connection without holding senderLock.
- // Simply close the net.Conn to wake up everyone operating on it.
- return l.cn.c.Close()
-}
-
-// Err() returns the reason the connection was closed. It is not safe to call
-// this function until l.Notify has been closed.
-func (l *ListenerConn) Err() error {
- return l.err
-}
-
-var errListenerClosed = errors.New("pq: Listener has been closed")
-
-var ErrChannelAlreadyOpen = errors.New("pq: channel is already open")
-var ErrChannelNotOpen = errors.New("pq: channel is not open")
-
-type ListenerEventType int
-
-const (
- // Emitted only when the database connection has been initially
- // initialized. err will always be nil.
- ListenerEventConnected ListenerEventType = iota
-
- // Emitted after a database connection has been lost, either because of an
- // error or because Close has been called. err will be set to the reason
- // the database connection was lost.
- ListenerEventDisconnected
-
- // Emitted after a database connection has been re-established after
- // connection loss. err will always be nil. After this event has been
- // emitted, a nil pq.Notification is sent on the Listener.Notify channel.
- ListenerEventReconnected
-
- // Emitted after a connection to the database was attempted, but failed.
- // err will be set to an error describing why the connection attempt did
- // not succeed.
- ListenerEventConnectionAttemptFailed
-)
-
-type EventCallbackType func(event ListenerEventType, err error)
-
-// Listener provides an interface for listening to notifications from a
-// PostgreSQL database. For general usage information, see section
-// "Notifications".
-//
-// Listener can safely be used from concurrently running goroutines.
-type Listener struct {
- // Channel for receiving notifications from the database. In some cases a
- // nil value will be sent. See section "Notifications" above.
- Notify chan *Notification
-
- name string
- minReconnectInterval time.Duration
- maxReconnectInterval time.Duration
- eventCallback EventCallbackType
-
- lock sync.Mutex
- isClosed bool
- reconnectCond *sync.Cond
- cn *ListenerConn
- connNotificationChan <-chan *Notification
- channels map[string]struct{}
-}
-
-// NewListener creates a new database connection dedicated to LISTEN / NOTIFY.
-//
-// name should be set to a connection string to be used to establish the
-// database connection (see section "Connection String Parameters" above).
-//
-// minReconnectInterval controls the duration to wait before trying to
-// re-establish the database connection after connection loss. After each
-// consecutive failure this interval is doubled, until maxReconnectInterval is
-// reached. Successfully completing the connection establishment procedure
-// resets the interval back to minReconnectInterval.
-//
-// The last parameter eventCallback can be set to a function which will be
-// called by the Listener when the state of the underlying database connection
-// changes. This callback will be called by the goroutine which dispatches the
-// notifications over the Notify channel, so you should try to avoid doing
-// potentially time-consuming operations from the callback.
-func NewListener(name string,
- minReconnectInterval time.Duration,
- maxReconnectInterval time.Duration,
- eventCallback EventCallbackType) *Listener {
- l := &Listener{
- name: name,
- minReconnectInterval: minReconnectInterval,
- maxReconnectInterval: maxReconnectInterval,
- eventCallback: eventCallback,
-
- channels: make(map[string]struct{}),
-
- Notify: make(chan *Notification, 32),
- }
- l.reconnectCond = sync.NewCond(&l.lock)
-
- go l.listenerMain()
-
- return l
-}
-
-// Returns the notification channel for this listener. This is the same
-// channel as Notify, and will not be recreated during the life time of the
-// Listener.
-func (l *Listener) NotificationChannel() <-chan *Notification {
- return l.Notify
-}
-
-// Listen starts listening for notifications on a channel. Calls to this
-// function will block until an acknowledgement has been received from the
-// server. Note that Listener automatically re-establishes the connection
-// after connection loss, so this function may block indefinitely if the
-// connection can not be re-established.
-//
-// Listen will only fail in three conditions:
-// 1) The channel is already open. The returned error will be
-// ErrChannelAlreadyOpen.
-// 2) The query was executed on the remote server, but PostgreSQL returned an
-// error message in response to the query. The returned error will be a
-// pq.Error containing the information the server supplied.
-// 3) Close is called on the Listener before the request could be completed.
-//
-// The channel name is case-sensitive.
-func (l *Listener) Listen(channel string) error {
- l.lock.Lock()
- defer l.lock.Unlock()
-
- if l.isClosed {
- return errListenerClosed
- }
-
- // The server allows you to issue a LISTEN on a channel which is already
- // open, but it seems useful to be able to detect this case to spot for
- // mistakes in application logic. If the application genuinely does't
- // care, it can check the exported error and ignore it.
- _, exists := l.channels[channel]
- if exists {
- return ErrChannelAlreadyOpen
- }
-
- if l.cn != nil {
- // If gotResponse is true but error is set, the query was executed on
- // the remote server, but resulted in an error. This should be
- // relatively rare, so it's fine if we just pass the error to our
- // caller. However, if gotResponse is false, we could not complete the
- // query on the remote server and our underlying connection is about
- // to go away, so we only add relname to l.channels, and wait for
- // resync() to take care of the rest.
- gotResponse, err := l.cn.Listen(channel)
- if gotResponse && err != nil {
- return err
- }
- }
-
- l.channels[channel] = struct{}{}
- for l.cn == nil {
- l.reconnectCond.Wait()
- // we let go of the mutex for a while
- if l.isClosed {
- return errListenerClosed
- }
- }
-
- return nil
-}
-
-// Unlisten removes a channel from the Listener's channel list. Returns
-// ErrChannelNotOpen if the Listener is not listening on the specified channel.
-// Returns immediately with no error if there is no connection. Note that you
-// might still get notifications for this channel even after Unlisten has
-// returned.
-//
-// The channel name is case-sensitive.
-func (l *Listener) Unlisten(channel string) error {
- l.lock.Lock()
- defer l.lock.Unlock()
-
- if l.isClosed {
- return errListenerClosed
- }
-
- // Similarly to LISTEN, this is not an error in Postgres, but it seems
- // useful to distinguish from the normal conditions.
- _, exists := l.channels[channel]
- if !exists {
- return ErrChannelNotOpen
- }
-
- if l.cn != nil {
- // Similarly to Listen (see comment in that function), the caller
- // should only be bothered with an error if it came from the backend as
- // a response to our query.
- gotResponse, err := l.cn.Unlisten(channel)
- if gotResponse && err != nil {
- return err
- }
- }
-
- // Don't bother waiting for resync if there's no connection.
- delete(l.channels, channel)
- return nil
-}
-
-// UnlistenAll removes all channels from the Listener's channel list. Returns
-// immediately with no error if there is no connection. Note that you might
-// still get notifications for any of the deleted channels even after
-// UnlistenAll has returned.
-func (l *Listener) UnlistenAll() error {
- l.lock.Lock()
- defer l.lock.Unlock()
-
- if l.isClosed {
- return errListenerClosed
- }
-
- if l.cn != nil {
- // Similarly to Listen (see comment in that function), the caller
- // should only be bothered with an error if it came from the backend as
- // a response to our query.
- gotResponse, err := l.cn.UnlistenAll()
- if gotResponse && err != nil {
- return err
- }
- }
-
- // Don't bother waiting for resync if there's no connection.
- l.channels = make(map[string]struct{})
- return nil
-}
-
-// Ping the remote server to make sure it's alive. Non-nil return value means
-// that there is no active connection.
-func (l *Listener) Ping() error {
- l.lock.Lock()
- defer l.lock.Unlock()
-
- if l.isClosed {
- return errListenerClosed
- }
- if l.cn == nil {
- return errors.New("no connection")
- }
-
- return l.cn.Ping()
-}
-
-// Clean up after losing the server connection. Returns l.cn.Err(), which
-// should have the reason the connection was lost.
-func (l *Listener) disconnectCleanup() error {
- l.lock.Lock()
- defer l.lock.Unlock()
-
- // sanity check; can't look at Err() until the channel has been closed
- select {
- case _, ok := <-l.connNotificationChan:
- if ok {
- panic("connNotificationChan not closed")
- }
- default:
- panic("connNotificationChan not closed")
- }
-
- err := l.cn.Err()
- l.cn.Close()
- l.cn = nil
- return err
-}
-
-// Synchronize the list of channels we want to be listening on with the server
-// after the connection has been established.
-func (l *Listener) resync(cn *ListenerConn, notificationChan <-chan *Notification) error {
- doneChan := make(chan error)
- go func() {
- for channel := range l.channels {
- // If we got a response, return that error to our caller as it's
- // going to be more descriptive than cn.Err().
- gotResponse, err := cn.Listen(channel)
- if gotResponse && err != nil {
- doneChan <- err
- return
- }
-
- // If we couldn't reach the server, wait for notificationChan to
- // close and then return the error message from the connection, as
- // per ListenerConn's interface.
- if err != nil {
- for _ = range notificationChan {
- }
- doneChan <- cn.Err()
- return
- }
- }
- doneChan <- nil
- }()
-
- // Ignore notifications while synchronization is going on to avoid
- // deadlocks. We have to send a nil notification over Notify anyway as
- // we can't possibly know which notifications (if any) were lost while
- // the connection was down, so there's no reason to try and process
- // these messages at all.
- for {
- select {
- case _, ok := <-notificationChan:
- if !ok {
- notificationChan = nil
- }
-
- case err := <-doneChan:
- return err
- }
- }
-}
-
-// caller should NOT be holding l.lock
-func (l *Listener) closed() bool {
- l.lock.Lock()
- defer l.lock.Unlock()
-
- return l.isClosed
-}
-
-func (l *Listener) connect() error {
- notificationChan := make(chan *Notification, 32)
- cn, err := NewListenerConn(l.name, notificationChan)
- if err != nil {
- return err
- }
-
- l.lock.Lock()
- defer l.lock.Unlock()
-
- err = l.resync(cn, notificationChan)
- if err != nil {
- cn.Close()
- return err
- }
-
- l.cn = cn
- l.connNotificationChan = notificationChan
- l.reconnectCond.Broadcast()
-
- return nil
-}
-
-// Close disconnects the Listener from the database and shuts it down.
-// Subsequent calls to its methods will return an error. Close returns an
-// error if the connection has already been closed.
-func (l *Listener) Close() error {
- l.lock.Lock()
- defer l.lock.Unlock()
-
- if l.isClosed {
- return errListenerClosed
- }
-
- if l.cn != nil {
- l.cn.Close()
- }
- l.isClosed = true
-
- return nil
-}
-
-func (l *Listener) emitEvent(event ListenerEventType, err error) {
- if l.eventCallback != nil {
- l.eventCallback(event, err)
- }
-}
-
-// Main logic here: maintain a connection to the server when possible, wait
-// for notifications and emit events.
-func (l *Listener) listenerConnLoop() {
- var nextReconnect time.Time
-
- reconnectInterval := l.minReconnectInterval
- for {
- for {
- err := l.connect()
- if err == nil {
- break
- }
-
- if l.closed() {
- return
- }
- l.emitEvent(ListenerEventConnectionAttemptFailed, err)
-
- time.Sleep(reconnectInterval)
- reconnectInterval *= 2
- if reconnectInterval > l.maxReconnectInterval {
- reconnectInterval = l.maxReconnectInterval
- }
- }
-
- if nextReconnect.IsZero() {
- l.emitEvent(ListenerEventConnected, nil)
- } else {
- l.emitEvent(ListenerEventReconnected, nil)
- l.Notify <- nil
- }
-
- reconnectInterval = l.minReconnectInterval
- nextReconnect = time.Now().Add(reconnectInterval)
-
- for {
- notification, ok := <-l.connNotificationChan
- if !ok {
- // lost connection, loop again
- break
- }
- l.Notify <- notification
- }
-
- err := l.disconnectCleanup()
- if l.closed() {
- return
- }
- l.emitEvent(ListenerEventDisconnected, err)
-
- time.Sleep(nextReconnect.Sub(time.Now()))
- }
-}
-
-func (l *Listener) listenerMain() {
- l.listenerConnLoop()
- close(l.Notify)
-}