summaryrefslogtreecommitdiffstats
path: root/Godeps/_workspace/src/gopkg.in/redis.v2/pubsub.go
diff options
context:
space:
mode:
Diffstat (limited to 'Godeps/_workspace/src/gopkg.in/redis.v2/pubsub.go')
-rw-r--r--Godeps/_workspace/src/gopkg.in/redis.v2/pubsub.go134
1 files changed, 134 insertions, 0 deletions
diff --git a/Godeps/_workspace/src/gopkg.in/redis.v2/pubsub.go b/Godeps/_workspace/src/gopkg.in/redis.v2/pubsub.go
new file mode 100644
index 000000000..6ac130bac
--- /dev/null
+++ b/Godeps/_workspace/src/gopkg.in/redis.v2/pubsub.go
@@ -0,0 +1,134 @@
+package redis
+
+import (
+ "fmt"
+ "time"
+)
+
+// Not thread-safe.
+type PubSub struct {
+ *baseClient
+}
+
+func (c *Client) PubSub() *PubSub {
+ return &PubSub{
+ baseClient: &baseClient{
+ opt: c.opt,
+ connPool: newSingleConnPool(c.connPool, false),
+ },
+ }
+}
+
+func (c *Client) Publish(channel, message string) *IntCmd {
+ req := NewIntCmd("PUBLISH", channel, message)
+ c.Process(req)
+ return req
+}
+
+type Message struct {
+ Channel string
+ Payload string
+}
+
+func (m *Message) String() string {
+ return fmt.Sprintf("Message<%s: %s>", m.Channel, m.Payload)
+}
+
+type PMessage struct {
+ Channel string
+ Pattern string
+ Payload string
+}
+
+func (m *PMessage) String() string {
+ return fmt.Sprintf("PMessage<%s: %s>", m.Channel, m.Payload)
+}
+
+type Subscription struct {
+ Kind string
+ Channel string
+ Count int
+}
+
+func (m *Subscription) String() string {
+ return fmt.Sprintf("%s: %s", m.Kind, m.Channel)
+}
+
+func (c *PubSub) Receive() (interface{}, error) {
+ return c.ReceiveTimeout(0)
+}
+
+func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) {
+ cn, err := c.conn()
+ if err != nil {
+ return nil, err
+ }
+ cn.readTimeout = timeout
+
+ cmd := NewSliceCmd()
+ if err := cmd.parseReply(cn.rd); err != nil {
+ return nil, err
+ }
+
+ reply := cmd.Val()
+
+ msgName := reply[0].(string)
+ switch msgName {
+ case "subscribe", "unsubscribe", "psubscribe", "punsubscribe":
+ return &Subscription{
+ Kind: msgName,
+ Channel: reply[1].(string),
+ Count: int(reply[2].(int64)),
+ }, nil
+ case "message":
+ return &Message{
+ Channel: reply[1].(string),
+ Payload: reply[2].(string),
+ }, nil
+ case "pmessage":
+ return &PMessage{
+ Pattern: reply[1].(string),
+ Channel: reply[2].(string),
+ Payload: reply[3].(string),
+ }, nil
+ }
+ return nil, fmt.Errorf("redis: unsupported message name: %q", msgName)
+}
+
+func (c *PubSub) subscribe(cmd string, channels ...string) error {
+ cn, err := c.conn()
+ if err != nil {
+ return err
+ }
+
+ args := append([]string{cmd}, channels...)
+ req := NewSliceCmd(args...)
+ return c.writeCmd(cn, req)
+}
+
+func (c *PubSub) Subscribe(channels ...string) error {
+ return c.subscribe("SUBSCRIBE", channels...)
+}
+
+func (c *PubSub) PSubscribe(patterns ...string) error {
+ return c.subscribe("PSUBSCRIBE", patterns...)
+}
+
+func (c *PubSub) unsubscribe(cmd string, channels ...string) error {
+ cn, err := c.conn()
+ if err != nil {
+ return err
+ }
+
+ args := append([]string{cmd}, channels...)
+ req := NewSliceCmd(args...)
+ return c.writeCmd(cn, req)
+}
+
+func (c *PubSub) Unsubscribe(channels ...string) error {
+ return c.unsubscribe("UNSUBSCRIBE", channels...)
+}
+
+func (c *PubSub) PUnsubscribe(patterns ...string) error {
+ return c.unsubscribe("PUNSUBSCRIBE", patterns...)
+}