夫天地者,万物之逆旅;光阴者,百代之过客。而浮生若梦,为欢几何?
RabbitMQ 入门篇之—服务端消息确认机制

为什么需要服务端消息确认?

RabbitMQ提供了服务端的消息持久化,可解决服务器异常奔溃导致的消息丢失。那怎么确保生产者在发出消息后正确的到达broker代理服务器呢?

RabbitMQ默认情况下操作,服务器是不会返回任何信息给生产者的,也就是生产者不知道发出的消息是否正确到达了broker。如果消息在传输中出现错误未能到达服务器,那么服务器做持久化也于事无补。为了解决此问题,RabbitMQ提供了事物机制和Confirm模式。

事物机制

关键代码如下:

 ConnectionFactory factory = new ConnectionFactory();
            factory.UserName = "guest";
            factory.Password = "guest";
            factory.VirtualHost = "/";
            factory.HostName = "127.0.0.1";
            var durable = true;
            var queueName = "TransactionQueue";

            IConnection conn = factory.CreateConnection();
            IModel model = conn.CreateModel();

            model.QueueDeclare(queueName, durable, false, false, null);
            var properties = model.CreateBasicProperties();
            properties.Persistent = true;//设置消息的持久化
            model.TxSelect();
            try
            {
                for (int i = 1; i < 10; i++)
                {
                    byte[] messageBodyBytes = Encoding.UTF8.GetBytes("Hello limitcode :" + i);
                    model.BasicPublish("", queueName, properties, messageBodyBytes);//推送消息
                    Console.WriteLine("已发送 {0} 条消息", i);
                    System.Threading.Thread.Sleep(2000);
                }
                model.TxCommit();
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
                model.TxRollback();
            }

关键方法只有3个:使用 TxSelect 开启事务,TxCommit 提交事务,TxRollback回滚事务。

使用此事务方式可以保证消息不丢失,但因为它是同步的方式,事务在服务端和客户端之间需要多次的校验,很耗时,会造成性能瓶颈,所以在对性能有高要求的情况下并不适合。为了解决此问题,RabbitMQ提供了Confirm模式。

Confirm模式

引借大牛原话:生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理。  

confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息。 在channel 被设置成 confirm 模式之后,所有被 publish 的后续消息都将被 confirm(即 ack) 或者被nack一次。但是没有对消息被 confirm 的快慢做任何保证,并且同一条消息不会既被 confirm又被nack 。

在代码层面要做到Confirm有三种编码方式:

1、批量Confirm(普通Confirm)

关键代码如下:

 model.ConfirmSelect();// 开启Confirm模式
            try
            {
                for (int i = 1; i < 10; i++)
                {
                    byte[] messageBodyBytes = Encoding.UTF8.GetBytes("Hello limitcode :" + i);
                    model.BasicPublish("", queueName, properties, messageBodyBytes);//推送消息
                    Console.WriteLine("已发送 {0} 条消息", i);
                    System.Threading.Thread.Sleep(2000);
                }
                var isOk = model.WaitForConfirms();// 实际上是一种串行方式
                Console.WriteLine("消息已经送达?{0}", isOk?"是":"否");
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }

2、异步Confirm

通过事件的形式提供一个回调方法,服务端confirm了一条或者多条消息后Client端会回调这个方法。

关键代码如下:

public void Start()
        {
            var durable = true;
            var queueName = "TransactionQueue";
            IModel model = getChannel(durable, queueName);
            model.QueueDeclare(queueName, durable, false, false, null);
            var properties = model.CreateBasicProperties();
            model.ConfirmSelect();// 开启Confirm模式
            model.BasicNacks += Model_BasicNacks;//通过事件形式回调
            model.BasicAcks += Model_BasicAcks;
            for (int i = 1; i < 10; i++)
            {
                byte[] messageBodyBytes = Encoding.UTF8.GetBytes("Hello limitcode :" + i);
                model.BasicPublish("", queueName, properties, messageBodyBytes);//推送消息
                Console.WriteLine("已发送 {0} 条消息", i);
                System.Threading.Thread.Sleep(2000);
            }
        }
        private void Model_BasicAcks(object sender, RabbitMQ.Client.Events.BasicAckEventArgs e)
        {
                // e.DeliveryTag 是一个整数类型的消息传递标记,在同一个channel中每次push消息后该值都会自增
            Console.WriteLine("消息已成功送达:" + e.DeliveryTag);
        }
        private void Model_BasicNacks(object sender, RabbitMQ.Client.Events.BasicNackEventArgs e)
        {
            Console.WriteLine("消息已送达失败:" + e.DeliveryTag);
        }

通过在这2个事件中,我们可以做一些消息的判断、重试、补偿。

作者:暗夜余晖

本文版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

0

支持

0

反对

posted @2018-8-1  拜读(985)

评论列表

评论内容:



喜欢请打赏

支付宝 微信

请放心支付