坑爹的RabbitMQ

  RabbitMQ本身是个好东西:其可靠性好不容易丢消息,性能也不算差,准照AMQP协议,而且历史悠久算是被实践检验过,但是说他坑爹,主要是这个中间件使用非主流的Erlang语言开发,相比其他语言都还好,而唯独对C/C++没有那种官方完全支持、简单好用的客户端开发库可用。现有来说C/C++的开发库只有alanxz维护的librabbitmq-c,这个库虽然目前被收录在EPEL库,但是没有Apache、GNU这类大组织或者公司维护的话总让人心里有些忐忑和不安,而且这个库的接口比较的底层,如果用户对AMQP没有一些了解的话很难开箱即用 ;然后在GitHub上面搜索其他的C++客户端库,相同作者的SimpleRabbitClient在低版本boost上面用不了,而其他库也基本是librabbitmq-c的再封装,看了下代码质量感觉也是一般般。
  其实这几天折腾看来,AMQP的协议也不是很复杂,但是要从头写一个C++客户端库还是有点折腾。既然大家都用rabbitmq-c,就假定其被实践检验了稳定了(话说作者还是挺热心的,有问题提Issue回复很快),于是花了几天对AMQP协议、Python pika、SimpleRabbitClient的东西进行一些深入的梳理,自己模仿做出来了个rabbitmq_cpp_wrapper的对rabbitmq-c库上层封装的轮子。
  这个库的封装主要是使用RabbitMQHelper管理一个connnection,同时管理其中的各个channel,并且利用C++的析构机制自动进行一些资源的释放清理工作,而且对AMQP的Publish Confirm和Consume ACK/Reject机制提供可配置的支持操作。这个封装的作用就是想让客户端的操作尽可能的简单,尤其在发送和接收消息的时候不需要考虑各种异常处理、不需要考虑各种资源的释放等,最好是会用pika的人用起这个封装来也一样顺手就好。
  虽然在AMQP协议中,可以在同一个connection上创建多个channel,实现在一个连接上产生多个逻辑通道进行并行通信的效果,但是如果使用我封装的这个库的话,如果使用环境没有限制(比如连接限制,exclusive限制等)最好是一个connection只用一个channel,而多个线程可以建立自己的connection实例。在rabbitmq-c库中,有些接口是直接返回amqp_rpc_reply_t的,而有些接口则是共享一个connection级别的状态,这也就意味着某些状态会被丢失、会被覆盖;同时多个channel返回的数据帧可能会交错乱序,那么客户端必须为每个channel创建消息的接收队列,在收到不期望的frame时候进行识别和保存,以便对应通道在需要的时候取出来处理;一旦connection出现问题,所有的channel都必须重新设置,显然使用多个connection来实现多个channel的方式更为的可靠。这也让我觉得,虽然HTTP/2好用,但是要让自己开发的话,还是HTTP/1.1更简单可靠些。
  测试的时候,在自己笔记本512M内存的虚拟机下,采用30个线程并发生产消息,2个消费者进程取出消息,吞吐量达18.8k QPS;而在公司较强的多核服务器上,同样的测试吞吐量在25.3k QPS,可能是服务器负载比较大,或者机械硬盘持久化效率不高吧,机器性能高了不少可并没有达到我想象中的性能提升;而在服务器上打开Publish Confirm和Consume ACK的特性之后,吞吐量只有可怜的3.3k QPS。但是我们的业务对数据持久化要求很高,对性能要求倒是一般,所以现有情况下足够我们使用了。
  最后再侃侃MQ,其实这货在互联网公司算是使用十分广泛的组件,在不需要同步响应的需求下,使用MQ机制可以横向扩充系统的处理性能,对响应请求起到削峰填谷的缓冲,同时对服务提供者也提供一些隐藏、保护的功能。我司的一些内部业务采用CGI的服务请求方式,这些服务的最终结果都是通过回调+查询的方式进行更新,所以没有必要使用CGI这种同步的方式提供服务,而这种方式服务弊病很多:业务量一大请求就会排队,请求被长时间阻塞;服务必须随时在线,否则服务请求者请求失败必须不断重试;要命的是修复、更新操作必须到深夜才敢下线重启,苦不堪言。通过引入MQ的方式,服务的发起者和接受者通过MQ中间件隔离开来了,在MQ固化消息的帮助下,服务端可以随时上下线操作,后续服务还将做成无状态形式,在未到达MQ瓶颈之前可以通过增加服务实例增加处理性能,这让我们想想都激动啊……
  还有,提示大家在使用MQ的时候,最好在取消息消费的时候额外添加一个“令牌桶”封装的机制,虽然采用手动确认方式不会导致消费者过载,但是在调试或者突发紧急情况的时候需要手动限制甚至停止消费行为,以防止异常消费行为导致后续的行为不可控。“令牌桶”的限流算法十分简单,消费线程每尝试消费一条消息的时候都需要事先获取一个令牌才行,否则阻止该线程消费消息,同时额外启动一个定时器工具,每规定间隔时刻将桶中的令牌置满即可。刚开始封装的时候忘记了RabbitMQ消息持久化三要素,所以Broker Server重启后所有的消息被丢失,最稳妥的方式还是要针对各种情况做一些深入的测试和验证吧。不过这里使用“令牌通”还有一个坑点,就是在我封装的库中RabbitMQ消费消息是使用推模式的,意味着一旦消息消费完ACK的时候,broker已经将消息推送给客户端了,而此时客户端如果没有token的话,就不能消费这条消息。而如果此时你的消费者线程调度不均衡的话,就有可能导致某些线程实质上持有了一个消息,但是却不断被饿死的情况发生。
  比如我在“令牌桶”中如果消费者没有拿到token,就让他睡眠固定的时间,醒来后再检查,如果没有token则继续睡眠。其实在这个消费者睡眠的时候可能token已经可用了,但是却被别的线程抢走了,等到他醒来之后又发现token没有了,一直这样可能会持续很长时间。造成这种情况一方面是调度的不均衡,没有按照排队的顺序分发token导致部分线程被饿死,还有就是这种模式适合拉消息的模式,而很不适合推消息的模式。

  RabbitMQ使用坑点大全:
  (1). RabbitMQ比较灵活,所以消息持久化(注意持久化检测三要素)、QoS、Publish Confirm、Consume Ack这些参数需要根据业务自行设置和取舍,不过越可靠的情况下也就意味着性能损耗越大。
  (2). 在现实中消息只能至少一次、至多一次这种类型的服务类型,所以业务层要做好消息防重、消息丢失等各种情况的处理,以及相应的业务补偿措施。
  (3). 我们在公司为了高可用考虑,会进行RabbitMQ node集群mirror,需要注意的是默认RabbitMQ新节点加进来只会同步新的消息,旧的消息是不会自动同步的,此时设置policy的时候需要显式参数指明同步,以保证消息的安全。

1
2
rabbitmqctl set_policy ha-all "." '{"ha-mode":"all","ha-sync-mode":"automatic"}' -p vhost
rabbitmqctl list_queues name messages pid slave_pids synchronised_slave_pids -p t0transaction

  (4). 使用RabbitMQ一定要做好消息队列的监控,因为一旦消息堆积严重的话RabbitMQ就会大量的写盘,导致消息的推送效率急剧下降,服务器表现为erlang进程占用大量的CPU和内存,就像数据库过载的表现一样。这时候说明消费者和生产者的阻抗不匹配了,是该考虑增加消费者的数量了。

本文完!