MQ,全称是Message Queue,是基于数据结构中“先进先出”的一种数据结构,指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递——生产者产生消息并把消息放入队列,然后由消费者去处理。消费者可以到指定队列拉取消息,或者订阅相应的队列,由MQ服务端给其推送消息。
在技术小虫的工作中,在以下场景中用到过
RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。 AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。 AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。 RabbitMQ的可靠性是非常好的,数据能够保证百分之百的不丢失。可以使用镜像队列,它的稳定性非常好。所以说在我们互联网的金融行业。对数据的稳定性和可靠性要求都非常高的情况下,我们都会选择RabbitMQ。当然没有kafka性能好,但是要比AvtiveMQ性能要好很多。也可以自己做一些性能的优化。 RabbitMQ可以构建异地双活架构,包括每一个节点存储方式可以采用磁盘或者内存的方式。
#先查看一下我的版本号
root@guofu:~# cat /etc/issue
Ubuntu 18.04.5 LTS \n \l
#从前面的mq对比中已经说了,rabbitmq是erlang实现的,所以需要安装erlang
26 sudo apt-get install erlang-nox
# 添加公钥
27 wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -
# 更新软件包
28 sudo apt-get update
# 安装rabbitmq ,安装完毕自动启动
29 sudo apt-get install rabbitmq-server
# 查看rabbitmq的运行状态 service rabbitmq-server status 也可以查看
30 systemctl status rabbitmq-server
info:Active: active (running) since Mon 2021-07-26 11:15:54 CST; 13s ago
#服务的启动、停止、重启
31 sudo service rabbitmq-server stop
32 sudo service rabbitmq-server start
33 sudo service rabbitmq-server
# 安装可视化的web操作页面
34 sudo rabbitmq-plugins enable rabbitmq_management
35 sudo service rabbitmq-server restart
36 curl http://localhost:15672
至此,rabbitmq安装完毕,web页面也可以访问了。默认用户名和密码是guest/guest,但是,rabbitmq默认会创建guest用户,但是只能服务器本机登录,建议创建其他新用户,授权,用来做其他操作。所以我们接下来开始创建一个新的用户
# 查看所有用户
38 sudo rabbitmqctl list_users
#增加用户admin 密码是passwd(根据需求自定义即可)
39 sudo rabbitmqctl add_user admin passwd
# 给普通用户分配管理员角色
40 sudo rabbitmqctl set_user_tags admin administrator
#赋予virtual host中所有资源的配置、写、读权限以便管理其中的资源,也是添加远程访问权限
41 sudo rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'
使用admin远程登录
rabbitmq-env.conf rabbitmq的环境变量
root@guofu:~# cd /etc/rabbitmq/
root@guofu:/etc/rabbitmq# ls
enabled_plugins rabbitmq-env.conf
root@guofu:/etc/rabbitmq# cat rabbitmq-env.conf
# Defaults to rabbit. This can be useful if you want to run more than one node
# per machine - RABBITMQ_NODENAME should be unique per erlang-node-and-machine
# combination. See the clustering on a single machine guide for details:
# http://www.rabbitmq.com/clustering.html#single-machine
#NODENAME=rabbit --节点名称,如果服务是集群的形式,每个节点的名称必须唯一
# By default RabbitMQ will bind to all interfaces, on IPv4 and IPv6 if
# available. Set this if you only want to bind to one network interface or#
# address family.
#NODE_IP_ADDRESS=127.0.0.1 --节点的ip地址
# Defaults to 5672.
#NODE_PORT=5672 --节点的端口号
# Default rabbitmq-server wait timeout.
root@guofu:/etc/rabbitmq# rabbitmqctl add_vhost test_vhost
Creating vhost "test_vhost"
root@guofu:/etc/rabbitmq# rabbitmqctl list_vhosts
Listing vhosts
/
test_vhost
# 查看用户列表
root@guofu:/etc/rabbitmq# rabbitmqctl list_users
Listing users
admin [administrator]
guest [administrator]
# 分配访问权限 set_permissions [-p <vhost>] <user> <conf> <write> <read>
# 需要注意的是RabbitMQ会缓存每个connection或channel的权限验证结果、因此权限发生变化后需要重连才能生效。
root@guofu:/etc/rabbitmq# set rabbitmqctl set_permissions -p test_host admin ".*" ".*" ".*"
生产者将消息给交换机,交换机根据自身的类型(fanout)将会把所有消息复制同步到所有与其绑定的队列,每个队列可以有一个消费者接收消息进行消费逻辑。需要我们自己创建交换器并进行绑定,创建多个队列进行绑定即可,若一个消费者绑定多个队列则进行轮询,因为mq有阅后即焚的特点,只能保证一个消费者阅读接受。常用于群发消息。
生产者将消息发送到交换机信息携带具体的路由key,交换机的类型是direct,将接收到的信息中的routingKey,比对与之绑定的队列routingkey。消费者监听一个队列,获取消息,执行消费逻辑。一个队列可以绑定一个routingKey也可以绑定多个。在消息进行路由时会携带一个routingKey寻找对应的队列。
生产者发送消息,消息中带有具体的路由key,交换机的类型是topic,队列绑定交换机不在使用具体的路由key而是一个范围值,例如: .yell.,hlll.iii,jjj.#。其中* 表示一个字符串(不能携带特殊字符)#表示任意
队列A:绑定交换机参数是:format=pdf,type=report,x-match=all,
队列B: 绑定交换机参数是:format=pdf,type=log,x-match=any,
队列C:绑定交换机参数是:format=zip,type=report,x-match=all,
消息1发送交换机的头参数是:format=pdf,type=reprot则消息传送到队列A
消息2发送交换机的头参数是:format=pdf则消息传送到队列A和队列B
消息3发送交换机的头参数是:format=zip,type=log则消息没有匹配队列,此消息会被丢弃
all: 默认值。一个传送消息的header里的键值对和交换机的header键值对全部匹配,才可以路由到对应交换机
any: 一个传送消息的header里的键值对和交换机的header键值对任意一个匹配,就可以路由到对应交换机
Banding : 绑定,用于消息队列和交换机之间的关联。一个绑定就是基于路由键将交换机和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。### Virtual Host的使用
Channel : 信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟链接,AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说,建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接。
Connection : 网络连接,比如一个TCP连接。
# 创建vhost
root@guofu:/etc/rabbitmq# rabbitmqctl add_vhost guofu_vhost
Creating vhost "guofu_vhost"
#查看vhost列表
root@guofu:/etc/rabbitmq# rabbitmqctl list_vhosts
Listing vhosts
guofu_vhost
/
test_vhost
# 创建用户和密码
root@guofu:/etc/rabbitmq# rabbitmqctl add_user guofu guofu
Creating user "guofu"
#查看用户列表
root@guofu:/etc/rabbitmq# rabbitmqctl list_users
Listing users
vhost1 []
admin [administrator]
guofu []
guest [administrator]
# 给用户设置角色,否则远程登录不了
root@guofu:/etc/rabbitmq# sudo rabbitmqctl set_user_tags guofu administrator
Setting tags for user "guofu" to [administrator]
#给用户 vhost的权限,3个* 代表 配置 读 写的权限
root@guofu:/etc/rabbitmq# sudo rabbitmqctl set_permissions -p guofu_vhost guofu ".*" ".*" ".*"
Setting permissions for user "guofu" in vhost "guofu_vhost"
# 查看用户权限
root@guofu:/etc/rabbitmq# sudo rabbitmqctl list_user_permissions guofu
Listing permissions for user "guofu"
guofu_vhost .* .* .*
配置完毕后,我们在页面也可以看到,已经生效了
新建一个交换机并指定vhost
新建两个队列并绑定exchange
我们把信息配置到代码中去相关参考资料
package main
import (
"github.com/streadway/amqp"
)
/**
* @Description: 演示rabbitmq的exchange类型-生产,fanout 为了方便演示,忽略错误捕捉
*/
func main() {
//交换机
var exchange="guofu_exchange"
//建立连接 用户名+密码+ip+端口号+vhost
conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
//建立通道
ch, _ := conn.Channel()
//声明交换机类型
ch.ExchangeDeclare(
exchange,
"fanout",
true,
false,
false,
false,
nil,
)
//定义消息
msgBody:="i am a msg3"
//发送消息 相关参数 exchange, key string, mandatory, immediate bool, msg Publishing
err := ch.Publish(
exchange, //exchange
"", //routing key(queue name)
false,
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent, //Msg set as persistent
ContentType: "text/plain",
Body: []byte(msgBody),
})
if err !=nil{
panic(err)
}
}
我们通过web页面看一下
可见对于fanout 发布订阅 ,其实我们在推送消息的时候,只用到了exchange和type,而不关系队列,因为只要是绑定了该exchange的队列,都会被推送消息。也就是说,fanout模式,一个消息会被推送到多个队列,那么哪种情景会用到这种模式呢?比如 用户注册后,我既要发邮件,又要发短信,那么发短信和发邮件,就可以用fanout 这种模式
下面我写一下消费的代码,消费队列的方法其实都一样,这里演示一次,后面的其他类型的exchange就不演示了。
package main
import (
"github.com/streadway/amqp"
)
/**
* @Description: 演示rabbitmq的exchange类型-生产,fanout 为了方便演示,忽略错误捕捉
*/
func main() {
//交换机
var exchange="guofu_exchange"
//建立连接 用户名+密码+ip+端口号+vhost
conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
//建立通道
ch, _ := conn.Channel()
//声明交换机类型
ch.ExchangeDeclare(
exchange,
"fanout",
true,
false,
false,
false,
nil,
)
//定义消息
msgBody:="i am a msg3"
//发送消息 相关参数 exchange, key string, mandatory, immediate bool, msg Publishing
err := ch.Publish(
exchange, //exchange
"", //routing key(queue name)
false,
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent, //Msg set as persistent
ContentType: "text/plain",
Body: []byte(msgBody),
})
if err !=nil{
panic(err)
}
}
上面的代码相信大家都看的明白,但是要注意的是,里面有一个点 【试探性创建】 这是什么意思?这是说,如果有这个exchange/queue,就用,没有的话就创建,刚才我并没有创建guofu_queue3,但是我监听这个队列也得到消息了
那么我们用消费代码创建一下新的exchange和queue
package main
import (
"fmt"
"github.com/streadway/amqp"
)
func main() {
//交换机
var exchange = "guofu_exchange_test"
var queue = "guofu_queue_test"
var key = ""
//建立连接 用户名+密码+ip+端口号+vhost
conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
//建立通道
ch, _ := conn.Channel()
//试探性声明交换机类型
ch.ExchangeDeclare(
exchange,
"fanout",
true,
false,
false,
false,
nil,
)
//试探性创建队列
//声明queue 和相关属性 相关参数 name string, durable, autoDelete, exclusive, noWait bool, args Table
_, err := ch.QueueDeclare(
queue,
true,
false,
false,
false,
nil,
)
if err != nil {
panic(err)
}
//绑定队列 (name, key, exchange string, noWait bool, args Table) 发布订阅模式的key为空
ch.QueueBind(queue, key, exchange, false, nil)
// 消费队列 Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table)
msg, err := ch.Consume(
queue,
"",
false,
false,
false,
false,
nil,
)
for d:=range msg{
fmt.Println(string(d.Body))
d.Ack(false)
}
}
交换机和队列被创建
还有一点要注意的是ACK 机制 确认机制分为三种:none、auto (默认)、manual 自动 ACK:消息一旦被接收,消费者自动发送 ACK 手动 ACK:消息接收后,不会发送 ACK,需要手动调用 这两 ACK 要怎么选择呢?这需要看消息的重要性: 如果消息不太重要,丢失也没有影响,那么自动 ACK 会比较方便 如果消息非常重要,不容丢失。那么最好在消费完成后手动 ACK,否则接收消息后就自动 ACK,RabbitMQ 就会把消息从队列中删除。如果此时消费者宕机,那么消息就丢失了。
另外一点是在php和java中,还有一种生产者消息确认机制,消息推送成功后支持函数回调,但是golang里面我没有找到这个方法
好了,我们回归exchange的第二种类型direct 路由模式,这次我们直接使用消费端的代码直接建立队列并监听
package main
import (
"fmt"
"github.com/streadway/amqp"
)
func main() {
//交换机
var exchange = "direct_guofu_exchange"
var queue = "direct_guofu_queue"
var key = "direct_key"
//建立连接 用户名+密码+ip+端口号+vhost
conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
//建立通道
ch, _ := conn.Channel()
//试探性声明交换机类型
ch.ExchangeDeclare(
exchange,
"direct",
true,
false,
false,
false,
nil,
)
//试探性创建队列
//声明queue 和相关属性 相关参数 name string, durable, autoDelete, exclusive, noWait bool, args Table
_, err := ch.QueueDeclare(
queue,
true,
false,
false,
false,
nil,
)
if err != nil {
panic(err)
}
//绑定队列 (name, key, exchange string, noWait bool, args Table) 发布订阅模式的key为空
ch.QueueBind(queue, key, exchange, false, nil)
// 消费队列 Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table)
msg, err := ch.Consume(
queue,
"",
false,
false,
false,
false,
nil,
)
for d:=range msg{
fmt.Println(string(d.Body))
d.Ack(false)
}
}
使用同样方法 创建队列direct_guofu_queue
推送消息到该队列,需要注意的是,如果你两个queue使用了同一个key,那么exchange会根据key 推送给两个队列,如果不是业务需要,尽量避免重复key ,减少脏数据的生成
package main
import (
"github.com/streadway/amqp"
)
/**
* @Description: 演示rabbitmq的exchange类型-生产,fanout 为了方便演示,忽略错误捕捉
*/
func main() {
var exchange = "direct_guofu_exchange"
var key = "direct_key"
//建立连接 用户名+密码+ip+端口号+vhost
conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
//建立通道
ch, _ := conn.Channel()
//声明交换机类型
ch.ExchangeDeclare(
exchange,
"direct",
true,
false,
false,
false,
nil,
)
//定义消息
msgBody:="i am a direct"
//发送消息 相关参数 exchange, key string, mandatory, immediate bool, msg Publishing
err := ch.Publish(
exchange, //exchange
key, //routing key(queue name)
false,
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent, //Msg set as persistent
ContentType: "text/plain",
Body: []byte(msgBody),
})
if err !=nil{
panic(err)
}
}
如代码所示,我创建了三个队列,绑定的key 分别是 #.animal.#,#.plant.#,yellow.#,
var exchange = "topic_guofu_exchange"
var queue = "topic727_yellow"
var key = "yellow.#"
var exchange = "topic_guofu_exchange"
var queue = "topic727_animal"
var key = "#.animal.#"
var exchange = "topic_guofu_exchange"
var queue = "topic727_plant"
var key = "#.plant.#"
那么当我推送消息的时候,如果我topic绑定的路由键 是 yellow.animal.plant ,那么推送的时候 三个消息队列都会被匹配。我们来看一下
package main
import (
"github.com/streadway/amqp"
)
/**
* @Description: 演示rabbitmq的exchange类型-生产,fanout 为了方便演示,忽略错误捕捉
*/
func main() {
var exchange = "topic_guofu_exchange"
var key = "yellow.animal.plant "
var queue = "topic727"
//建立连接 用户名+密码+ip+端口号+vhost
conn, _ := amqp.Dial("amqp://guofu:guofu@172.16.131.3:5672/guofu_vhost")
//建立通道
ch, _ := conn.Channel()
//声明交换机类型
ch.ExchangeDeclare(
exchange,
"topic",
true,
false,
false,
false,
nil,
)
//试探性创建队列
//声明queue 和相关属性 相关参数 name string, durable, autoDelete, exclusive, noWait bool, args Table
_, err := ch.QueueDeclare(
queue,
true,
false,
false,
false,
nil,
)
if err != nil {
panic(err)
}
//绑定队列 (name, key, exchange string, noWait bool, args Table) 发布订阅模式的key为空
ch.QueueBind(queue, key, exchange, false, nil)
//定义消息
msgBody := key
//发送消息 相关参数 exchange, key string, mandatory, immediate bool, msg Publishing
err = ch.Publish(
exchange, //exchange
key, //routing key(queue name)
false,
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent, //Msg set as persistent
ContentType: "text/plain",
Body: []byte(msgBody),
})
if err != nil {
panic(err)
}
}
推送完毕,发现四个队列都有了数据(第一个队列是topic 推送时候绑定的,后面三个是路由匹配的)
那么此时,如果我推送的key是yellow.animal,那么路由会匹配到 yellow.# 和 #.animal.#,我们来看一下
topic的功能是比较强大的,利用好topic ,可以实现 direct和fanout的功能,路由密钥中可以包含任意多个单词,最多255个字节。
header exchange(头交换机)和主题交换机有点相似,但是不同于主题交换机的路由是基于路由键,头交换机的路由值基于消息的header数据。在此也不做赘述了。有兴趣的同学可以去官网看看。