登录
转载

一文带你理解最简消息队列实现

专栏golang大全
发布于 2025-01-27 阅读 108
  • 后端
  • Go
  • 消息队列
转载

最近在看公司的 redis queue 时,发现底层使用的是 go-zeroqueue 。本篇文章来看看 queue 的设计,也希望可以从里面了解到 mq 的最小型设计实践

使用

结合其他 mq 的使用经历,基本的使用流程:

  1. 创建 producerconsumer
  2. 启动 mq
  3. 生产消息/消费消息

对应到 queue 中,大致也是这个:

创建 queue

// 生产者创建工厂
producer := newMockedProducer()
// 消费者创建工厂
consumer := newMockedConsumer()
// 将生产者以及消费者的创建工厂函数传递给 NewQueue()
q := queue.NewQueue(func() (Producer, error) {
  return producer, nil
}, func() (Consumer, error) {
  return consumer, nil
})

我们看看 NewQueue 需要什么构建条件:

  1. producer constructor
  2. consumer constructor

将双方的工厂函数传递给 queue ,由它去执行以及重试。

这两个需要的目的是将生产者/消费者的构建和消息生产/消费都封装在 mq 中,而且将生产者/消费者的整套逻辑交给开发者处理:

type (
    // 开发者需要实现此接口
    Producer interface {
        AddListener(listener ProduceListener)
        Produce() (string, bool)
    }
    ...
    // ProducerFactory定义了生成Producer的方法
    ProducerFactory func() (Producer, error)
)
  1. 其实也就是将生产者的逻辑交个开发者自己完成,mq 只负责生产者/消费者的消息传递和之间的调度。
  2. 工厂方法的设计,是将生产者本身和生产消息,这两个任务都交给 queue 自己来做调度或者重试。

生产 message

生产消息当然要回到生产者本身:

type mockedProducer struct {
    total int32
    count int32
    // 使用waitgroup来模拟任务的完成
    wait  sync.WaitGroup
}

// 实现 Producer interface 的方法:Produce()
func (p *mockedProducer) Produce() (string, bool) {
if atomic.AddInt32(&p.count, 1) <= p.total {
p.wait.Done()
return “item”, true
}

<span class="nx">time</span><span class="p">.</span><span class="nf">Sleep</span><span class="p">(</span><span class="nx">time</span><span class="p">.</span><span class="nx">Second</span><span class="p">)</span>
<span class="k">return</span> <span class="s">""</span><span class="p">,</span> <span class="kc">false</span>

}

queue 中的生产者编写都必须实现:

  • Produce():由开发者编写生产消息的逻辑
  • AddListener():添加事件侦听器

消费 message

和生产者类似:

type mockedConsumer struct {
count int32
}

func (c mockedConsumer) Consume(string) error {
atomic.AddInt32(&amp;c.count, 1)
return nil
}

启动 queue

启动,然后验证我们上述的生产者和消费者之间的数据是否传输成功:

func TestQueue(t testing.T) {
producer := newMockedProducer(rounds)
consumer := newMockedConsumer()
// 创建 queue
q := NewQueue(func() (Producer, error) {
return producer, nil
}, func() (Consumer, error) {
return consumer, nil
})
// 当生产者生产完毕,执行 Stop() 关闭生产端生产
go func() {
producer.wait.Wait()
// mq生产端停止生产,不是mq本身 Stop 运行
q.Stop()
}()
// 启动
q.Start()
// 验证生产消费端是否消息消费完成
assert.Equal(t, int32(rounds), atomic.LoadInt32(&amp;consumer.count))
}

以上就是 queue 最简易的入门使用代码。开发者可以根据自己的业务实际情况:自由定义生产者/消费者已经生产/消费逻辑。

整体设计

整体流程如上图:

  1. 全体的通信都由 channel 进行
  2. 通过加入监听器 listener ,以及事件触发 event ,相当于将触发器逻辑分离出来
  3. 生产者有 produceone ,这个是生产消息的逻辑,但是其中的 Produce() 是由开发者编写【上面的 interface 中正是这个函数】
  4. 同理消费者,Consume()

基本的消息流动就入上图以及上述描写的,具体的代码分析我们就留到下一篇,我们 分析里面,尤其是如何控制 channel 是整个设计的核心。

总结

本篇文章从使用以及整个架构分析上简略介绍了 queue 的设计。下篇我们将深入源码,分析内部消息流转以及 channel 控制。

关于 go-zero 更多的设计和实现文章,可以持续关注我们。

项目地址

github.com/tal-tech/go-

欢迎使用 go-zero 并 star 支持我们!

评论区

励志做一条安静的咸鱼,从此走上人生巅峰。

0

0

4

举报