Golang redis(六)redigo 发布订阅模式

2023年2月10日09:58:22

发布订阅

使用Send,Flush和Receive方法实现Pub / Sub

c.Send("SUBSCRIBE", "example")
c.Flush()
for {
    reply, err := c.Receive()
    if err != nil {
        return err
    }
    // process pushed message
}

PubSubConn封装Conn以实现订阅者提供简便方法。Subscribe,PSubscribe,Unsubscribe和PUnsubscribe方法发送和刷新订阅。receive方法将推送的消息转换对应的类型

psc := redis.PubSubConn{Conn: c}
psc.Subscribe("example")
for {
    switch v := psc.Receive().(type) {
    case redis.Message:
        fmt.Printf("%s: message: %s\n", v.Channel, v.Data)
    case redis.Subscription:
        fmt.Printf("%s: %s %d\n", v.Channel, v.Kind, v.Count)
    case error:
        return v
    }
}

PubSubConn

定义

type PubSubConn struct {
    Conn Conn
}

提供的方法:

1.Close 关闭连接
func (c PubSubConn) Close() error

2.PSubscribe PSubscribe发布
func (c PubSubConn) PSubscribe(channel ...interface{}) error

3.PUnsubscribe 取消发布, 如果没有给定, 则取消所有
func (c PubSubConn) PUnsubscribe(channel ...interface{}) error

4.Ping 指定的数据向服务器发送PING 调用此方法时,连接必须至少订阅一个通道或模式
func (c PubSubConn) Ping(data string) error

5.Receive 获取消息
func (c PubSubConn) Receive() interface{}

6.ReceiveWithTimeout 带有超时时间的获取消息函数
func (c PubSubConn) ReceiveWithTimeout(timeout time.Duration) interface{}

7.Subscribe 订阅
func (c PubSubConn) Subscribe(channel ...interface{}) error

8.Unsubscribe 取消订阅
func (c PubSubConn) Unsubscribe(channel ...interface{}) error

示例程序:

package main

import (
   "context"
   "fmt"
   "time"

   "github.com/gomodule/redigo/redis"
)

// listenPubSubChannels listens for messages on Redis pubsub channels. The
// onStart function is called after the channels are subscribed. The onMessage
// function is called for each message.
func listenPubSubChannels(ctx context.Context, redisServerAddr string,
   onStart func() error,
   onMessage func(channel string, data []byte) error,
   channels ...string) error {
   // A ping is set to the server with this period to test for the health of
   // the connection and server.
   const healthCheckPeriod = time.Minute

   c, err := redis.Dial("tcp", redisServerAddr,
      // Read timeout on server should be greater than ping period.
      redis.DialReadTimeout(healthCheckPeriod+10*time.Second),
      redis.DialWriteTimeout(10*time.Second))
   if err != nil {
      return err
   }
   defer c.Close()

   psc := redis.PubSubConn{Conn: c}

   if err := psc.Subscribe(redis.Args{}.AddFlat(channels)...); err != nil {
      return err
   }

   done := make(chan error, 1)

   // Start a goroutine to receive notifications from the server.
   go func() {
      for {
         switch n := psc.Receive().(type) {
         case error:
            done <- n
            return
         case redis.Message:
            if err := onMessage(n.Channel, n.Data); err != nil {
               done <- err
               return
            }
         case redis.Subscription:
            switch n.Count {
            case len(channels):
               // Notify application when all channels are subscribed.
               if err := onStart(); err != nil {
                  done <- err
                  return
               }
            case 0:
               // Return from the goroutine when all channels are unsubscribed.
               done <- nil
               return
            }
         }
      }
   }()

   ticker := time.NewTicker(healthCheckPeriod)
   defer ticker.Stop()
loop:
   for err == nil {
      select {
      case <-ticker.C:
         // Send ping to test health of connection and server. If
         // corresponding pong is not received, then receive on the
         // connection will timeout and the receive goroutine will exit.
         if err = psc.Ping(""); err != nil {
            break loop
         }
      case <-ctx.Done():
         break loop
      case err := <-done:
         // Return error from the receive goroutine.
         return err
      }
   }

   // Signal the receiving goroutine to exit by unsubscribing from all channels.
   psc.Unsubscribe()

   // Wait for goroutine to complete.
   return <-done
}

func publish() {
   c, err := redis.Dial("tcp", "127.0.0.1:6379")
   if err != nil {
      fmt.Println(err)
      return
   }
   defer c.Close()

   c.Do("PUBLISH", "c1", "hello")
   c.Do("PUBLISH", "c2", "world")
   c.Do("PUBLISH", "c1", "goodbye")
}

// This example shows how receive pubsub notifications with cancelation and
// health checks.
func main() {
   redisServerAddr := "127.0.0.1:6379"

   ctx, cancel := context.WithCancel(context.Background())

   err := listenPubSubChannels(ctx,
      redisServerAddr,
      func() error {
         // The start callback is a good place to backfill missed
         // notifications. For the purpose of this example, a goroutine is
         // started to send notifications.
         go publish()
         return nil
      },
      func(channel string, message []byte) error {
         fmt.Printf("channel: %s, message: %s\n", channel, message)

         // For the purpose of this example, cancel the listener's context
         // after receiving last message sent by publish().
         if string(message) == "goodbye" {
            cancel()
         }
         return nil
      },
      "c1", "c2")

   if err != nil {
      fmt.Println(err)
      return
   }

}

输出:

channel: c1, message: hello
channel: c2, message: world
channel: c1, message: goodbye

  • 作者:comprel
  • 原文链接:https://blog.csdn.net/comprel/article/details/95921305
    更新时间:2023年2月10日09:58:22 ,共 3839 字。