using RabbitMQ.Client; using RabbitMQ.Client.Events; using Serenity; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace HT.Cloud.Code { /// /// RabbitMq工具类 /// public class RabbitMqHelper : IDisposable { #region 私有静态字段 /// /// RabbitMQ建议客户端线程之间不要共用Model,至少要保证共用Model的线程发送消息必须是串行的,但是建议尽量共用Connection。 /// private static readonly ConcurrentDictionary _channelDic = new(); /// /// QueueHelper缓存 /// private static readonly ConcurrentDictionary> _queueHelperDic = new(); /// /// 用于缓存路由键数据 /// private static readonly ConcurrentDictionary _routeDic = new(); /// /// RabbitMq连接 /// private static IConnection _connection; /// /// 线程对象,线程锁使用 /// private static readonly object _locker = new(); #endregion 私有静态字段 #region 公有属性 /// /// 默认内置死信交换机,默认值:deadletter.default.router /// public string DefaultDeadLetterExchange { get; set; } = "deadletter.default.router"; #endregion 公有属性 #region 构造函数 /// /// 构造函数 /// /// RabbitMq配置 public RabbitMqHelper(MqConfig config) { if (config == null) throw new ArgumentNullException(nameof(config)); if (_connection == null) { lock (_locker) { if (_connection == null) _connection = new ConnectionFactory { //设置主机名 HostName = config.HostName, //虚拟主机 VirtualHost = config.VirtualHost, //设置心跳时间 RequestedHeartbeat = config.RequestedHeartbeat, //设置自动重连 AutomaticRecoveryEnabled = config.AutomaticRecoveryEnabled, //重连时间 NetworkRecoveryInterval = config.NetworkRecoveryInterval, //用户名 UserName = config.UserName, //密码 Password = config.Password, //配置端口 Port=config.Port }.CreateConnection(); } } } /// /// 构造函数 /// /// RabbitMq连接工厂 public RabbitMqHelper(ConnectionFactory factory) { if (factory == null) throw new ArgumentNullException(nameof(factory)); if (_connection == null) { lock (_locker) { if (_connection == null) _connection = factory.CreateConnection(); } } } #endregion 构造函数 #region 管道 /// /// 获取管道 /// /// 队列名称 /// public IModel GetChannel(string queue = "default") { IModel channel = null; if (!string.IsNullOrEmpty(queue) && _channelDic.ContainsKey(queue)) channel = _channelDic[queue]; if (!string.IsNullOrEmpty(queue) && channel == null) { lock (_locker) { channel = _channelDic.GetOrAdd(queue, queue => { var model = _connection.CreateModel(); _channelDic[queue] = model; return model; }); } } channel ??= _connection.CreateModel(); return EnsureOpened(channel); } /// /// 确保管道是已打开状态 /// /// 管道 /// public IModel EnsureOpened(IModel channel) { channel ??= GetChannel(); if (channel.IsClosed) { var data = _channelDic.Where(x => x.Value == channel)?.FirstOrDefault(); if (data != null && data.Value.Key != null) { //移除已关闭的管道 _channelDic.TryRemove(data.Value.Key, out var model); //重新获取管道 channel = GetChannel(data.Value.Key); } else { channel = GetChannel(); } } return channel; } /// /// 如果交换机或者队列不存在则移除缓存中异常关闭的管道 /// /// public void RemoveIfNotExist(IModel channel) { var data = _channelDic.Where(x => x.Value == channel)?.FirstOrDefault(); if (data != null && data.Value.Key != null) { //移除已关闭的管道 _channelDic.TryRemove(data.Value.Key, out var model); channel = null; } } /// /// 获取QueueHelper /// /// /// public QueueHelper GetQueueHelper(string queue) { if (!string.IsNullOrEmpty(queue)) queue = "default"; if (_queueHelperDic.ContainsKey(queue)) return _queueHelperDic[queue]; lock (_locker) { return _queueHelperDic.GetOrAdd(queue, queue => { var queueHelper = new QueueHelper(x => PublishRabbitMqMessage( x.Exchange, x.Queue, x.RoutingKey, x.Body, x.ExchangeType, x.Durable, x.Confirm, x.Expiration, x.Priority, x.QueueArguments, x.ExchangeArguments, x.Headers)); _queueHelperDic[queue] = queueHelper; return queueHelper; }); } } #endregion 管道 #region 交换机 /// /// 声明交换机,交换机可以重复声明,但是声明所使用的参数必须一致,否则会抛出异常。 /// /// 管道 /// 交换机名称 /// 交换机类型: /// 1、Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全 /// 匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的 /// 消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog /// 2、Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都 /// 会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout /// 交换机转发消息是最快的。 /// 3、Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多 /// 个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” /// 只会匹配到“audit.irs”。 /// 持久化 /// 自动删除 /// 参数 /// 管道 public IModel ExchangeDeclare( IModel channel, string exchange, string exchangeType = ExchangeType.Direct, bool durable = true, bool autoDelete = false, IDictionary arguments = null) { channel = EnsureOpened(channel); channel.ExchangeDeclare(exchange, exchangeType, durable, autoDelete, arguments); return channel; } /// /// 声明交换机,交换机可以重复声明,但是声明所使用的参数必须一致,否则会抛出异常。 /// /// 管道 /// 交换机名称 /// 交换机类型: /// 1、Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全 /// 匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的 /// 消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog /// 2、Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都 /// 会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout /// 交换机转发消息是最快的。 /// 3、Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多 /// 个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” /// 只会匹配到“audit.irs”。 /// 持久化 /// 自动删除 /// 参数 /// 管道 public IModel ExchangeDeclareNoWait( IModel channel, string exchange, string exchangeType = ExchangeType.Direct, bool durable = true, bool autoDelete = false, IDictionary arguments = null) { channel = EnsureOpened(channel); channel.ExchangeDeclareNoWait(exchange, exchangeType, durable, autoDelete, arguments); return channel; } /// /// 被动声明交换机,用于判断交换机是否存在,若存在则无异常,若不存在则抛出异常 /// /// 管道 /// 交换机名称 public void ExchangeDeclarePassive(IModel channel, string exchange) { EnsureOpened(channel).ExchangeDeclarePassive(exchange); } /// /// 判断交换机是否存在 /// /// 管道 /// 交换机名称 /// public bool IsExchangeExist(IModel channel, string exchange) { try { ExchangeDeclarePassive(channel, exchange); return true; } catch { RemoveIfNotExist(channel); return false; } } /// /// 删除交换机 /// /// 管道 /// 交换机名称 /// 是否没有被使用 public void ExchangeDelete( IModel channel, string exchange, bool ifUnused = false) { RouteKeyRemoveByExchange(exchange); EnsureOpened(channel).ExchangeDelete(exchange, ifUnused); } /// /// 删除交换机 /// /// 管道 /// 交换机名称 /// 是否没有被使用 public void ExchangeDeleteNoWait( IModel channel, string exchange, bool ifUnused = false) { RouteKeyRemoveByExchange(exchange); EnsureOpened(channel).ExchangeDeleteNoWait(exchange, ifUnused); } /// /// 绑定交换机 /// /// 管道 /// 目标交换机 /// 源交换机 /// 路由键 /// 参数 public void ExchangeBind( IModel channel, string destinationExchange, string sourceExchange, string routingKey, IDictionary arguments = null) { EnsureOpened(channel).ExchangeBind(destinationExchange, sourceExchange, routingKey, arguments); } /// /// 绑定交换机 /// /// 管道 /// 目标交换机 /// 源交换机 /// 路由键 /// 参数 public void ExchangeBindNoWait( IModel channel, string destinationExchange, string sourceExchange, string routingKey, IDictionary arguments = null) { EnsureOpened(channel).ExchangeBindNoWait(destinationExchange, sourceExchange, routingKey, arguments); } /// /// 解绑交换机 /// /// 管道 /// 目标交换机 /// 源交换机 /// 路由键 /// 参数 public void ExchangeUnbind( IModel channel, string destinationExchange, string sourceExchange, string routingKey, IDictionary arguments = null) { EnsureOpened(channel).ExchangeUnbind(destinationExchange, sourceExchange, routingKey, arguments); } /// /// 解绑交换机 /// /// 管道 /// 目标交换机 /// 源交换机 /// 路由键 /// 参数 public void ExchangeUnbindNoWait( IModel channel, string destinationExchange, string sourceExchange, string routingKey, IDictionary arguments = null) { EnsureOpened(channel).ExchangeUnbindNoWait(destinationExchange, sourceExchange, routingKey, arguments); } #endregion 交换机 #region 队列 /// /// 声明队列,队列可以重复声明,但是声明所使用的参数必须一致,否则会抛出异常。 /// /// 管道 /// 队列名称 /// 持久化 /// 排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见, /// 并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可 /// 以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连 /// 接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者 /// 客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。 /// 自动删除 /// 参数 /// 管道 public IModel QueueDeclare( IModel channel, string queue, bool durable = true, bool exclusive = false, bool autoDelete = false, IDictionary arguments = null) { channel = EnsureOpened(channel); channel.QueueDeclare(queue, durable, exclusive, autoDelete, arguments); return channel; } /// /// 声明队列,队列可以重复声明,但是声明所使用的参数必须一致,否则会抛出异常。 /// /// 管道 /// 队列名称 /// 持久化 /// 排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见, /// 并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可 /// 以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连 /// 接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者 /// 客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。 /// 自动删除 /// 参数 /// 管道 public IModel QueueDeclareNoWait( IModel channel, string queue, bool durable = true, bool exclusive = false, bool autoDelete = false, IDictionary arguments = null) { channel = EnsureOpened(channel); channel.QueueDeclareNoWait(queue, durable, exclusive, autoDelete, arguments); return channel; } /// /// 被动声明队列,用于判断队列是否存在,若队列存在则无异常,若不存在则抛异常 /// /// 管道 /// 队列名称 public void QueueDeclarePassive(IModel channel, string queue) { EnsureOpened(channel).QueueDeclarePassive(queue); } /// /// 判断队列是否存在 /// /// 管道 /// 队列名称 /// public bool IsQueueExist(IModel channel, string queue) { try { QueueDeclarePassive(channel, queue); return true; } catch { RemoveIfNotExist(channel); return false; } } /// /// 删除队列 /// /// 管道 /// 队列名称 /// 是否没有被使用 /// 是否为空 /// public uint QueueDelete( IModel channel, string queue, bool ifUnused = false, bool ifEmpty = false) { RouteKeyRemoveByQueue(queue); return EnsureOpened(channel).QueueDelete(queue, ifUnused, ifEmpty); } /// /// 删除队列 /// /// 管道 /// 队列名称 /// 是否没有被使用 /// 是否为空 public void QueueDeleteNoWait( IModel channel, string queue, bool ifUnused = false, bool ifEmpty = false) { RouteKeyRemoveByQueue(queue); EnsureOpened(channel).QueueDeleteNoWait(queue, ifUnused, ifEmpty); } /// /// 绑定队列 /// /// 管道 /// 交换机名称 /// 队列名称 /// 路由键 /// 参数 public void QueueBind( IModel channel, string exchange, string queue, string routingKey, IDictionary arguments = null) { if (IsRouteKeyExist(exchange, queue, routingKey)) return; EnsureOpened(channel).QueueBind(queue, exchange, routingKey, arguments); } /// /// 绑定队列 /// /// 管道 /// 交换机名称 /// 队列名称 /// 路由键 /// 参数 public void QueueBindNoWait( IModel channel, string exchange, string queue, string routingKey, IDictionary arguments = null) { if (IsRouteKeyExist(exchange, queue, routingKey)) return; EnsureOpened(channel).QueueBindNoWait(queue, exchange, routingKey, arguments); } /// /// 解绑队列 /// /// 管道 /// 交换机名称 /// 队列名称 /// 路由键 /// 参数 public void QueueUnbind( IModel channel, string exchange, string queue, string routingKey, IDictionary arguments = null) { RouteKeyRemove(exchange, queue, routingKey); EnsureOpened(channel).QueueUnbind(queue, exchange, routingKey, arguments); } /// /// 清除队列 /// /// 管道 /// 队列名称 public void QueuePurge(IModel channel, string queue) { EnsureOpened(channel).QueuePurge(queue); } #endregion 队列 #region 路由 /// /// 判断路由键是否存在,仅在内存中作判断 /// /// 交换机 /// 队列 /// 路由键 /// public bool IsRouteKeyExist(string exchange, string queue, string routeKey) { var key = $"{exchange}.{queue}.{routeKey}"; if (_routeDic.ContainsKey(key)) return true; else { lock (_locker) { if (!_routeDic.ContainsKey(key)) { _routeDic.TryAdd(key, (exchange, queue, routeKey)); return false; } return true; } } } /// /// 移除路由键缓存 /// /// /// /// public void RouteKeyRemove(string exchange, string queue, string routeKey) { var key = $"{exchange}.{queue}.{routeKey}"; if (_routeDic.ContainsKey(key)) { lock (_locker) { if (_routeDic.ContainsKey(key)) _routeDic.TryRemove(key, out var _); } } } /// /// 移除路由键缓存 /// /// 队列 public void RouteKeyRemoveByQueue(string queue) { if (!_routeDic.IsEmpty && _routeDic.Values.Any(x => x.queue == queue)) { lock (_locker) { if (!_routeDic.IsEmpty && _routeDic.Values.Any(x => x.queue == queue)) { var keys = _routeDic.Keys.Where(x => x.Split('.')[1] == queue); foreach (var key in keys) { _routeDic.TryRemove(key, out var _); } } } } } /// /// 移除路由键缓存 /// /// 交换机 public void RouteKeyRemoveByExchange(string exchange) { if (!_routeDic.IsEmpty && _routeDic.Values.Any(x => x.exchange == exchange)) { lock (_locker) { if (!_routeDic.IsEmpty && _routeDic.Values.Any(x => x.exchange == exchange)) { var keys = _routeDic.Keys.Where(x => x.Split('.')[0] == exchange); foreach (var key in keys) { _routeDic.TryRemove(key, out var _); } } } } } #endregion 路由 #region 发布消息 /// /// 发布消息 /// /// 消息指令 /// 消息发送确认 /// 单个消息过期时间,单位ms /// 单个消息优先级,数值越大优先级越高,取值范围:0-9 /// public bool Publish( T command, bool confirm = false, string expiration = null, byte? priority = null) where T : class { var attribute = typeof(T).GetAttribute(); if (attribute == null) throw new ArgumentException("RabbitMqAttribute Is Null!"); return Publish(attribute, command, confirm, expiration, priority); } /// /// 发布消息 /// /// RabbitMq特性配置 /// 消息指令 /// 消息发送确认 /// 单个消息过期时间,单位ms /// 单个消息优先级,数值越大优先级越高,取值范围:0-9 /// public bool Publish( RabbitMqAttribute attribute, T command, bool confirm = false, string expiration = null, byte? priority = null) where T : class { //消息内容 var body = command.ToJson(); //自定义队列参数 var arguments = new Dictionary(); //设置队列消息过期时间,指整个队列的所有消息 if (attribute.MessageTTL > 0) arguments["x-message-ttl"] = attribute.MessageTTL; //设置队列过期时间 if (attribute.AutoExpire > 0) arguments["x-expires"] = attribute.AutoExpire; //设置队列最大长度 if (attribute.MaxLength > 0) arguments["x-max-length"] = attribute.MaxLength; //设置队列占用最大空间 if (attribute.MaxLengthBytes > 0) arguments["x-max-length-bytes"] = attribute.MaxLengthBytes; //设置队列溢出行为 if (attribute.OverflowBehaviour == "drop-head" || attribute.OverflowBehaviour == "reject-publish") arguments["x-overflow"] = attribute.OverflowBehaviour; //设置死信交换机 if (!attribute.DeadLetterExchange.IsNullOrEmpty()) arguments["x-dead-letter-exchange"] = attribute.DeadLetterExchange; //设置死信路由键 if (!attribute.DeadLetterRoutingKey.IsNullOrEmpty()) arguments["x-dead-letter-routing-key"] = attribute.DeadLetterRoutingKey; //设置队列优先级 if (attribute.MaximumPriority > 0 && attribute.MaximumPriority <= 10) arguments["x-max-priority"] = attribute.MaximumPriority; //设置队列惰性模式 if (attribute.LazyMode == "default" || attribute.LazyMode == "lazy") arguments["x-queue-mode"] = attribute.LazyMode; //设置集群配置 if (!attribute.MasterLocator.IsNullOrEmpty()) arguments["x-queue-master-locator"] = attribute.MasterLocator; //发送消息 return Publish( attribute.Exchange, attribute.Queue, attribute.RoutingKey, body, attribute.ExchangeType, attribute.Durable, confirm, expiration, priority, arguments, null, attribute.Header?.ToObject>()); } /// /// 发布消息 /// /// 消息指令 /// 消息发送确认 /// 单个消息过期时间,单位ms /// 单个消息优先级,数值越大优先级越高,取值范围:0-9 /// public bool Publish( IEnumerable command, bool confirm = false, string expiration = null, byte? priority = null) where T : class { var attribute = typeof(T).GetAttribute(); if (attribute == null) throw new ArgumentException("RabbitMqAttribute Is Null!"); return Publish(attribute, command, confirm, expiration, priority); } /// /// 发布消息 /// /// RabbitMq特性配置 /// 消息指令 /// 消息发送确认 /// 单个消息过期时间,单位ms /// 单个消息优先级,数值越大优先级越高,取值范围:0-9 /// public bool Publish( RabbitMqAttribute attribute, IEnumerable command, bool confirm = false, string expiration = null, byte? priority = null) where T : class { //消息内容 var body = command.Select(x => x.ToJson()); //自定义队列参数 var arguments = new Dictionary(); //设置队列消息过期时间,指整个队列的所有消息 if (attribute.MessageTTL > 0) arguments["x-message-ttl"] = attribute.MessageTTL; //设置队列过期时间 if (attribute.AutoExpire > 0) arguments["x-expires"] = attribute.AutoExpire; //设置队列最大长度 if (attribute.MaxLength > 0) arguments["x-max-length"] = attribute.MaxLength; //设置队列占用最大空间 if (attribute.MaxLengthBytes > 0) arguments["x-max-length-bytes"] = attribute.MaxLengthBytes; //设置队列溢出行为 if (attribute.OverflowBehaviour == "drop-head" || attribute.OverflowBehaviour == "reject-publish") arguments["x-overflow"] = attribute.OverflowBehaviour; //设置死信交换机 if (!attribute.DeadLetterExchange.IsNullOrEmpty()) arguments["x-dead-letter-exchange"] = attribute.DeadLetterExchange; //设置死信路由键 if (!attribute.DeadLetterRoutingKey.IsNullOrEmpty()) arguments["x-dead-letter-routing-key"] = attribute.DeadLetterRoutingKey; //设置队列优先级 if (attribute.MaximumPriority > 0 && attribute.MaximumPriority <= 10) arguments["x-max-priority"] = attribute.MaximumPriority; //设置队列惰性模式 if (attribute.LazyMode == "default" || attribute.LazyMode == "lazy") arguments["x-queue-mode"] = attribute.LazyMode; //设置集群配置 if (!attribute.MasterLocator.IsNullOrEmpty()) arguments["x-queue-master-locator"] = attribute.MasterLocator; //发送消息 return Publish( attribute.Exchange, attribute.Queue, attribute.RoutingKey, body, attribute.ExchangeType, attribute.Durable, confirm, expiration, priority, arguments, null, attribute.Header?.ToObject>()); } /// /// 发布消息 /// /// 交换机名称 /// 队列名称 /// 路由键 /// 消息内容 /// 交换机类型 /// 持久化 /// 消息发送确认 /// 单个消息过期时间,单位ms /// 单个消息优先级,数值越大优先级越高,取值范围:0-9 /// 队列参数 /// 交换机参数 /// 消息头部 /// public bool Publish( string exchange, string queue, string routingKey, string body, string exchangeType = ExchangeType.Direct, bool durable = true, bool confirm = false, string expiration = null, byte? priority = null, IDictionary queueArguments = null, IDictionary exchangeArguments = null, IDictionary headers = null) { return Publish( exchange, queue, routingKey, new[] { body }, exchangeType, durable, confirm, expiration, priority, queueArguments, exchangeArguments, headers); } /// /// 发布消息 /// /// 交换机名称 /// 队列名称 /// 路由键 /// 消息内容 /// 交换机类型 /// 持久化 /// 消息发送确认 /// 单个消息过期时间,单位ms /// 单个消息优先级,数值越大优先级越高,取值范围:0-9 /// 队列参数 /// 交换机参数 /// 消息头部 /// public bool Publish( string exchange, string queue, string routingKey, IEnumerable body, string exchangeType = ExchangeType.Direct, bool durable = true, bool confirm = false, string expiration = null, byte? priority = null, IDictionary queueArguments = null, IDictionary exchangeArguments = null, IDictionary headers = null) { return GetQueueHelper(queue).Enqueue( new QueueMessage { Exchange = exchange, Queue = queue, RoutingKey = routingKey, Body = body, ExchangeType = exchangeType, Durable = durable, Confirm = confirm, Expiration = expiration, Priority = priority, QueueArguments = queueArguments, ExchangeArguments = exchangeArguments, Headers = headers }); } /// /// 发布消息到RabbitMq,注意此方法使用IModel线程不安全,建议使用Publish方法,若要使用需要保证单线程调用 /// /// 交换机名称 /// 队列名称 /// 路由键 /// 消息内容 /// 交换机类型 /// 持久化 /// 消息发送确认 /// 单个消息过期时间,单位ms /// 单个消息优先级,数值越大优先级越高,取值范围:0-9 /// 队列参数 /// 交换机参数 /// 消息头部 /// public bool PublishRabbitMqMessage( string exchange, string queue, string routingKey, IEnumerable body, string exchangeType = ExchangeType.Direct, bool durable = true, bool confirm = false, string expiration = null, byte? priority = null, IDictionary queueArguments = null, IDictionary exchangeArguments = null, IDictionary headers = null) { //获取管道 var channel = GetChannel(queue); //判断交换机是否存在 if (!IsExchangeExist(channel, exchange)) channel = ExchangeDeclare(channel, exchange, exchangeType, durable, arguments: exchangeArguments); //判断队列是否存在 if (!IsQueueExist(channel, queue)) channel = QueueDeclare(channel, queue, durable, arguments: queueArguments); //绑定交换机和队列 QueueBind(channel, exchange, queue, routingKey); //声明消息属性 var props = channel.CreateBasicProperties(); //持久化 props.Persistent = durable; //单个消息过期时间 if (!expiration.IsNullOrEmpty()) props.Expiration = expiration; //单个消息优先级 if (priority >= 0 && priority <= 9) props.Priority = priority.Value; //消息头部 if (headers != null) props.Headers = headers; //是否启用消息发送确认机制 if (confirm) channel.ConfirmSelect(); //发送消息 foreach (var item in body) { var text = string.IsNullOrEmpty(item) ? null : Encoding.UTF8.GetBytes(item); channel.BasicPublish(exchange, routingKey, props, text); } //消息发送失败处理 if (confirm && !channel.WaitForConfirms()) return false; return true; } /// /// 发布消息到内置死信队列 /// /// 队列名称 /// 交换机名称 /// 路由键 /// 消息内容 /// 重试次数 /// 异常 /// private bool PublishToDead( string queue, string exchange, string routingKey, string body, int retryCount, Exception exception) { //内置死信队列、交换机、路由键 var deadLetterQueue = $"{queue.ToLower()}.{(exception != null ? "error" : "fail")}"; var deadLetterExchange = DefaultDeadLetterExchange; var deadLetterRoutingKey = $"{queue.ToLower()}.deadletter"; //内置死信队列内容 var deadLetterBody = new DeadLetterQueue { Body = body, CreateDateTime = DateTime.Now, Exception = exception, ExceptionMsg = exception?.Message, Queue = queue, RoutingKey = routingKey, Exchange = exchange, RetryCount = retryCount }; //发送内置死信消息,注意此处与原生的死信消息无关 return Publish(deadLetterExchange, deadLetterQueue, deadLetterRoutingKey, deadLetterBody.ToJson()); } #endregion 发布消息 #region 订阅消息 /// /// 订阅消息 /// /// /// 消费处理委托 /// 异常处理委托 /// 注册事件 /// 取消注册事件 /// 关闭事件 public void Subscribe( Func subscriber, Action handler, EventHandler registered = null, EventHandler unregistered = null, EventHandler shutdown = null) where T : class { var attribute = typeof(T).GetAttribute(); if (attribute == null) throw new ArgumentException("RabbitMqAttribute Is Null!"); Subscribe(attribute, subscriber, handler, registered, unregistered, shutdown); } /// /// 订阅消息 /// /// /// RabbitMq特性配置 /// 消费处理委托 /// 异常处理委托 /// 注册事件 /// 取消注册事件 /// 关闭事件 public void Subscribe( RabbitMqAttribute attribute, Func subscriber, Action handler, EventHandler registered = null, EventHandler unregistered = null, EventHandler shutdown = null) where T : class { //自定义参数 var arguments = new Dictionary(); //设置队列消息过期时间,指整个队列的所有消息 if (attribute.MessageTTL > 0) arguments["x-message-ttl"] = attribute.MessageTTL; //设置队列过期时间 if (attribute.AutoExpire > 0) arguments["x-expires"] = attribute.AutoExpire; //设置队列最大长度 if (attribute.MaxLength > 0) arguments["x-max-length"] = attribute.MaxLength; //设置队列占用最大空间 if (attribute.MaxLengthBytes > 0) arguments["x-max-length-bytes"] = attribute.MaxLengthBytes; //设置队列溢出行为 if (attribute.OverflowBehaviour == "drop-head" || attribute.OverflowBehaviour == "reject-publish") arguments["x-overflow"] = attribute.OverflowBehaviour; //设置死信交换机 if (!attribute.DeadLetterExchange.IsNullOrEmpty()) arguments["x-dead-letter-exchange"] = attribute.DeadLetterExchange; //设置死信路由键 if (!attribute.DeadLetterRoutingKey.IsNullOrEmpty()) arguments["x-dead-letter-routing-key"] = attribute.DeadLetterRoutingKey; //设置队列优先级 if (attribute.MaximumPriority > 0 && attribute.MaximumPriority <= 10) arguments["x-max-priority"] = attribute.MaximumPriority; //设置队列惰性模式 if (attribute.LazyMode == "default" || attribute.LazyMode == "lazy") arguments["x-queue-mode"] = attribute.LazyMode; //设置集群配置 if (!attribute.MasterLocator.IsNullOrEmpty()) arguments["x-queue-master-locator"] = attribute.MasterLocator; //订阅消息 Subscribe( attribute.Exchange, attribute.Queue, attribute.RoutingKey, subscriber, handler, attribute.RetryCount, attribute.PrefetchCount, attribute.DeadLetter, attribute.ExchangeType, attribute.Durable, attribute.ConsumerCount, arguments, registered: registered, unregistered: unregistered, shutdown: shutdown); } /// /// 订阅消息 /// /// /// 消费处理委托 /// 异常处理委托 /// 注册事件 /// 取消注册事件 /// 关闭事件 public void Subscribe( Func> subscriber, Func handler, EventHandler registered = null, EventHandler unregistered = null, EventHandler shutdown = null) where T : class { var attribute = typeof(T).GetAttribute(); if (attribute == null) throw new ArgumentException("RabbitMqAttribute Is Null!"); Subscribe(attribute, subscriber, handler, registered, unregistered, shutdown); } /// /// 订阅消息 /// /// /// RabbitMq特性配置 /// 消费处理委托 /// 异常处理委托 /// 注册事件 /// 取消注册事件 /// 关闭事件 public void Subscribe( RabbitMqAttribute attribute, Func> subscriber, Func handler, EventHandler registered = null, EventHandler unregistered = null, EventHandler shutdown = null) where T : class { //自定义参数 var arguments = new Dictionary(); //设置队列消息过期时间,指整个队列的所有消息 if (attribute.MessageTTL > 0) arguments["x-message-ttl"] = attribute.MessageTTL; //设置队列过期时间 if (attribute.AutoExpire > 0) arguments["x-expires"] = attribute.AutoExpire; //设置队列最大长度 if (attribute.MaxLength > 0) arguments["x-max-length"] = attribute.MaxLength; //设置队列占用最大空间 if (attribute.MaxLengthBytes > 0) arguments["x-max-length-bytes"] = attribute.MaxLengthBytes; //设置队列溢出行为 if (attribute.OverflowBehaviour == "drop-head" || attribute.OverflowBehaviour == "reject-publish") arguments["x-overflow"] = attribute.OverflowBehaviour; //设置死信交换机 if (!attribute.DeadLetterExchange.IsNullOrEmpty()) arguments["x-dead-letter-exchange"] = attribute.DeadLetterExchange; //设置死信路由键 if (!attribute.DeadLetterRoutingKey.IsNullOrEmpty()) arguments["x-dead-letter-routing-key"] = attribute.DeadLetterRoutingKey; //设置队列优先级 if (attribute.MaximumPriority > 0 && attribute.MaximumPriority <= 10) arguments["x-max-priority"] = attribute.MaximumPriority; //设置队列惰性模式 if (attribute.LazyMode == "default" || attribute.LazyMode == "lazy") arguments["x-queue-mode"] = attribute.LazyMode; //设置集群配置 if (!attribute.MasterLocator.IsNullOrEmpty()) arguments["x-queue-master-locator"] = attribute.MasterLocator; //订阅消息 Subscribe( attribute.Exchange, attribute.Queue, attribute.RoutingKey, subscriber, handler, attribute.RetryCount, attribute.PrefetchCount, attribute.DeadLetter, attribute.ExchangeType, attribute.Durable, attribute.ConsumerCount, arguments, registered: registered, unregistered: unregistered, shutdown: shutdown); } /// /// 订阅消息 /// /// /// 交换机名称 /// 队列名称 /// 路由键 /// 消费处理委托 /// 异常处理委托 /// 重试次数 /// 预取数量 /// 是否进入内置死信队列,此处与原生死信无关 /// 交换机类型 /// 持久化 /// 消费者数量 /// 队列参数 /// 交换机参数 /// 注册事件 /// 取消注册事件 /// 关闭事件 public void Subscribe( string exchange, string queue, string routingKey, Func subscriber, Action handler, int retryCount = 5, ushort prefetchCount = 1, bool deadLetter = true, string exchangeType = ExchangeType.Direct, bool durable = true, int consumerCount = 1, IDictionary queueArguments = null, IDictionary exchangeArguments = null, EventHandler registered = null, EventHandler unregistered = null, EventHandler shutdown = null) where T : class { //获取管道 var channel = GetChannel(queue); //判断交换机是否存在 if (!IsExchangeExist(channel, exchange)) channel = ExchangeDeclare(channel, exchange, exchangeType, durable, arguments: exchangeArguments); //判断队列是否存在 if (!IsQueueExist(channel, queue)) channel = QueueDeclare(channel, queue, durable, arguments: queueArguments); //绑定交换机和队列 QueueBind(channel, exchange, queue, routingKey); //设置每次预取数量 channel.BasicQos(0, prefetchCount, false); //根据设置的消费者数量创建消费者 for (int i = 0; i < consumerCount; i++) { //创建消费者 var consumer = new EventingBasicConsumer(channel); //接收消息事件 consumer.Received += (sender, ea) => { var text = Encoding.UTF8.GetString(ea.Body.ToArray()); var body = text; var numberOfRetries = 0; Exception exception = null; bool? result = false; while (numberOfRetries <= retryCount) { try { var msg = body.ToObject(); result = subscriber?.Invoke(msg, ea); if (result == true) channel.BasicAck(ea.DeliveryTag, false); else channel.BasicNack(ea.DeliveryTag, false, false); //异常置空 exception = null; break; } catch (Exception ex) { exception = ex; handler?.Invoke(body, numberOfRetries, ex); numberOfRetries++; } } //重试后异常仍未解决 if (exception != null) channel.BasicNack(ea.DeliveryTag, false, false); //是否进入内置死信队列 if (deadLetter && (!(result == true) || exception != null)) PublishToDead( queue, ea.Exchange, ea.RoutingKey, body, exception == null ? numberOfRetries : numberOfRetries - 1, exception); }; //注册事件 if (registered != null) consumer.Registered += registered; //取消注册事件 if (unregistered != null) consumer.Unregistered += unregistered; //关闭事件 if (shutdown != null) consumer.Shutdown += shutdown; //手动确认 channel.BasicConsume(queue, false, consumer); } } /// /// 订阅消息 /// /// /// 交换机名称 /// 队列名称 /// 路由键 /// 消费处理委托 /// 异常处理委托 /// 重试次数 /// 预取数量 /// 是否进入内置死信队列,此处与原生死信无关 /// 交换机类型 /// 持久化 /// 消费者数量 /// 队列参数 /// 交换机参数 /// 注册事件 /// 取消注册事件 /// 关闭事件 public void Subscribe( string exchange, string queue, string routingKey, Func> subscriber, Func handler, int retryCount = 5, ushort prefetchCount = 1, bool deadLetter = true, string exchangeType = ExchangeType.Direct, bool durable = true, int consumerCount = 1, IDictionary queueArguments = null, IDictionary exchangeArguments = null, EventHandler registered = null, EventHandler unregistered = null, EventHandler shutdown = null) where T : class { //获取管道 var channel = GetChannel(queue); //判断交换机是否存在 if (!IsExchangeExist(channel, exchange)) channel = ExchangeDeclare(channel, exchange, exchangeType, durable, arguments: exchangeArguments); //判断队列是否存在 if (!IsQueueExist(channel, queue)) channel = QueueDeclare(channel, queue, durable, arguments: queueArguments); //绑定交换机和队列 QueueBind(channel, exchange, queue, routingKey); //设置每次预取数量 channel.BasicQos(0, prefetchCount, false); //根据设置的消费者数量创建消费者 for (int i = 0; i < consumerCount; i++) { //创建消费者 var consumer = new EventingBasicConsumer(channel); //接收消息事件 consumer.Received += async (sender, ea) => { var text = Encoding.UTF8.GetString(ea.Body.ToArray()); var body = text; var numberOfRetries = 0; Exception exception = null; bool? result = false; while (numberOfRetries <= retryCount) { try { var msg = body.ToObject(); if (subscriber != null) result = await subscriber(msg, ea); if (result == true) channel.BasicAck(ea.DeliveryTag, false); else channel.BasicNack(ea.DeliveryTag, false, false); //异常置空 exception = null; break; } catch (Exception ex) { exception = ex; if (handler != null) await handler(body, numberOfRetries, ex); numberOfRetries++; } } //重试后异常仍未解决 if (exception != null) channel.BasicNack(ea.DeliveryTag, false, false); //是否进入内置死信队列 if (deadLetter && (!(result == true) || exception != null)) PublishToDead( queue, ea.Exchange, ea.RoutingKey, body, exception == null ? numberOfRetries : numberOfRetries - 1, exception); }; //注册事件 if (registered != null) consumer.Registered += registered; //取消注册事件 if (unregistered != null) consumer.Unregistered += unregistered; //关闭事件 if (shutdown != null) consumer.Shutdown += shutdown; //手动确认 channel.BasicConsume(queue, false, consumer); } } #endregion 订阅消息 #region 获取消息 /// /// 获取消息 /// /// /// 消费处理委托 public void Pull( Action handler) where T : class { var attribute = typeof(T).GetAttribute(); if (attribute == null) throw new ArgumentException("RabbitMqAttribute Is Null!"); Pull(attribute.Exchange, attribute.Queue, attribute.RoutingKey, handler); } /// /// 获取消息 /// /// /// 交换机名称 /// 队列名称 /// 路由键 /// 消费处理委托 public void Pull( string exchange, string queue, string routingKey, Action handler) where T : class { //获取管道 var channel = GetChannel(queue); //判断交换机是否存在 if (!IsExchangeExist(channel, exchange)) channel = ExchangeDeclare(channel, exchange); //判断队列是否存在 if (!IsQueueExist(channel, queue)) channel = QueueDeclare(channel, queue); //绑定交换机和队列 QueueBind(channel, exchange, queue, routingKey); var result = channel.BasicGet(queue, false); if (result == null) return; var text = Encoding.UTF8.GetString(result.Body.ToArray()); var msg = text.ToObject(); try { handler(msg, result); } catch (Exception) { throw; } finally { channel.BasicAck(result.DeliveryTag, false); } } /// /// 获取消息 /// /// /// 消费处理委托 public async Task PullAsync( Func handler) where T : class { var attribute = typeof(T).GetAttribute(); if (attribute == null) throw new ArgumentException("RabbitMqAttribute Is Null!"); await PullAsync(attribute.Exchange, attribute.Queue, attribute.RoutingKey, handler); } /// /// 获取消息 /// /// /// 交换机名称 /// 队列名称 /// 路由键 /// 消费处理委托 public async Task PullAsync( string exchange, string queue, string routingKey, Func handler) where T : class { //获取管道 var channel = GetChannel(queue); //判断交换机是否存在 if (!IsExchangeExist(channel, exchange)) channel = ExchangeDeclare(channel, exchange); //判断队列是否存在 if (!IsQueueExist(channel, queue)) channel = QueueDeclare(channel, queue); //绑定交换机和队列 QueueBind(channel, exchange, queue, routingKey); var result = channel.BasicGet(queue, false); if (result == null) return; var text = Encoding.UTF8.GetString(result.Body.ToArray()); var msg = text.ToObject(); try { if (handler != null) await handler(msg, result); } catch (Exception) { throw; } finally { channel.BasicAck(result.DeliveryTag, false); } } /// /// 获取消息数量 /// /// 管道 /// 队列名称 /// public uint GetMessageCount( IModel channel, string queue) { return (channel ?? GetChannel(queue)).MessageCount(queue); } #endregion 获取消息 #region 释放资源 /// /// 执行与释放或重置非托管资源关联的应用程序定义的任务。 /// public void Dispose() { foreach (var item in _channelDic) { item.Value?.Dispose(); } foreach (var item in _queueHelperDic) { item.Value?.Dispose(); } _connection?.Dispose(); _channelDic.Clear(); _queueHelperDic.Clear(); } #endregion 释放资源 } /// /// RabbitMq连接配置 /// public class MqConfig { /// /// 是否启用 /// public bool Enabled { get; set; } /// /// 主机名 /// public string HostName { get; set; } = "localhost"; /// /// 心跳时间 /// public TimeSpan RequestedHeartbeat { get; set; } = TimeSpan.FromSeconds(10); /// /// 自动重连 /// public bool AutomaticRecoveryEnabled { get; set; } = true; /// /// 重连时间 /// public TimeSpan NetworkRecoveryInterval { get; set; } = TimeSpan.FromSeconds(10); /// /// 用户名 /// public string UserName { get; set; } = "guest"; /// /// 密码 /// public string Password { get; set; } = "guest"; /// /// 端口号 /// public int Port { get; set; } = 5672; /// /// 虚拟主机 /// public string VirtualHost { get; set; } = "/"; } /// /// 自定义的RabbitMq队列信息实体特性 /// [AttributeUsage(AttributeTargets.Class, AllowMultiple = false)] public class RabbitMqAttribute : Attribute { /// /// 交换机 /// public string Exchange { get; set; } /// /// 交换机类型(广播fanout,队列direct) /// public string ExchangeType { get; set; } /// /// 队列 /// public string Queue { get; set; } /// /// 路由键 /// public string RoutingKey { get; set; } /// /// 是否持久化,默认true /// public bool Durable { get; set; } = true; /// /// 预取数量,默认1 /// public ushort PrefetchCount { get; set; } = 1; /// /// 异常重试次数,默认5次 /// public int RetryCount { get; set; } = 5; /// /// 消息过期时间,过期后队列中消息自动被删除,单位ms /// public int MessageTTL { get; set; } /// /// 消息头部对象json字符串 /// public string Header { get; set; } /// /// 队列消费者数量,默认1 /// public int ConsumerCount { get; set; } = 1; /// /// 队列过期时间,过期后队列自动被删除,单位ms /// public int AutoExpire { get; set; } /// /// 队列最大长度 /// public int MaxLength { get; set; } /// /// 队列占用的最大空间 /// public int MaxLengthBytes { get; set; } /// /// 队列溢出行为,可选值:drop-head或者reject-publish,默认drop-head /// public string OverflowBehaviour { get; set; } /// /// 是否启用系统内置死信队列处理逻辑,默认true;注意:此处与原生的死信无关 /// public bool DeadLetter { get; set; } = true; /// /// 死信交换机 /// public string DeadLetterExchange { get; set; } /// /// 死信路由键 /// public string DeadLetterRoutingKey { get; set; } /// /// 队列最大优先级,数值越大优先级越高,范围1-255,建议取值1-10 /// public int MaximumPriority { get; set; } /// /// 队列惰性模式,可选值:default或者lazy /// public string LazyMode { get; set; } /// /// 主定位器,集群配置 /// public string MasterLocator { get; set; } } /// /// 内置死信队列实体 /// public class DeadLetterQueue { /// /// 消息内容 /// public string Body { get; set; } /// /// 交换机 /// public string Exchange { get; set; } /// /// 队列 /// public string Queue { get; set; } /// /// 路由键 /// public string RoutingKey { get; set; } /// /// 重试次数 /// public int RetryCount { get; set; } /// /// 异常消息 /// public string ExceptionMsg { get; set; } /// /// 异常 /// public Exception Exception { get; set; } /// /// 创建时间 /// public DateTime CreateDateTime { get; set; } = DateTime.Now; } /// /// 内存队列消息 /// public class QueueMessage { /// /// 交换机名称 /// public string Exchange { get; set; } /// /// 队列名称 /// public string Queue { get; set; } /// /// 路由键 /// public string RoutingKey { get; set; } /// /// 消息内容 /// public IEnumerable Body { get; set; } /// /// 交换机类型 /// public string ExchangeType { get; set; } /// /// 持久化 /// public bool Durable { get; set; } /// /// 消息发送确认 /// public bool Confirm { get; set; } /// /// 单个消息过期时间,单位ms /// public string Expiration { get; set; } /// /// 单个消息优先级,数值越大优先级越高,取值范围:0-9 /// public byte? Priority { get; set; } /// /// 队列参数 /// public IDictionary QueueArguments { get; set; } /// /// 交换机参数 /// public IDictionary ExchangeArguments { get; set; } /// /// 消息头部 /// public IDictionary Headers { get; set; } } }