夫天地者,万物之逆旅;光阴者,百代之过客。而浮生若梦,为欢几何?
.Net Core 项目实战:RabbitMQ的正确使用姿势

项目场景说明

我司OA系统需要将发布的企业新闻信息推送到IM系统,登录IM的每个用户都将在POP窗中收到该条消息。我司IM系统通信基础使用的是云信,所以需要调用云信API接口,将消息推送给云信服务器,再由云信服务器将消息推给每个用户。

在该业务场景中,如果将消息发送给多个用户,使用同步方式会存在很大的性能问题,会导致等待超时。如果使用异步方式不通过消息队列,在出现不可控因素时,容易造成消息丢失。所以技术人员采用RabbitMQ消息队列技术栈解决。

.Net Core 后台托管服务

在.Net Core 中,微软给我们提供了后台托管服务支持,不再像framework 时代一样需要写windows 服务程序了。Core 中托管服务是一个类,具有实现 IHostedService 接口的后台任务逻辑。IHostedService 接口为主机托管的对象定义了两种方法:StartAsync 和 StopAsync,顾名思义前者会在Web主机启动是调用,后者在主机正常关闭时触发。

namespace Microsoft.Extensions.Hosting{    //
    // Summary:
    //     Defines methods for objects that are managed by the host.
    public interface IHostedService
    {        //
        // Summary:
        // Triggered when the application host is ready to start the service.
        Task StartAsync(CancellationToken cancellationToken);        //
        // Summary:
        // Triggered when the application host is performing a graceful shutdown.
        Task StopAsync(CancellationToken cancellationToken);
    }
}

除了IHostedService接口外,微软还给我们提供了更加强大的抽象基类BackgroundService,你也可以继承该类并在ExecuteAsync抽象方法中写自己的业务逻辑,这样就不必关注其他的工作了。

// Copyright (c) .NET Foundation. Licensed under the Apache License, Version 2.0.
/// <summary>
/// Base class for implementing a long running <see cref="IHostedService"/>.
/// </summary>
public abstract class BackgroundService : IHostedService, IDisposable
{
    private Task _executingTask;
    private readonly CancellationTokenSource _stoppingCts =
                                                   new CancellationTokenSource();

    protected abstract Task ExecuteAsync(CancellationToken stoppingToken);

    public virtual Task StartAsync(CancellationToken cancellationToken)
    {
        // Store the task we're executing
        _executingTask = ExecuteAsync(_stoppingCts.Token);

        // If the task is completed then return it,
        // this will bubble cancellation and failure to the caller
        if (_executingTask.IsCompleted)
        {
            return _executingTask;
        }

        // Otherwise it's running
        return Task.CompletedTask;
    }

    public virtual async Task StopAsync(CancellationToken cancellationToken)
    {
        // Stop called without start
        if (_executingTask == null)
        {
            return;
        }

        try
        {
            // Signal cancellation to the executing method
            _stoppingCts.Cancel();
        }
        finally
        {
            // Wait until the task completes or the stop token triggers
            await Task.WhenAny(_executingTask, Task.Delay(Timeout.Infinite,
                                                          cancellationToken));
        }

    }

    public virtual void Dispose()
    {
        _stoppingCts.Cancel();
    }
}

托管服务类写好后可以在 Startup 类中通过 

services.AddHostedService<CustomerHostedService>()

注册。

RabbitMQ 在.Net Core 项目中的具体应用

使用nuget包管理器添加Rabbitmq官方SDK.

Install-Package RabbitMQ.Client -Version 6.0.0-pre3

接着在 appsettings.json 配置文件中新增连接rabbitmq服务实例的配置。配置节点分为两部分,第一部分是Rabbit的基础连接信息,第二部分是做为消费者处理业务消息的队列信息。

"MessageQueue": {
    "RabbitConnect": {
      "HostName": "127.0.0.1",
      "Port": 5672,
      "UserName": "guest",
      "Password": "guest"
    },
    "BatchSendMessageQueue": {
      "Exchange": "BatchSendMessage",
      "Queue": "BatchSendMessage",
      "ExchangeType": "direct",
      "RouteKey": "BatchSendMessage"
    }
}

约定好配置文件的格式后,新建 .Net Core 强类型选项类。

/// <summary>
    /// 约定 强类型配置
    /// </summary>
    public class MessageQueueOption
    {
        /// <summary>
        /// rabbit 连接配置
        /// </summary>
        /// <value>
        /// The rabbit connect option.
        /// </value>
        public RabbitConnectOption RabbitConnect { get; set; }
        /// <summary>
        /// 批量推送消息到云信的队列配置信息
        /// </summary>
        /// <value>
        /// 
        /// </value>
        public QueueBaseOption BatchSendMessageQueue { get; set; }
        
    }
    /// <summary>
    /// Rabbit 强类型配置
    /// </summary>
    public class RabbitConnectOption
    {
        /// <summary>
        /// Gets or sets the name of the host.
        /// </summary>
        /// <value>
        /// The name of the host.
        /// </value>
        public string HostName { get; set; }
        /// <summary>
        /// Gets or sets the port.
        /// </summary>
        /// <value>
        /// The port.
        /// </value>
        public int Port { get; set; }
        /// <summary>
        /// Gets or sets the name of the user.
        /// </summary>
        /// <value>
        /// The name of the user.
        /// </value>
        public string UserName { get; set; }
        /// <summary>
        /// Gets or sets the password.
        /// </summary>
        /// <value>
        /// The password.
        /// </value>
        public string Password { get; set; }
        /// <summary>
        /// Gets or sets the virtual host.
        /// </summary>
        /// <value>
        /// The virtual host.
        /// </value>
        public string VirtualHost { get; set; }
    }
    /// <summary>
    /// 队列配置基类
    /// </summary>
    public class QueueBaseOption
    {
        /// <summary>
        /// 交换机名称
        /// </summary>
        /// <value>
        /// The exchange.
        /// </value>
        public string Exchange { get; set; }
        /// <summary>
        /// 队列名称
        /// </summary>
        /// <value>
        /// The queue.
        /// </value>
        public string Queue { get; set; }
        /// <summary>
        /// 交换机类型 direct、fanout、headers、topic 必须小写
        /// </summary>
        /// <value>
        /// The type of the exchange.
        /// </value>
        public string ExchangeType { get; set; }
        /// <summary>
        /// 路由
        /// </summary>
        /// <value>
        /// The route key.
        /// </value>
        public string RouteKey { get; set; }
    }

考虑到项目后期可能不只一个后台托管服务队列消费者,所以笔者对Rabbit的操作进行了简要抽象,得到了一个 RabbitListenerHostService 抽象类,该类继承 IHostedService 接口。

/// <summary>
    /// RabbitMQ 监听服务
    /// </summary>
    /// <seealso cref="Microsoft.Extensions.Hosting.IHostedService" />
    internal abstract class RabbitListenerHostService : IHostedService
    {
        /// <summary>
        /// 
        /// </summary>
        protected IConnection _connection;
        /// <summary>
        /// The channel
        /// </summary>
        protected IModel _channel;
        /// <summary>
        /// The rabbit connect options
        /// </summary>
        private readonly RabbitConnectOption _rabbitConnectOptions;
        /// <summary>
        /// The logger
        /// </summary>
        protected readonly ILogger _logger;
        /// <summary>
        /// Initializes a new instance of the <see cref="RabbitListenerHostService" /> class.
        /// </summary>
        /// <param name="messageQueueOption"></param>
        /// <param name="logger">The logger.</param>
        public RabbitListenerHostService(IOptions<MessageQueueOption> messageQueueOption, ILogger logger)
        {
            _rabbitConnectOptions = messageQueueOption.Value?.RabbitConnect;
            _logger = logger;
        }
        /// <summary>
        /// Triggered when the application host is ready to start the service.
        /// </summary>
        /// <param name="cancellationToken">Indicates that the start process has been aborted.</param>
        /// <returns></returns>
        public Task StartAsync(CancellationToken cancellationToken)
        {
            try
            {
                if (_rabbitConnectOptions == null) return Task.CompletedTask;
                var factory = new ConnectionFactory()
                {
                    HostName = _rabbitConnectOptions.HostName,
                    Port = _rabbitConnectOptions.Port,
                    UserName = _rabbitConnectOptions.UserName,
                    Password = _rabbitConnectOptions.Password,
                };
                _connection = factory.CreateConnection();
                _channel = _connection.CreateModel();
                Process();
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Rabbit连接出现异常");
            }
            return Task.CompletedTask;
        }

        /// <summary>
        /// Triggered when the application host is performing a graceful shutdown.
        /// </summary>
        /// <param name="cancellationToken">Indicates that the shutdown process should no longer be graceful.</param>
        /// <returns></returns>
        public Task StopAsync(CancellationToken cancellationToken)
        {
            if (_connection != null)
                this._connection.Close();
            return Task.CompletedTask;
        }

        /// <summary>
        /// Processes this instance.
        /// </summary>
        protected abstract void Process();
    }

接着新建一个实现真正处理消息推送业务的 BatchSendMessageSubscribeHostService 类,该类继承 RabbitListenerHostService ,实现其中的抽象方法 Process(),在该方法中写我们的业务代码。

/// <summary>
    /// 
    /// </summary>
    /// <seealso cref="Microsoft.Extensions.Hosting.BackgroundService" />
    internal class BatchSendMessageSubscribeHostService : RabbitListenerHostService
    {
        /// <summary>
        /// The services
        /// </summary>
        private readonly IServiceProvider _services;
        /// <summary>
        /// The batch advance option
        /// </summary>
        private readonly QueueBaseOption _batchSendMessageQueue;
        /// <summary>
        /// Initializes a new instance of the <see cref="AdvanceActionSubscribeHostService" /> class.
        /// </summary>
        /// <param name="services">The services.</param>
        /// <param name="workflowOption">The workflow option.</param>
        /// <param name="logger">The logger.</param>
        public AdvanceActionSubscribeHostService(IServiceProvider services, IOptions<MessageQueueOption> messageQueueOption,
            ILogger<BatchSendMessageSubscribeHostService> logger):base(MessageQueueOption, logger)
        {
            _services = services;
            _batchSendMessageQueue= messageQueueOption.Value?.BatchSendMessageQueue;
        }
        /// <summary>
        /// Processes this instance.
        /// </summary>
        protected override void Process()
        {
            _logger.LogInformation("调用ExecuteAsync");
            using (var scope = _services.CreateScope())
            {
                 
                var durable = true;//约定使用持久化
                var noack = false;//消息手动确认,否则消费者在接收到消息后会自动应答
                try
                {
                    //我们在消费端 从新进行一次 队列和交换机的绑定 ,防止 因为消费端在生产端 之前运行的 问题。
                    _channel.ExchangeDeclare(_batchSendMessageQueue.Exchange, _batchSendMessageQueue.ExchangeType, durable);
                    _channel.QueueDeclare(_batchSendMessageQueue.Queue, durable, false, false, null);
                    _channel.QueueBind(_batchSendMessageQueue.Queue, _batchSendMessageQueue.Exchange, _batchSendMessageQueue.RouteKey, null);
                    #region 通过事件的形式,如果队列中有消息,则执行事件。建议采用这种方式。
                    _logger.LogInformation("开始监听队列:" + _batchSendMessageQueue.Queue);
                    _channel.BasicQos(0, 1, false);//设置一个消费者在同一时间只处理一个消息,这个rabbitmq 就会将消息公平分发
                    var consumer = new EventingBasicConsumer(_channel);
                    consumer.Received += (ch, ea) =>
                   {
                       try
                       {
                           var content = Encoding.UTF8.GetString(ea.Body);
                           _logger.LogInformation("获取到消息:" + content);
                           // TODO: 消息推送云信的业务代码
                       }
                       catch (Exception ex)
                       {
                           _logger.LogError(ex, "");
                       }
                       finally
                       {
                           _channel.BasicAck(ea.DeliveryTag, false);
                       }
                   };
                    _channel.BasicConsume(_batchSendMessageQueue.Queue, noack, consumer);
                    #endregion
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "");
                }
            }
        }
    }

业务代码编写完成后,新建 IServiceCollection 的扩展方法,方便在Startup 中注入服务。

/// <summary>
    /// 依赖注入扩展类
    /// </summary>
    public static class ServiceCollectionExtension
    {
        /// <summary>
        /// Adds the register queue sub.
        /// </summary>
        /// <param name="services">The services.</param>
        /// <param name="option">The option.</param>
        /// <returns></returns>
        public static IServiceCollection AddMessageQueueOption(this IServiceCollection services, Action<MessageQueueOption> option)
        {
            services.Configure(option);
            return services;
        }
        /// <summary>
        /// Adds the register queue sub.
        /// </summary>
        /// <param name="services">The services.</param>
        /// <param name="configuration">The configuration.</param>
        /// <returns></returns>
        public static IServiceCollection AddMessageQueueOption(this IServiceCollection services, IConfiguration configuration)
        {
            services.Configure<MessageQueueOption>(configuration);
            return services;
        }
        /// <summary>
        /// 服务自注册,实现自管理
        /// </summary>
        /// <param name="services">The services.</param>
        /// <returns></returns>
        public static IServiceCollection AddBatchSendMessageService(this IServiceCollection services)
        {
            services.AddHostedService<BatchSendMessageSubscribeHostService>();
            return services;
        }
    }


作者:暗夜余晖

本文版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

1

支持

0

反对

posted @2019-5-15  拜读(590)

评论列表

评论内容:



喜欢请打赏

支付宝 微信

请放心支付