超详细的RabbitMQ快速入门!!你不拿走吗?

2021-08-04 16:54:00 1179 技术小虫有点萌

什么是mq

MQ,全称是Message Queue,是基于数据结构中“先进先出”的一种数据结构,指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递——生产者产生消息并把消息放入队列,然后由消费者去处理。消费者可以到指定队列拉取消息,或者订阅相应的队列,由MQ服务端给其推送消息。

mq的使用场景

在技术小虫的工作中,在以下场景中用到过

  • 每天对数据的统计,需要发送很多封邮件,但是我们不想因为把发送邮件这个功能嵌套在我们的统计脚本中(一是为了提高统计效率,二是为了业务的解耦)
  • 使用rabbitmq的死信队列完成对库存、题库的回收工作,比如 某个商品被下单了减了库存,但是迟迟没有付款,超过30分钟我们就默认订单取消,并恢复库存。或者在医生抢题 答题的业务中,有的医生抢了10道题,但是只在有效期(比如1天)内答了3道题,但是剩余的7道题不可能一直被这个医生绑定,超过1天就要被解绑回库。
  • 数据信息的同步,随着微服务的盛行,就会存在对于不同的业务会有不同的用户表,比如,公司原本就有一个用户表的总表。但是随着业务的发展,购车贷是一个服务,他会有自己的用户表,而消费贷也是一个服务,也有自己的用户表。而每个服务的用户信息发生变化,其实都要同步到用户表总表中去。那么这个时候我们就用到了消息队列,每当用户修改信息的时候,都往消息队列投放一个消息。我们通过再这个队列完成信息的同步。
  • 还有一些情景会触发很多操作,那么这个时候也会用到消息队列,比如 医生注册功能,但是在医生完成注册的时候,我们要同步给他开通其他账号,比如医疗科普号,科普讲坛号这些账号。
  • 还有一些高并发的场景,比如一个抢购的活动,年底的时候我们会有积分兑换的活动,一共两个小时,那么这个时候打过来的请求非常多,如果每个都取请求来了我们都取更新一下数据库,做一些更新的操作,可能牵涉的很多个表,那么数据库可能就扛不住。那么为了避免这种情况,我们就可以先把这些数据存起来,让数据库慢慢去消化。这就是流量削峰。

mq有哪些产品和对比

image.png

为什么是rabbitmq

RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。 AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。 AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。 RabbitMQ的可靠性是非常好的,数据能够保证百分之百的不丢失。可以使用镜像队列,它的稳定性非常好。所以说在我们互联网的金融行业。对数据的稳定性和可靠性要求都非常高的情况下,我们都会选择RabbitMQ。当然没有kafka性能好,但是要比AvtiveMQ性能要好很多。也可以自己做一些性能的优化。 RabbitMQ可以构建异地双活架构,包括每一个节点存储方式可以采用磁盘或者内存的方式。

安装启动(ubuntu 18) 参考文章

#先查看一下我的版本号
root@guofu:~# cat /etc/issue
Ubuntu 18.04.5 LTS \n \l

#从前面的mq对比中已经说了,rabbitmq是erlang实现的,所以需要安装erlang
   26  sudo apt-get install erlang-nox
# 添加公钥
   27  wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -
# 更新软件包
   28  sudo apt-get update
# 安装rabbitmq ,安装完毕自动启动
   29  sudo apt-get install rabbitmq-server
# 查看rabbitmq的运行状态  service rabbitmq-server status  也可以查看
   30  systemctl status rabbitmq-server
   info:Active: active (running) since Mon 2021-07-26 11:15:54 CST; 13s ago

#服务的启动、停止、重启
   31  sudo service rabbitmq-server stop
   32  sudo service rabbitmq-server start
   33  sudo service rabbitmq-server 

# 安装可视化的web操作页面
   34  sudo rabbitmq-plugins enable rabbitmq_management
   35  sudo service rabbitmq-server restart
   36  curl  http://localhost:15672

至此,rabbitmq安装完毕,web页面也可以访问了。默认用户名和密码是guest/guest,但是,rabbitmq默认会创建guest用户,但是只能服务器本机登录,建议创建其他新用户,授权,用来做其他操作。所以我们接下来开始创建一个新的用户

# 查看所有用户
   38  sudo rabbitmqctl list_users
#增加用户admin 密码是passwd(根据需求自定义即可)
   39  sudo rabbitmqctl add_user admin  passwd
# 给普通用户分配管理员角色
   40  sudo rabbitmqctl set_user_tags admin administrator
#赋予virtual host中所有资源的配置、写、读权限以便管理其中的资源,也是添加远程访问权限
   41  sudo rabbitmqctl  set_permissions -p / admin '.*' '.*' '.*'

使用admin远程登录

配置文件解读

rabbitmq-env.conf rabbitmq的环境变量

root@guofu:~# cd /etc/rabbitmq/
root@guofu:/etc/rabbitmq# ls
enabled_plugins  rabbitmq-env.conf
root@guofu:/etc/rabbitmq# cat rabbitmq-env.conf
# Defaults to rabbit. This can be useful if you want to run more than one node
# per machine - RABBITMQ_NODENAME should be unique per erlang-node-and-machine
# combination. See the clustering on a single machine guide for details:
# http://www.rabbitmq.com/clustering.html#single-machine
#NODENAME=rabbit  --节点名称,如果服务是集群的形式,每个节点的名称必须唯一

# By default RabbitMQ will bind to all interfaces, on IPv4 and IPv6 if
# available. Set this if you only want to bind to one network interface or#
# address family.
#NODE_IP_ADDRESS=127.0.0.1 --节点的ip地址

# Defaults to 5672.
#NODE_PORT=5672 --节点的端口号

# Default rabbitmq-server wait timeout.

mq服务器的架构

相关参考 相关参考

  • 我们先来看一下rabbitmq的架构图
image.png
image.png
  • Broker : 标识消息队列服务器实体rabbitmq-server
  • v-host : Virtual Host 虚拟主机。vhost是rabbitmq分配权限的最小细粒度,比如,我有两个用户a和b,我如果想让a用户只访问a1队列,b用户访问b1队列,那么在同一个vhost下,这是做不到的。 我们可以为一个用户分配一个可以访问哪个或者哪一些vhost的权限。但是不能为用户分配一个可以访问哪一些exchange,或者queue的权限,因为rabbitmq的权限细粒度没有细化到交换器和队列,他的最小细粒度是vhost(vhost中包含许多的exchanges,queues,bingdings)。 所以如果exchangeA 和queueA 只能让用户A访问,exchangeB 和queueB 只能让用户B访问,要达到这种需求,只能为exchangeA 和queueA创建一个vhostA,为exchangeB 和queueB 创建vhostB,这样就隔离开来了。。vhost是AMQP概念的基础,必须在链接时指定。 RabbitMQ默认的vhost是 /。查看所有虚拟主机的命令是
root@guofu:/etc/rabbitmq# rabbitmqctl add_vhost test_vhost
Creating vhost "test_vhost"
root@guofu:/etc/rabbitmq# rabbitmqctl list_vhosts
Listing vhosts
/
test_vhost

# 查看用户列表
root@guofu:/etc/rabbitmq# rabbitmqctl list_users
Listing users
admin   [administrator]
guest   [administrator]

# 分配访问权限  set_permissions [-p <vhost>] <user> <conf> <write> <read>
# 需要注意的是RabbitMQ会缓存每个connection或channel的权限验证结果、因此权限发生变化后需要重连才能生效。
root@guofu:/etc/rabbitmq# set rabbitmqctl set_permissions -p test_host  admin ".*" ".*" ".*"


  • exchange:交换器用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

从web页面可以看到,exchange可以选择的有四种,持久化方式有两种,一种是内存,一种是硬盘

  • fanout / (Publish/Subscribe) / 发布订阅
image

生产者将消息给交换机,交换机根据自身的类型(fanout)将会把所有消息复制同步到所有与其绑定的队列,每个队列可以有一个消费者接收消息进行消费逻辑。需要我们自己创建交换器并进行绑定,创建多个队列进行绑定即可,若一个消费者绑定多个队列则进行轮询,因为mq有阅后即焚的特点,只能保证一个消费者阅读接受。常用于群发消息。

  • 路由模式 / Routing / direct
image

生产者将消息发送到交换机信息携带具体的路由key,交换机的类型是direct,将接收到的信息中的routingKey,比对与之绑定的队列routingkey。消费者监听一个队列,获取消息,执行消费逻辑。一个队列可以绑定一个routingKey也可以绑定多个。在消息进行路由时会携带一个routingKey寻找对应的队列。

  • Topic/ 通配符匹配
image

生产者发送消息,消息中带有具体的路由key,交换机的类型是topic,队列绑定交换机不在使用具体的路由key而是一个范围值,例如: .yell.,hlll.iii,jjj.#。其中* 表示一个字符串(不能携带特殊字符)#表示任意

  • header exchange(头交换机)和主题交换机有点相似,但是不同于主题交换机的路由是基于路由键,头交换机的路由值基于消息的header数据。举栗说明
队列A:绑定交换机参数是:format=pdf,type=report,x-match=all,
队列B: 绑定交换机参数是:format=pdf,type=log,x-match=any,
队列C:绑定交换机参数是:format=zip,type=report,x-match=all,

消息1发送交换机的头参数是:format=pdf,type=reprot则消息传送到队列A
消息2发送交换机的头参数是:format=pdf则消息传送到队列A和队列B
消息3发送交换机的头参数是:format=zip,type=log则消息没有匹配队列,此消息会被丢弃

all: 默认值。一个传送消息的header里的键值对和交换机的header键值对全部匹配,才可以路由到对应交换机
any: 一个传送消息的header里的键值对和交换机的header键值对任意一个匹配,就可以路由到对应交换机

  • queen:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
  • Banding : 绑定,用于消息队列和交换机之间的关联。一个绑定就是基于路由键将交换机和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。### Virtual Host的使用

  • Channel : 信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟链接,AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说,建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接。

  • Connection : 网络连接,比如一个TCP连接。

接下来我们根据上面的exchang的不同类型做一个演示
  • 先来创建用户和vhost(这里为了演示,会尽可能多的使用到前面讲的命令,具体要根据需求 是否创建vhost),另外这些操作通过web页面也可以完成。
# 创建vhost
root@guofu:/etc/rabbitmq# rabbitmqctl add_vhost guofu_vhost
Creating vhost "guofu_vhost"
#查看vhost列表
root@guofu:/etc/rabbitmq# rabbitmqctl list_vhosts
Listing vhosts
guofu_vhost
/
test_vhost
# 创建用户和密码
root@guofu:/etc/rabbitmq# rabbitmqctl add_user guofu guofu
Creating user "guofu"
#查看用户列表
root@guofu:/etc/rabbitmq# rabbitmqctl list_users
Listing users
vhost1  []
admin   [administrator]
guofu   []
guest   [administrator]
# 给用户设置角色,否则远程登录不了
root@guofu:/etc/rabbitmq# sudo rabbitmqctl set_user_tags guofu administrator
Setting tags for user "guofu" to [administrator]
#给用户 vhost的权限,3个* 代表 配置 读 写的权限
root@guofu:/etc/rabbitmq# sudo rabbitmqctl set_permissions -p guofu_vhost guofu ".*" ".*" ".*"
Setting permissions for user "guofu" in vhost "guofu_vhost"
# 查看用户权限
root@guofu:/etc/rabbitmq# sudo rabbitmqctl list_user_permissions guofu
Listing permissions for user "guofu"
guofu_vhost     .*      .*      .*

配置完毕后,我们在页面也可以看到,已经生效了

  • 新建一个交换机并指定vhost

  • 新建两个队列并绑定exchange

  • 我们把信息配置到代码中去相关参考资料

package main

import (
 "github.com/streadway/amqp"
)

/**
 * @Description: 演示rabbitmq的exchange类型-生产,fanout     为了方便演示,忽略错误捕捉
 */
func main() {
 //交换机
 var exchange="guofu_exchange"
 //建立连接  用户名+密码+ip+端口号+vhost
 conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
 //建立通道
 ch, _ := conn.Channel()
 //声明交换机类型
 ch.ExchangeDeclare(
  exchange,
  "fanout",
  true,
  false,
  false,
  false,
  nil,
  )

 //定义消息
 msgBody:="i am a msg3"
 //发送消息  相关参数 exchange, key string, mandatory, immediate bool, msg Publishing
 err := ch.Publish(
  exchange,     //exchange
  "", //routing key(queue name)
  false,
  false,
  amqp.Publishing{
   DeliveryMode: amqp.Persistent, //Msg set as persistent
   ContentType:  "text/plain",
   Body:         []byte(msgBody),
  })

 if err !=nil{
  panic(err)
 }
}


  • 我们通过web页面看一下

  • 可见对于fanout 发布订阅 ,其实我们在推送消息的时候,只用到了exchange和type,而不关系队列,因为只要是绑定了该exchange的队列,都会被推送消息。也就是说,fanout模式,一个消息会被推送到多个队列,那么哪种情景会用到这种模式呢?比如 用户注册后,我既要发邮件,又要发短信,那么发短信和发邮件,就可以用fanout 这种模式

  • 下面我写一下消费的代码,消费队列的方法其实都一样,这里演示一次,后面的其他类型的exchange就不演示了。

package main

import (
 "github.com/streadway/amqp"
)

/**
 * @Description: 演示rabbitmq的exchange类型-生产,fanout     为了方便演示,忽略错误捕捉
 */
func main() {
 //交换机
 var exchange="guofu_exchange"
 //建立连接  用户名+密码+ip+端口号+vhost
 conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
 //建立通道
 ch, _ := conn.Channel()
 //声明交换机类型
 ch.ExchangeDeclare(
  exchange,
  "fanout",
  true,
  false,
  false,
  false,
  nil,
  )

 //定义消息
 msgBody:="i am a msg3"
 //发送消息  相关参数 exchange, key string, mandatory, immediate bool, msg Publishing
 err := ch.Publish(
  exchange,     //exchange
  "", //routing key(queue name)
  false,
  false,
  amqp.Publishing{
   DeliveryMode: amqp.Persistent, //Msg set as persistent
   ContentType:  "text/plain",
   Body:         []byte(msgBody),
  })

 if err !=nil{
  panic(err)
 }
}

  • 上面的代码相信大家都看的明白,但是要注意的是,里面有一个点 【试探性创建】 这是什么意思?这是说,如果有这个exchange/queue,就用,没有的话就创建,刚才我并没有创建guofu_queue3,但是我监听这个队列也得到消息了

  • 那么我们用消费代码创建一下新的exchange和queue

package main

import (
 "fmt"
 "github.com/streadway/amqp"
)

func main() {
 //交换机
 var exchange = "guofu_exchange_test"
 var queue = "guofu_queue_test"
 var key = ""
 //建立连接  用户名+密码+ip+端口号+vhost
 conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
 //建立通道
 ch, _ := conn.Channel()
 //试探性声明交换机类型
 ch.ExchangeDeclare(
  exchange,
  "fanout",
  true,
  false,
  false,
  false,
  nil,
 )

 //试探性创建队列
 //声明queue 和相关属性 相关参数 name string, durable, autoDelete, exclusive, noWait bool, args Table
 _, err := ch.QueueDeclare(
  queue,
  true,
  false,
  false,
  false,
  nil,
 )
 if err != nil {
  panic(err)
 }
 //绑定队列 (name, key, exchange string, noWait bool, args Table) 发布订阅模式的key为空
 ch.QueueBind(queue, key, exchange, false, nil)

 // 消费队列  Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table)
 msg, err := ch.Consume(
  queue,
  "",
  false,
  false,
  false,
  false,
  nil,
 )
 for d:=range msg{
  fmt.Println(string(d.Body))
  d.Ack(false)

 }




}

  • 交换机和队列被创建

  • 还有一点要注意的是ACK 机制 确认机制分为三种:none、auto (默认)、manual 自动 ACK:消息一旦被接收,消费者自动发送 ACK 手动 ACK:消息接收后,不会发送 ACK,需要手动调用 这两 ACK 要怎么选择呢?这需要看消息的重要性: 如果消息不太重要,丢失也没有影响,那么自动 ACK 会比较方便 如果消息非常重要,不容丢失。那么最好在消费完成后手动 ACK,否则接收消息后就自动 ACK,RabbitMQ 就会把消息从队列中删除。如果此时消费者宕机,那么消息就丢失了。

image.png
  • 另外一点是在php和java中,还有一种生产者消息确认机制,消息推送成功后支持函数回调,但是golang里面我没有找到这个方法

  • 好了,我们回归exchange的第二种类型direct 路由模式,这次我们直接使用消费端的代码直接建立队列并监听

package main

import (
 "fmt"
 "github.com/streadway/amqp"
)

func main() {
 //交换机
 var exchange = "direct_guofu_exchange"
 var queue = "direct_guofu_queue"
 var key = "direct_key"
 //建立连接  用户名+密码+ip+端口号+vhost
 conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
 //建立通道
 ch, _ := conn.Channel()
 //试探性声明交换机类型
 ch.ExchangeDeclare(
  exchange,
  "direct",
  true,
  false,
  false,
  false,
  nil,
 )

 //试探性创建队列
 //声明queue 和相关属性 相关参数 name string, durable, autoDelete, exclusive, noWait bool, args Table
 _, err := ch.QueueDeclare(
  queue,
  true,
  false,
  false,
  false,
  nil,
 )
 if err != nil {
  panic(err)
 }
 //绑定队列 (name, key, exchange string, noWait bool, args Table) 发布订阅模式的key为空
 ch.QueueBind(queue, key, exchange, false, nil)

 // 消费队列  Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table)
 msg, err := ch.Consume(
  queue,
  "",
  false,
  false,
  false,
  false,
  nil,
 )
 for d:=range msg{
  fmt.Println(string(d.Body))
  d.Ack(false)

 }




}


image.png
  • 使用同样方法 创建队列direct_guofu_queue

  • 推送消息到该队列,需要注意的是,如果你两个queue使用了同一个key,那么exchange会根据key 推送给两个队列,如果不是业务需要,尽量避免重复key ,减少脏数据的生成

package main

import (
 "github.com/streadway/amqp"
)

/**
 * @Description: 演示rabbitmq的exchange类型-生产,fanout     为了方便演示,忽略错误捕捉
 */
func main() {
 var exchange = "direct_guofu_exchange"
 var key = "direct_key"
 //建立连接  用户名+密码+ip+端口号+vhost
 conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
 //建立通道
 ch, _ := conn.Channel()
 //声明交换机类型
 ch.ExchangeDeclare(
  exchange,
  "direct",
  true,
  false,
  false,
  false,
  nil,
  )

 //定义消息
 msgBody:="i am a direct"
 //发送消息  相关参数 exchange, key string, mandatory, immediate bool, msg Publishing
 err := ch.Publish(
  exchange,     //exchange
  key, //routing key(queue name)
  false,
  false,
  amqp.Publishing{
   DeliveryMode: amqp.Persistent, //Msg set as persistent
   ContentType:  "text/plain",
   Body:         []byte(msgBody),
  })


 if err !=nil{
  panic(err)
 }
}

image.png
  • topic模式 topic 类似于mysql的模糊查询,只要是能模糊匹配到的,都会推送消息。,推送的路由可以是一个包含了多个属性,以.分割的字符串,最大程长度是200左右。推送之后,其他会匹配其他队列的路由,如果匹配到了,则推送进去。现在我们假设有以下场景 一共有两个队列,第一个队列animal 如果是动物,进入这个队列,第二个队列是 plant,第三个队列是表示颜色yellow,如果是黄色的都进入这个队列,现在我们要推送这个几个到队列里面去 1.橘猫 既要去animal 也要去 yellow 2.菊花 既要去plant 也要去yellow

如代码所示,我创建了三个队列,绑定的key 分别是 #.animal.#,#.plant.#,yellow.#,

 var exchange = "topic_guofu_exchange"
 var queue = "topic727_yellow"
 var key = "yellow.#"

 var exchange = "topic_guofu_exchange"
 var queue = "topic727_animal"
 var key = "#.animal.#"


 var exchange = "topic_guofu_exchange"
 var queue = "topic727_plant"
 var key = "#.plant.#"

那么当我推送消息的时候,如果我topic绑定的路由键 是 yellow.animal.plant ,那么推送的时候 三个消息队列都会被匹配。我们来看一下

  • 把生产的代码贴出来
package main

import (
 "github.com/streadway/amqp"
)

/**
 * @Description: 演示rabbitmq的exchange类型-生产,fanout     为了方便演示,忽略错误捕捉
 */
func main() {
 var exchange = "topic_guofu_exchange"
 var key = "yellow.animal.plant "
 var queue = "topic727"
 //建立连接  用户名+密码+ip+端口号+vhost
 conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
 //建立通道
 ch, _ := conn.Channel()
 //声明交换机类型
 ch.ExchangeDeclare(
  exchange,
  "topic",
  true,
  false,
  false,
  false,
  nil,
 )

 //试探性创建队列
 //声明queue 和相关属性 相关参数 name string, durable, autoDelete, exclusive, noWait bool, args Table
 _, err := ch.QueueDeclare(
  queue,
  true,
  false,
  false,
  false,
  nil,
 )
 if err != nil {
  panic(err)
 }
 //绑定队列 (name, key, exchange string, noWait bool, args Table) 发布订阅模式的key为空
 ch.QueueBind(queue, key, exchange, false, nil)

 //定义消息
 msgBody := key
 //发送消息  相关参数 exchange, key string, mandatory, immediate bool, msg Publishing
 err = ch.Publish(
  exchange, //exchange
  key,      //routing key(queue name)
  false,
  false,
  amqp.Publishing{
   DeliveryMode: amqp.Persistent, //Msg set as persistent
   ContentType:  "text/plain",
   Body:         []byte(msgBody),
  })

 if err != nil {
  panic(err)
 }
}

  • 推送完毕,发现四个队列都有了数据(第一个队列是topic 推送时候绑定的,后面三个是路由匹配的)

  • 那么此时,如果我推送的key是yellow.animal,那么路由会匹配到 yellow.# 和 #.animal.#,我们来看一下

  • topic的功能是比较强大的,利用好topic ,可以实现 direct和fanout的功能,路由密钥中可以包含任意多个单词,最多255个字节。

  • header exchange(头交换机)和主题交换机有点相似,但是不同于主题交换机的路由是基于路由键,头交换机的路由值基于消息的header数据。在此也不做赘述了。有兴趣的同学可以去官网看看。