项目场景说明
我司OA系统需要将发布的企业新闻信息推送到IM系统,登录IM的每个用户都将在POP窗中收到该条消息。我司IM系统通信基础使用的是云信,所以需要调用云信API接口,将消息推送给云信服务器,再由云信服务器将消息推给每个用户。
在该业务场景中,如果将消息发送给多个用户,使用同步方式会存在很大的性能问题,会导致等待超时。如果使用异步方式不通过消息队列,在出现不可控因素时,容易造成消息丢失。所以技术人员采用RabbitMQ消息队列技术栈解决。
.Net Core 后台托管服务
在.Net Core 中,微软给我们提供了后台托管服务支持,不再像framework 时代一样需要写windows 服务程序了。Core 中托管服务是一个类,具有实现 IHostedService 接口的后台任务逻辑。IHostedService 接口为主机托管的对象定义了两种方法:StartAsync 和 StopAsync,顾名思义前者会在Web主机启动是调用,后者在主机正常关闭时触发。
namespace Microsoft.Extensions.Hosting{ // // Summary: // Defines methods for objects that are managed by the host. public interface IHostedService { // // Summary: // Triggered when the application host is ready to start the service. Task StartAsync(CancellationToken cancellationToken); // // Summary: // Triggered when the application host is performing a graceful shutdown. Task StopAsync(CancellationToken cancellationToken); } }
除了IHostedService接口外,微软还给我们提供了更加强大的抽象基类BackgroundService,你也可以继承该类并在ExecuteAsync抽象方法中写自己的业务逻辑,这样就不必关注其他的工作了。
// Copyright (c) .NET Foundation. Licensed under the Apache License, Version 2.0. /// <summary> /// Base class for implementing a long running <see cref="IHostedService"/>. /// </summary> public abstract class BackgroundService : IHostedService, IDisposable { private Task _executingTask; private readonly CancellationTokenSource _stoppingCts = new CancellationTokenSource(); protected abstract Task ExecuteAsync(CancellationToken stoppingToken); public virtual Task StartAsync(CancellationToken cancellationToken) { // Store the task we're executing _executingTask = ExecuteAsync(_stoppingCts.Token); // If the task is completed then return it, // this will bubble cancellation and failure to the caller if (_executingTask.IsCompleted) { return _executingTask; } // Otherwise it's running return Task.CompletedTask; } public virtual async Task StopAsync(CancellationToken cancellationToken) { // Stop called without start if (_executingTask == null) { return; } try { // Signal cancellation to the executing method _stoppingCts.Cancel(); } finally { // Wait until the task completes or the stop token triggers await Task.WhenAny(_executingTask, Task.Delay(Timeout.Infinite, cancellationToken)); } } public virtual void Dispose() { _stoppingCts.Cancel(); } }
托管服务类写好后可以在 Startup 类中通过
services.AddHostedService<CustomerHostedService>()
注册。
RabbitMQ 在.Net Core 项目中的具体应用
使用nuget包管理器添加Rabbitmq官方SDK.
Install-Package RabbitMQ.Client -Version 6.0.0-pre3
接着在 appsettings.json 配置文件中新增连接rabbitmq服务实例的配置。配置节点分为两部分,第一部分是Rabbit的基础连接信息,第二部分是做为消费者处理业务消息的队列信息。
"MessageQueue": { "RabbitConnect": { "HostName": "127.0.0.1", "Port": 5672, "UserName": "guest", "Password": "guest" }, "BatchSendMessageQueue": { "Exchange": "BatchSendMessage", "Queue": "BatchSendMessage", "ExchangeType": "direct", "RouteKey": "BatchSendMessage" } }
约定好配置文件的格式后,新建 .Net Core 强类型选项类。
/// <summary> /// 约定 强类型配置 /// </summary> public class MessageQueueOption { /// <summary> /// rabbit 连接配置 /// </summary> /// <value> /// The rabbit connect option. /// </value> public RabbitConnectOption RabbitConnect { get; set; } /// <summary> /// 批量推送消息到云信的队列配置信息 /// </summary> /// <value> /// /// </value> public QueueBaseOption BatchSendMessageQueue { get; set; } } /// <summary> /// Rabbit 强类型配置 /// </summary> public class RabbitConnectOption { /// <summary> /// Gets or sets the name of the host. /// </summary> /// <value> /// The name of the host. /// </value> public string HostName { get; set; } /// <summary> /// Gets or sets the port. /// </summary> /// <value> /// The port. /// </value> public int Port { get; set; } /// <summary> /// Gets or sets the name of the user. /// </summary> /// <value> /// The name of the user. /// </value> public string UserName { get; set; } /// <summary> /// Gets or sets the password. /// </summary> /// <value> /// The password. /// </value> public string Password { get; set; } /// <summary> /// Gets or sets the virtual host. /// </summary> /// <value> /// The virtual host. /// </value> public string VirtualHost { get; set; } } /// <summary> /// 队列配置基类 /// </summary> public class QueueBaseOption { /// <summary> /// 交换机名称 /// </summary> /// <value> /// The exchange. /// </value> public string Exchange { get; set; } /// <summary> /// 队列名称 /// </summary> /// <value> /// The queue. /// </value> public string Queue { get; set; } /// <summary> /// 交换机类型 direct、fanout、headers、topic 必须小写 /// </summary> /// <value> /// The type of the exchange. /// </value> public string ExchangeType { get; set; } /// <summary> /// 路由 /// </summary> /// <value> /// The route key. /// </value> public string RouteKey { get; set; } }
考虑到项目后期可能不只一个后台托管服务队列消费者,所以笔者对Rabbit的操作进行了简要抽象,得到了一个 RabbitListenerHostService 抽象类,该类继承 IHostedService 接口。
/// <summary> /// RabbitMQ 监听服务 /// </summary> /// <seealso cref="Microsoft.Extensions.Hosting.IHostedService" /> internal abstract class RabbitListenerHostService : IHostedService { /// <summary> /// /// </summary> protected IConnection _connection; /// <summary> /// The channel /// </summary> protected IModel _channel; /// <summary> /// The rabbit connect options /// </summary> private readonly RabbitConnectOption _rabbitConnectOptions; /// <summary> /// The logger /// </summary> protected readonly ILogger _logger; /// <summary> /// Initializes a new instance of the <see cref="RabbitListenerHostService" /> class. /// </summary> /// <param name="messageQueueOption"></param> /// <param name="logger">The logger.</param> public RabbitListenerHostService(IOptions<MessageQueueOption> messageQueueOption, ILogger logger) { _rabbitConnectOptions = messageQueueOption.Value?.RabbitConnect; _logger = logger; } /// <summary> /// Triggered when the application host is ready to start the service. /// </summary> /// <param name="cancellationToken">Indicates that the start process has been aborted.</param> /// <returns></returns> public Task StartAsync(CancellationToken cancellationToken) { try { if (_rabbitConnectOptions == null) return Task.CompletedTask; var factory = new ConnectionFactory() { HostName = _rabbitConnectOptions.HostName, Port = _rabbitConnectOptions.Port, UserName = _rabbitConnectOptions.UserName, Password = _rabbitConnectOptions.Password, }; _connection = factory.CreateConnection(); _channel = _connection.CreateModel(); Process(); } catch (Exception ex) { _logger.LogError(ex, "Rabbit连接出现异常"); } return Task.CompletedTask; } /// <summary> /// Triggered when the application host is performing a graceful shutdown. /// </summary> /// <param name="cancellationToken">Indicates that the shutdown process should no longer be graceful.</param> /// <returns></returns> public Task StopAsync(CancellationToken cancellationToken) { if (_connection != null) this._connection.Close(); return Task.CompletedTask; } /// <summary> /// Processes this instance. /// </summary> protected abstract void Process(); }
接着新建一个实现真正处理消息推送业务的 BatchSendMessageSubscribeHostService 类,该类继承 RabbitListenerHostService ,实现其中的抽象方法 Process(),在该方法中写我们的业务代码。
/// <summary> /// /// </summary> /// <seealso cref="Microsoft.Extensions.Hosting.BackgroundService" /> internal class BatchSendMessageSubscribeHostService : RabbitListenerHostService { /// <summary> /// The services /// </summary> private readonly IServiceProvider _services; /// <summary> /// The batch advance option /// </summary> private readonly QueueBaseOption _batchSendMessageQueue; /// <summary> /// Initializes a new instance of the <see cref="AdvanceActionSubscribeHostService" /> class. /// </summary> /// <param name="services">The services.</param> /// <param name="workflowOption">The workflow option.</param> /// <param name="logger">The logger.</param> public AdvanceActionSubscribeHostService(IServiceProvider services, IOptions<MessageQueueOption> messageQueueOption, ILogger<BatchSendMessageSubscribeHostService> logger):base(MessageQueueOption, logger) { _services = services; _batchSendMessageQueue= messageQueueOption.Value?.BatchSendMessageQueue; } /// <summary> /// Processes this instance. /// </summary> protected override void Process() { _logger.LogInformation("调用ExecuteAsync"); using (var scope = _services.CreateScope()) { var durable = true;//约定使用持久化 var noack = false;//消息手动确认,否则消费者在接收到消息后会自动应答 try { //我们在消费端 从新进行一次 队列和交换机的绑定 ,防止 因为消费端在生产端 之前运行的 问题。 _channel.ExchangeDeclare(_batchSendMessageQueue.Exchange, _batchSendMessageQueue.ExchangeType, durable); _channel.QueueDeclare(_batchSendMessageQueue.Queue, durable, false, false, null); _channel.QueueBind(_batchSendMessageQueue.Queue, _batchSendMessageQueue.Exchange, _batchSendMessageQueue.RouteKey, null); #region 通过事件的形式,如果队列中有消息,则执行事件。建议采用这种方式。 _logger.LogInformation("开始监听队列:" + _batchSendMessageQueue.Queue); _channel.BasicQos(0, 1, false);//设置一个消费者在同一时间只处理一个消息,这个rabbitmq 就会将消息公平分发 var consumer = new EventingBasicConsumer(_channel); consumer.Received += (ch, ea) => { try { var content = Encoding.UTF8.GetString(ea.Body); _logger.LogInformation("获取到消息:" + content); // TODO: 消息推送云信的业务代码 } catch (Exception ex) { _logger.LogError(ex, ""); } finally { _channel.BasicAck(ea.DeliveryTag, false); } }; _channel.BasicConsume(_batchSendMessageQueue.Queue, noack, consumer); #endregion } catch (Exception ex) { _logger.LogError(ex, ""); } } } }
业务代码编写完成后,新建 IServiceCollection 的扩展方法,方便在Startup 中注入服务。
/// <summary> /// 依赖注入扩展类 /// </summary> public static class ServiceCollectionExtension { /// <summary> /// Adds the register queue sub. /// </summary> /// <param name="services">The services.</param> /// <param name="option">The option.</param> /// <returns></returns> public static IServiceCollection AddMessageQueueOption(this IServiceCollection services, Action<MessageQueueOption> option) { services.Configure(option); return services; } /// <summary> /// Adds the register queue sub. /// </summary> /// <param name="services">The services.</param> /// <param name="configuration">The configuration.</param> /// <returns></returns> public static IServiceCollection AddMessageQueueOption(this IServiceCollection services, IConfiguration configuration) { services.Configure<MessageQueueOption>(configuration); return services; } /// <summary> /// 服务自注册,实现自管理 /// </summary> /// <param name="services">The services.</param> /// <returns></returns> public static IServiceCollection AddBatchSendMessageService(this IServiceCollection services) { services.AddHostedService<BatchSendMessageSubscribeHostService>(); return services; } }
评论列表
评论内容: