using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Serenity;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace HT.Cloud.Code
{
///
/// RabbitMq工具类
///
public class RabbitMqHelper : IDisposable
{
#region 私有静态字段
///
/// RabbitMQ建议客户端线程之间不要共用Model,至少要保证共用Model的线程发送消息必须是串行的,但是建议尽量共用Connection。
///
private static readonly ConcurrentDictionary _channelDic = new();
///
/// QueueHelper缓存
///
private static readonly ConcurrentDictionary> _queueHelperDic = new();
///
/// 用于缓存路由键数据
///
private static readonly ConcurrentDictionary _routeDic = new();
///
/// RabbitMq连接
///
private static IConnection _connection;
///
/// 线程对象,线程锁使用
///
private static readonly object _locker = new();
#endregion 私有静态字段
#region 公有属性
///
/// 默认内置死信交换机,默认值:deadletter.default.router
///
public string DefaultDeadLetterExchange { get; set; } = "deadletter.default.router";
#endregion 公有属性
#region 构造函数
///
/// 构造函数
///
/// RabbitMq配置
public RabbitMqHelper(MqConfig config)
{
if (config == null)
throw new ArgumentNullException(nameof(config));
if (_connection == null)
{
lock (_locker)
{
if (_connection == null)
_connection = new ConnectionFactory
{
//设置主机名
HostName = config.HostName,
//虚拟主机
VirtualHost = config.VirtualHost,
//设置心跳时间
RequestedHeartbeat = config.RequestedHeartbeat,
//设置自动重连
AutomaticRecoveryEnabled = config.AutomaticRecoveryEnabled,
//重连时间
NetworkRecoveryInterval = config.NetworkRecoveryInterval,
//用户名
UserName = config.UserName,
//密码
Password = config.Password,
//配置端口
Port=config.Port
}.CreateConnection();
}
}
}
///
/// 构造函数
///
/// RabbitMq连接工厂
public RabbitMqHelper(ConnectionFactory factory)
{
if (factory == null)
throw new ArgumentNullException(nameof(factory));
if (_connection == null)
{
lock (_locker)
{
if (_connection == null)
_connection = factory.CreateConnection();
}
}
}
#endregion 构造函数
#region 管道
///
/// 获取管道
///
/// 队列名称
///
public IModel GetChannel(string queue = "default")
{
IModel channel = null;
if (!string.IsNullOrEmpty(queue) && _channelDic.ContainsKey(queue))
channel = _channelDic[queue];
if (!string.IsNullOrEmpty(queue) && channel == null)
{
lock (_locker)
{
channel = _channelDic.GetOrAdd(queue, queue =>
{
var model = _connection.CreateModel();
_channelDic[queue] = model;
return model;
});
}
}
channel ??= _connection.CreateModel();
return EnsureOpened(channel);
}
///
/// 确保管道是已打开状态
///
/// 管道
///
public IModel EnsureOpened(IModel channel)
{
channel ??= GetChannel();
if (channel.IsClosed)
{
var data = _channelDic.Where(x => x.Value == channel)?.FirstOrDefault();
if (data != null && data.Value.Key != null)
{
//移除已关闭的管道
_channelDic.TryRemove(data.Value.Key, out var model);
//重新获取管道
channel = GetChannel(data.Value.Key);
}
else
{
channel = GetChannel();
}
}
return channel;
}
///
/// 如果交换机或者队列不存在则移除缓存中异常关闭的管道
///
///
public void RemoveIfNotExist(IModel channel)
{
var data = _channelDic.Where(x => x.Value == channel)?.FirstOrDefault();
if (data != null && data.Value.Key != null)
{
//移除已关闭的管道
_channelDic.TryRemove(data.Value.Key, out var model);
channel = null;
}
}
///
/// 获取QueueHelper
///
///
///
public QueueHelper GetQueueHelper(string queue)
{
if (!string.IsNullOrEmpty(queue))
queue = "default";
if (_queueHelperDic.ContainsKey(queue))
return _queueHelperDic[queue];
lock (_locker)
{
return _queueHelperDic.GetOrAdd(queue, queue =>
{
var queueHelper = new QueueHelper(x =>
PublishRabbitMqMessage(
x.Exchange,
x.Queue,
x.RoutingKey,
x.Body,
x.ExchangeType,
x.Durable,
x.Confirm,
x.Expiration,
x.Priority,
x.QueueArguments,
x.ExchangeArguments,
x.Headers));
_queueHelperDic[queue] = queueHelper;
return queueHelper;
});
}
}
#endregion 管道
#region 交换机
///
/// 声明交换机,交换机可以重复声明,但是声明所使用的参数必须一致,否则会抛出异常。
///
/// 管道
/// 交换机名称
/// 交换机类型:
/// 1、Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全
/// 匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的
/// 消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog
/// 2、Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都
/// 会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout
/// 交换机转发消息是最快的。
/// 3、Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多
/// 个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*”
/// 只会匹配到“audit.irs”。
/// 持久化
/// 自动删除
/// 参数
/// 管道
public IModel ExchangeDeclare(
IModel channel,
string exchange,
string exchangeType = ExchangeType.Direct,
bool durable = true,
bool autoDelete = false,
IDictionary arguments = null)
{
channel = EnsureOpened(channel);
channel.ExchangeDeclare(exchange, exchangeType, durable, autoDelete, arguments);
return channel;
}
///
/// 声明交换机,交换机可以重复声明,但是声明所使用的参数必须一致,否则会抛出异常。
///
/// 管道
/// 交换机名称
/// 交换机类型:
/// 1、Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全
/// 匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的
/// 消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog
/// 2、Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都
/// 会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout
/// 交换机转发消息是最快的。
/// 3、Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多
/// 个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*”
/// 只会匹配到“audit.irs”。
/// 持久化
/// 自动删除
/// 参数
/// 管道
public IModel ExchangeDeclareNoWait(
IModel channel,
string exchange,
string exchangeType = ExchangeType.Direct,
bool durable = true,
bool autoDelete = false,
IDictionary arguments = null)
{
channel = EnsureOpened(channel);
channel.ExchangeDeclareNoWait(exchange, exchangeType, durable, autoDelete, arguments);
return channel;
}
///
/// 被动声明交换机,用于判断交换机是否存在,若存在则无异常,若不存在则抛出异常
///
/// 管道
/// 交换机名称
public void ExchangeDeclarePassive(IModel channel, string exchange)
{
EnsureOpened(channel).ExchangeDeclarePassive(exchange);
}
///
/// 判断交换机是否存在
///
/// 管道
/// 交换机名称
///
public bool IsExchangeExist(IModel channel, string exchange)
{
try
{
ExchangeDeclarePassive(channel, exchange);
return true;
}
catch
{
RemoveIfNotExist(channel);
return false;
}
}
///
/// 删除交换机
///
/// 管道
/// 交换机名称
/// 是否没有被使用
public void ExchangeDelete(
IModel channel,
string exchange,
bool ifUnused = false)
{
RouteKeyRemoveByExchange(exchange);
EnsureOpened(channel).ExchangeDelete(exchange, ifUnused);
}
///
/// 删除交换机
///
/// 管道
/// 交换机名称
/// 是否没有被使用
public void ExchangeDeleteNoWait(
IModel channel,
string exchange,
bool ifUnused = false)
{
RouteKeyRemoveByExchange(exchange);
EnsureOpened(channel).ExchangeDeleteNoWait(exchange, ifUnused);
}
///
/// 绑定交换机
///
/// 管道
/// 目标交换机
/// 源交换机
/// 路由键
/// 参数
public void ExchangeBind(
IModel channel,
string destinationExchange,
string sourceExchange,
string routingKey,
IDictionary arguments = null)
{
EnsureOpened(channel).ExchangeBind(destinationExchange, sourceExchange, routingKey, arguments);
}
///
/// 绑定交换机
///
/// 管道
/// 目标交换机
/// 源交换机
/// 路由键
/// 参数
public void ExchangeBindNoWait(
IModel channel,
string destinationExchange,
string sourceExchange,
string routingKey,
IDictionary arguments = null)
{
EnsureOpened(channel).ExchangeBindNoWait(destinationExchange, sourceExchange, routingKey, arguments);
}
///
/// 解绑交换机
///
/// 管道
/// 目标交换机
/// 源交换机
/// 路由键
/// 参数
public void ExchangeUnbind(
IModel channel,
string destinationExchange,
string sourceExchange,
string routingKey,
IDictionary arguments = null)
{
EnsureOpened(channel).ExchangeUnbind(destinationExchange, sourceExchange, routingKey, arguments);
}
///
/// 解绑交换机
///
/// 管道
/// 目标交换机
/// 源交换机
/// 路由键
/// 参数
public void ExchangeUnbindNoWait(
IModel channel,
string destinationExchange,
string sourceExchange,
string routingKey,
IDictionary arguments = null)
{
EnsureOpened(channel).ExchangeUnbindNoWait(destinationExchange, sourceExchange, routingKey, arguments);
}
#endregion 交换机
#region 队列
///
/// 声明队列,队列可以重复声明,但是声明所使用的参数必须一致,否则会抛出异常。
///
/// 管道
/// 队列名称
/// 持久化
/// 排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,
/// 并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可
/// 以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连
/// 接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者
/// 客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。
/// 自动删除
/// 参数
/// 管道
public IModel QueueDeclare(
IModel channel,
string queue,
bool durable = true,
bool exclusive = false,
bool autoDelete = false,
IDictionary arguments = null)
{
channel = EnsureOpened(channel);
channel.QueueDeclare(queue, durable, exclusive, autoDelete, arguments);
return channel;
}
///
/// 声明队列,队列可以重复声明,但是声明所使用的参数必须一致,否则会抛出异常。
///
/// 管道
/// 队列名称
/// 持久化
/// 排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,
/// 并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可
/// 以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连
/// 接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者
/// 客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。
/// 自动删除
/// 参数
/// 管道
public IModel QueueDeclareNoWait(
IModel channel,
string queue,
bool durable = true,
bool exclusive = false,
bool autoDelete = false,
IDictionary arguments = null)
{
channel = EnsureOpened(channel);
channel.QueueDeclareNoWait(queue, durable, exclusive, autoDelete, arguments);
return channel;
}
///
/// 被动声明队列,用于判断队列是否存在,若队列存在则无异常,若不存在则抛异常
///
/// 管道
/// 队列名称
public void QueueDeclarePassive(IModel channel, string queue)
{
EnsureOpened(channel).QueueDeclarePassive(queue);
}
///
/// 判断队列是否存在
///
/// 管道
/// 队列名称
///
public bool IsQueueExist(IModel channel, string queue)
{
try
{
QueueDeclarePassive(channel, queue);
return true;
}
catch
{
RemoveIfNotExist(channel);
return false;
}
}
///
/// 删除队列
///
/// 管道
/// 队列名称
/// 是否没有被使用
/// 是否为空
///
public uint QueueDelete(
IModel channel,
string queue,
bool ifUnused = false,
bool ifEmpty = false)
{
RouteKeyRemoveByQueue(queue);
return EnsureOpened(channel).QueueDelete(queue, ifUnused, ifEmpty);
}
///
/// 删除队列
///
/// 管道
/// 队列名称
/// 是否没有被使用
/// 是否为空
public void QueueDeleteNoWait(
IModel channel,
string queue,
bool ifUnused = false,
bool ifEmpty = false)
{
RouteKeyRemoveByQueue(queue);
EnsureOpened(channel).QueueDeleteNoWait(queue, ifUnused, ifEmpty);
}
///
/// 绑定队列
///
/// 管道
/// 交换机名称
/// 队列名称
/// 路由键
/// 参数
public void QueueBind(
IModel channel,
string exchange,
string queue,
string routingKey,
IDictionary arguments = null)
{
if (IsRouteKeyExist(exchange, queue, routingKey))
return;
EnsureOpened(channel).QueueBind(queue, exchange, routingKey, arguments);
}
///
/// 绑定队列
///
/// 管道
/// 交换机名称
/// 队列名称
/// 路由键
/// 参数
public void QueueBindNoWait(
IModel channel,
string exchange,
string queue,
string routingKey,
IDictionary arguments = null)
{
if (IsRouteKeyExist(exchange, queue, routingKey))
return;
EnsureOpened(channel).QueueBindNoWait(queue, exchange, routingKey, arguments);
}
///
/// 解绑队列
///
/// 管道
/// 交换机名称
/// 队列名称
/// 路由键
/// 参数
public void QueueUnbind(
IModel channel,
string exchange,
string queue,
string routingKey,
IDictionary arguments = null)
{
RouteKeyRemove(exchange, queue, routingKey);
EnsureOpened(channel).QueueUnbind(queue, exchange, routingKey, arguments);
}
///
/// 清除队列
///
/// 管道
/// 队列名称
public void QueuePurge(IModel channel, string queue)
{
EnsureOpened(channel).QueuePurge(queue);
}
#endregion 队列
#region 路由
///
/// 判断路由键是否存在,仅在内存中作判断
///
/// 交换机
/// 队列
/// 路由键
///
public bool IsRouteKeyExist(string exchange, string queue, string routeKey)
{
var key = $"{exchange}.{queue}.{routeKey}";
if (_routeDic.ContainsKey(key))
return true;
else
{
lock (_locker)
{
if (!_routeDic.ContainsKey(key))
{
_routeDic.TryAdd(key, (exchange, queue, routeKey));
return false;
}
return true;
}
}
}
///
/// 移除路由键缓存
///
///
///
///
public void RouteKeyRemove(string exchange, string queue, string routeKey)
{
var key = $"{exchange}.{queue}.{routeKey}";
if (_routeDic.ContainsKey(key))
{
lock (_locker)
{
if (_routeDic.ContainsKey(key))
_routeDic.TryRemove(key, out var _);
}
}
}
///
/// 移除路由键缓存
///
/// 队列
public void RouteKeyRemoveByQueue(string queue)
{
if (!_routeDic.IsEmpty && _routeDic.Values.Any(x => x.queue == queue))
{
lock (_locker)
{
if (!_routeDic.IsEmpty && _routeDic.Values.Any(x => x.queue == queue))
{
var keys = _routeDic.Keys.Where(x => x.Split('.')[1] == queue);
foreach (var key in keys)
{
_routeDic.TryRemove(key, out var _);
}
}
}
}
}
///
/// 移除路由键缓存
///
/// 交换机
public void RouteKeyRemoveByExchange(string exchange)
{
if (!_routeDic.IsEmpty && _routeDic.Values.Any(x => x.exchange == exchange))
{
lock (_locker)
{
if (!_routeDic.IsEmpty && _routeDic.Values.Any(x => x.exchange == exchange))
{
var keys = _routeDic.Keys.Where(x => x.Split('.')[0] == exchange);
foreach (var key in keys)
{
_routeDic.TryRemove(key, out var _);
}
}
}
}
}
#endregion 路由
#region 发布消息
///
/// 发布消息
///
/// 消息指令
/// 消息发送确认
/// 单个消息过期时间,单位ms
/// 单个消息优先级,数值越大优先级越高,取值范围:0-9
///
public bool Publish(
T command,
bool confirm = false,
string expiration = null,
byte? priority = null)
where T : class
{
var attribute = typeof(T).GetAttribute();
if (attribute == null)
throw new ArgumentException("RabbitMqAttribute Is Null!");
return Publish(attribute, command, confirm, expiration, priority);
}
///
/// 发布消息
///
/// RabbitMq特性配置
/// 消息指令
/// 消息发送确认
/// 单个消息过期时间,单位ms
/// 单个消息优先级,数值越大优先级越高,取值范围:0-9
///
public bool Publish(
RabbitMqAttribute attribute,
T command,
bool confirm = false,
string expiration = null,
byte? priority = null)
where T : class
{
//消息内容
var body = command.ToJson();
//自定义队列参数
var arguments = new Dictionary();
//设置队列消息过期时间,指整个队列的所有消息
if (attribute.MessageTTL > 0)
arguments["x-message-ttl"] = attribute.MessageTTL;
//设置队列过期时间
if (attribute.AutoExpire > 0)
arguments["x-expires"] = attribute.AutoExpire;
//设置队列最大长度
if (attribute.MaxLength > 0)
arguments["x-max-length"] = attribute.MaxLength;
//设置队列占用最大空间
if (attribute.MaxLengthBytes > 0)
arguments["x-max-length-bytes"] = attribute.MaxLengthBytes;
//设置队列溢出行为
if (attribute.OverflowBehaviour == "drop-head" || attribute.OverflowBehaviour == "reject-publish")
arguments["x-overflow"] = attribute.OverflowBehaviour;
//设置死信交换机
if (!attribute.DeadLetterExchange.IsNullOrEmpty())
arguments["x-dead-letter-exchange"] = attribute.DeadLetterExchange;
//设置死信路由键
if (!attribute.DeadLetterRoutingKey.IsNullOrEmpty())
arguments["x-dead-letter-routing-key"] = attribute.DeadLetterRoutingKey;
//设置队列优先级
if (attribute.MaximumPriority > 0 && attribute.MaximumPriority <= 10)
arguments["x-max-priority"] = attribute.MaximumPriority;
//设置队列惰性模式
if (attribute.LazyMode == "default" || attribute.LazyMode == "lazy")
arguments["x-queue-mode"] = attribute.LazyMode;
//设置集群配置
if (!attribute.MasterLocator.IsNullOrEmpty())
arguments["x-queue-master-locator"] = attribute.MasterLocator;
//发送消息
return Publish(
attribute.Exchange,
attribute.Queue,
attribute.RoutingKey,
body,
attribute.ExchangeType,
attribute.Durable,
confirm,
expiration,
priority,
arguments,
null,
attribute.Header?.ToObject>());
}
///
/// 发布消息
///
/// 消息指令
/// 消息发送确认
/// 单个消息过期时间,单位ms
/// 单个消息优先级,数值越大优先级越高,取值范围:0-9
///
public bool Publish(
IEnumerable command,
bool confirm = false,
string expiration = null,
byte? priority = null)
where T : class
{
var attribute = typeof(T).GetAttribute();
if (attribute == null)
throw new ArgumentException("RabbitMqAttribute Is Null!");
return Publish(attribute, command, confirm, expiration, priority);
}
///
/// 发布消息
///
/// RabbitMq特性配置
/// 消息指令
/// 消息发送确认
/// 单个消息过期时间,单位ms
/// 单个消息优先级,数值越大优先级越高,取值范围:0-9
///
public bool Publish(
RabbitMqAttribute attribute,
IEnumerable command,
bool confirm = false,
string expiration = null,
byte? priority = null)
where T : class
{
//消息内容
var body = command.Select(x => x.ToJson());
//自定义队列参数
var arguments = new Dictionary();
//设置队列消息过期时间,指整个队列的所有消息
if (attribute.MessageTTL > 0)
arguments["x-message-ttl"] = attribute.MessageTTL;
//设置队列过期时间
if (attribute.AutoExpire > 0)
arguments["x-expires"] = attribute.AutoExpire;
//设置队列最大长度
if (attribute.MaxLength > 0)
arguments["x-max-length"] = attribute.MaxLength;
//设置队列占用最大空间
if (attribute.MaxLengthBytes > 0)
arguments["x-max-length-bytes"] = attribute.MaxLengthBytes;
//设置队列溢出行为
if (attribute.OverflowBehaviour == "drop-head" || attribute.OverflowBehaviour == "reject-publish")
arguments["x-overflow"] = attribute.OverflowBehaviour;
//设置死信交换机
if (!attribute.DeadLetterExchange.IsNullOrEmpty())
arguments["x-dead-letter-exchange"] = attribute.DeadLetterExchange;
//设置死信路由键
if (!attribute.DeadLetterRoutingKey.IsNullOrEmpty())
arguments["x-dead-letter-routing-key"] = attribute.DeadLetterRoutingKey;
//设置队列优先级
if (attribute.MaximumPriority > 0 && attribute.MaximumPriority <= 10)
arguments["x-max-priority"] = attribute.MaximumPriority;
//设置队列惰性模式
if (attribute.LazyMode == "default" || attribute.LazyMode == "lazy")
arguments["x-queue-mode"] = attribute.LazyMode;
//设置集群配置
if (!attribute.MasterLocator.IsNullOrEmpty())
arguments["x-queue-master-locator"] = attribute.MasterLocator;
//发送消息
return Publish(
attribute.Exchange,
attribute.Queue,
attribute.RoutingKey,
body,
attribute.ExchangeType,
attribute.Durable,
confirm,
expiration,
priority,
arguments,
null,
attribute.Header?.ToObject>());
}
///
/// 发布消息
///
/// 交换机名称
/// 队列名称
/// 路由键
/// 消息内容
/// 交换机类型
/// 持久化
/// 消息发送确认
/// 单个消息过期时间,单位ms
/// 单个消息优先级,数值越大优先级越高,取值范围:0-9
/// 队列参数
/// 交换机参数
/// 消息头部
///
public bool Publish(
string exchange,
string queue,
string routingKey,
string body,
string exchangeType = ExchangeType.Direct,
bool durable = true,
bool confirm = false,
string expiration = null,
byte? priority = null,
IDictionary queueArguments = null,
IDictionary exchangeArguments = null,
IDictionary headers = null)
{
return Publish(
exchange,
queue,
routingKey,
new[] { body },
exchangeType,
durable,
confirm,
expiration,
priority,
queueArguments,
exchangeArguments,
headers);
}
///
/// 发布消息
///
/// 交换机名称
/// 队列名称
/// 路由键
/// 消息内容
/// 交换机类型
/// 持久化
/// 消息发送确认
/// 单个消息过期时间,单位ms
/// 单个消息优先级,数值越大优先级越高,取值范围:0-9
/// 队列参数
/// 交换机参数
/// 消息头部
///
public bool Publish(
string exchange,
string queue,
string routingKey,
IEnumerable body,
string exchangeType = ExchangeType.Direct,
bool durable = true,
bool confirm = false,
string expiration = null,
byte? priority = null,
IDictionary queueArguments = null,
IDictionary exchangeArguments = null,
IDictionary headers = null)
{
return GetQueueHelper(queue).Enqueue(
new QueueMessage
{
Exchange = exchange,
Queue = queue,
RoutingKey = routingKey,
Body = body,
ExchangeType = exchangeType,
Durable = durable,
Confirm = confirm,
Expiration = expiration,
Priority = priority,
QueueArguments = queueArguments,
ExchangeArguments = exchangeArguments,
Headers = headers
});
}
///
/// 发布消息到RabbitMq,注意此方法使用IModel线程不安全,建议使用Publish方法,若要使用需要保证单线程调用
///
/// 交换机名称
/// 队列名称
/// 路由键
/// 消息内容
/// 交换机类型
/// 持久化
/// 消息发送确认
/// 单个消息过期时间,单位ms
/// 单个消息优先级,数值越大优先级越高,取值范围:0-9
/// 队列参数
/// 交换机参数
/// 消息头部
///
public bool PublishRabbitMqMessage(
string exchange,
string queue,
string routingKey,
IEnumerable body,
string exchangeType = ExchangeType.Direct,
bool durable = true,
bool confirm = false,
string expiration = null,
byte? priority = null,
IDictionary queueArguments = null,
IDictionary exchangeArguments = null,
IDictionary headers = null)
{
//获取管道
var channel = GetChannel(queue);
//判断交换机是否存在
if (!IsExchangeExist(channel, exchange))
channel = ExchangeDeclare(channel, exchange, exchangeType, durable, arguments: exchangeArguments);
//判断队列是否存在
if (!IsQueueExist(channel, queue))
channel = QueueDeclare(channel, queue, durable, arguments: queueArguments);
//绑定交换机和队列
QueueBind(channel, exchange, queue, routingKey);
//声明消息属性
var props = channel.CreateBasicProperties();
//持久化
props.Persistent = durable;
//单个消息过期时间
if (!expiration.IsNullOrEmpty())
props.Expiration = expiration;
//单个消息优先级
if (priority >= 0 && priority <= 9)
props.Priority = priority.Value;
//消息头部
if (headers != null)
props.Headers = headers;
//是否启用消息发送确认机制
if (confirm)
channel.ConfirmSelect();
//发送消息
foreach (var item in body)
{
var text = string.IsNullOrEmpty(item) ? null : Encoding.UTF8.GetBytes(item);
channel.BasicPublish(exchange, routingKey, props, text);
}
//消息发送失败处理
if (confirm && !channel.WaitForConfirms())
return false;
return true;
}
///
/// 发布消息到内置死信队列
///
/// 队列名称
/// 交换机名称
/// 路由键
/// 消息内容
/// 重试次数
/// 异常
///
private bool PublishToDead(
string queue,
string exchange,
string routingKey,
string body,
int retryCount,
Exception exception)
{
//内置死信队列、交换机、路由键
var deadLetterQueue = $"{queue.ToLower()}.{(exception != null ? "error" : "fail")}";
var deadLetterExchange = DefaultDeadLetterExchange;
var deadLetterRoutingKey = $"{queue.ToLower()}.deadletter";
//内置死信队列内容
var deadLetterBody = new DeadLetterQueue
{
Body = body,
CreateDateTime = DateTime.Now,
Exception = exception,
ExceptionMsg = exception?.Message,
Queue = queue,
RoutingKey = routingKey,
Exchange = exchange,
RetryCount = retryCount
};
//发送内置死信消息,注意此处与原生的死信消息无关
return Publish(deadLetterExchange, deadLetterQueue, deadLetterRoutingKey, deadLetterBody.ToJson());
}
#endregion 发布消息
#region 订阅消息
///
/// 订阅消息
///
///
/// 消费处理委托
/// 异常处理委托
/// 注册事件
/// 取消注册事件
/// 关闭事件
public void Subscribe(
Func subscriber,
Action handler,
EventHandler registered = null,
EventHandler unregistered = null,
EventHandler shutdown = null)
where T : class
{
var attribute = typeof(T).GetAttribute();
if (attribute == null)
throw new ArgumentException("RabbitMqAttribute Is Null!");
Subscribe(attribute, subscriber, handler, registered, unregistered, shutdown);
}
///
/// 订阅消息
///
///
/// RabbitMq特性配置
/// 消费处理委托
/// 异常处理委托
/// 注册事件
/// 取消注册事件
/// 关闭事件
public void Subscribe(
RabbitMqAttribute attribute,
Func subscriber,
Action handler,
EventHandler registered = null,
EventHandler unregistered = null,
EventHandler shutdown = null)
where T : class
{
//自定义参数
var arguments = new Dictionary();
//设置队列消息过期时间,指整个队列的所有消息
if (attribute.MessageTTL > 0)
arguments["x-message-ttl"] = attribute.MessageTTL;
//设置队列过期时间
if (attribute.AutoExpire > 0)
arguments["x-expires"] = attribute.AutoExpire;
//设置队列最大长度
if (attribute.MaxLength > 0)
arguments["x-max-length"] = attribute.MaxLength;
//设置队列占用最大空间
if (attribute.MaxLengthBytes > 0)
arguments["x-max-length-bytes"] = attribute.MaxLengthBytes;
//设置队列溢出行为
if (attribute.OverflowBehaviour == "drop-head" || attribute.OverflowBehaviour == "reject-publish")
arguments["x-overflow"] = attribute.OverflowBehaviour;
//设置死信交换机
if (!attribute.DeadLetterExchange.IsNullOrEmpty())
arguments["x-dead-letter-exchange"] = attribute.DeadLetterExchange;
//设置死信路由键
if (!attribute.DeadLetterRoutingKey.IsNullOrEmpty())
arguments["x-dead-letter-routing-key"] = attribute.DeadLetterRoutingKey;
//设置队列优先级
if (attribute.MaximumPriority > 0 && attribute.MaximumPriority <= 10)
arguments["x-max-priority"] = attribute.MaximumPriority;
//设置队列惰性模式
if (attribute.LazyMode == "default" || attribute.LazyMode == "lazy")
arguments["x-queue-mode"] = attribute.LazyMode;
//设置集群配置
if (!attribute.MasterLocator.IsNullOrEmpty())
arguments["x-queue-master-locator"] = attribute.MasterLocator;
//订阅消息
Subscribe(
attribute.Exchange,
attribute.Queue,
attribute.RoutingKey,
subscriber,
handler,
attribute.RetryCount,
attribute.PrefetchCount,
attribute.DeadLetter,
attribute.ExchangeType,
attribute.Durable,
attribute.ConsumerCount,
arguments,
registered: registered,
unregistered: unregistered,
shutdown: shutdown);
}
///
/// 订阅消息
///
///
/// 消费处理委托
/// 异常处理委托
/// 注册事件
/// 取消注册事件
/// 关闭事件
public void Subscribe(
Func> subscriber,
Func handler,
EventHandler registered = null,
EventHandler unregistered = null,
EventHandler shutdown = null)
where T : class
{
var attribute = typeof(T).GetAttribute();
if (attribute == null)
throw new ArgumentException("RabbitMqAttribute Is Null!");
Subscribe(attribute, subscriber, handler, registered, unregistered, shutdown);
}
///
/// 订阅消息
///
///
/// RabbitMq特性配置
/// 消费处理委托
/// 异常处理委托
/// 注册事件
/// 取消注册事件
/// 关闭事件
public void Subscribe(
RabbitMqAttribute attribute,
Func> subscriber,
Func handler,
EventHandler registered = null,
EventHandler unregistered = null,
EventHandler shutdown = null)
where T : class
{
//自定义参数
var arguments = new Dictionary();
//设置队列消息过期时间,指整个队列的所有消息
if (attribute.MessageTTL > 0)
arguments["x-message-ttl"] = attribute.MessageTTL;
//设置队列过期时间
if (attribute.AutoExpire > 0)
arguments["x-expires"] = attribute.AutoExpire;
//设置队列最大长度
if (attribute.MaxLength > 0)
arguments["x-max-length"] = attribute.MaxLength;
//设置队列占用最大空间
if (attribute.MaxLengthBytes > 0)
arguments["x-max-length-bytes"] = attribute.MaxLengthBytes;
//设置队列溢出行为
if (attribute.OverflowBehaviour == "drop-head" || attribute.OverflowBehaviour == "reject-publish")
arguments["x-overflow"] = attribute.OverflowBehaviour;
//设置死信交换机
if (!attribute.DeadLetterExchange.IsNullOrEmpty())
arguments["x-dead-letter-exchange"] = attribute.DeadLetterExchange;
//设置死信路由键
if (!attribute.DeadLetterRoutingKey.IsNullOrEmpty())
arguments["x-dead-letter-routing-key"] = attribute.DeadLetterRoutingKey;
//设置队列优先级
if (attribute.MaximumPriority > 0 && attribute.MaximumPriority <= 10)
arguments["x-max-priority"] = attribute.MaximumPriority;
//设置队列惰性模式
if (attribute.LazyMode == "default" || attribute.LazyMode == "lazy")
arguments["x-queue-mode"] = attribute.LazyMode;
//设置集群配置
if (!attribute.MasterLocator.IsNullOrEmpty())
arguments["x-queue-master-locator"] = attribute.MasterLocator;
//订阅消息
Subscribe(
attribute.Exchange,
attribute.Queue,
attribute.RoutingKey,
subscriber,
handler,
attribute.RetryCount,
attribute.PrefetchCount,
attribute.DeadLetter,
attribute.ExchangeType,
attribute.Durable,
attribute.ConsumerCount,
arguments,
registered: registered,
unregistered: unregistered,
shutdown: shutdown);
}
///
/// 订阅消息
///
///
/// 交换机名称
/// 队列名称
/// 路由键
/// 消费处理委托
/// 异常处理委托
/// 重试次数
/// 预取数量
/// 是否进入内置死信队列,此处与原生死信无关
/// 交换机类型
/// 持久化
/// 消费者数量
/// 队列参数
/// 交换机参数
/// 注册事件
/// 取消注册事件
/// 关闭事件
public void Subscribe(
string exchange,
string queue,
string routingKey,
Func subscriber,
Action handler,
int retryCount = 5,
ushort prefetchCount = 1,
bool deadLetter = true,
string exchangeType = ExchangeType.Direct,
bool durable = true,
int consumerCount = 1,
IDictionary queueArguments = null,
IDictionary exchangeArguments = null,
EventHandler registered = null,
EventHandler unregistered = null,
EventHandler shutdown = null)
where T : class
{
//获取管道
var channel = GetChannel(queue);
//判断交换机是否存在
if (!IsExchangeExist(channel, exchange))
channel = ExchangeDeclare(channel, exchange, exchangeType, durable, arguments: exchangeArguments);
//判断队列是否存在
if (!IsQueueExist(channel, queue))
channel = QueueDeclare(channel, queue, durable, arguments: queueArguments);
//绑定交换机和队列
QueueBind(channel, exchange, queue, routingKey);
//设置每次预取数量
channel.BasicQos(0, prefetchCount, false);
//根据设置的消费者数量创建消费者
for (int i = 0; i < consumerCount; i++)
{
//创建消费者
var consumer = new EventingBasicConsumer(channel);
//接收消息事件
consumer.Received += (sender, ea) =>
{
var text = Encoding.UTF8.GetString(ea.Body.ToArray());
var body = text;
var numberOfRetries = 0;
Exception exception = null;
bool? result = false;
while (numberOfRetries <= retryCount)
{
try
{
var msg = body.ToObject();
result = subscriber?.Invoke(msg, ea);
if (result == true)
channel.BasicAck(ea.DeliveryTag, false);
else
channel.BasicNack(ea.DeliveryTag, false, false);
//异常置空
exception = null;
break;
}
catch (Exception ex)
{
exception = ex;
handler?.Invoke(body, numberOfRetries, ex);
numberOfRetries++;
}
}
//重试后异常仍未解决
if (exception != null)
channel.BasicNack(ea.DeliveryTag, false, false);
//是否进入内置死信队列
if (deadLetter && (!(result == true) || exception != null))
PublishToDead(
queue,
ea.Exchange,
ea.RoutingKey,
body,
exception == null ? numberOfRetries : numberOfRetries - 1,
exception);
};
//注册事件
if (registered != null)
consumer.Registered += registered;
//取消注册事件
if (unregistered != null)
consumer.Unregistered += unregistered;
//关闭事件
if (shutdown != null)
consumer.Shutdown += shutdown;
//手动确认
channel.BasicConsume(queue, false, consumer);
}
}
///
/// 订阅消息
///
///
/// 交换机名称
/// 队列名称
/// 路由键
/// 消费处理委托
/// 异常处理委托
/// 重试次数
/// 预取数量
/// 是否进入内置死信队列,此处与原生死信无关
/// 交换机类型
/// 持久化
/// 消费者数量
/// 队列参数
/// 交换机参数
/// 注册事件
/// 取消注册事件
/// 关闭事件
public void Subscribe(
string exchange,
string queue,
string routingKey,
Func> subscriber,
Func handler,
int retryCount = 5,
ushort prefetchCount = 1,
bool deadLetter = true,
string exchangeType = ExchangeType.Direct,
bool durable = true,
int consumerCount = 1,
IDictionary queueArguments = null,
IDictionary exchangeArguments = null,
EventHandler registered = null,
EventHandler unregistered = null,
EventHandler shutdown = null)
where T : class
{
//获取管道
var channel = GetChannel(queue);
//判断交换机是否存在
if (!IsExchangeExist(channel, exchange))
channel = ExchangeDeclare(channel, exchange, exchangeType, durable, arguments: exchangeArguments);
//判断队列是否存在
if (!IsQueueExist(channel, queue))
channel = QueueDeclare(channel, queue, durable, arguments: queueArguments);
//绑定交换机和队列
QueueBind(channel, exchange, queue, routingKey);
//设置每次预取数量
channel.BasicQos(0, prefetchCount, false);
//根据设置的消费者数量创建消费者
for (int i = 0; i < consumerCount; i++)
{
//创建消费者
var consumer = new EventingBasicConsumer(channel);
//接收消息事件
consumer.Received += async (sender, ea) =>
{
var text = Encoding.UTF8.GetString(ea.Body.ToArray());
var body = text;
var numberOfRetries = 0;
Exception exception = null;
bool? result = false;
while (numberOfRetries <= retryCount)
{
try
{
var msg = body.ToObject();
if (subscriber != null)
result = await subscriber(msg, ea);
if (result == true)
channel.BasicAck(ea.DeliveryTag, false);
else
channel.BasicNack(ea.DeliveryTag, false, false);
//异常置空
exception = null;
break;
}
catch (Exception ex)
{
exception = ex;
if (handler != null)
await handler(body, numberOfRetries, ex);
numberOfRetries++;
}
}
//重试后异常仍未解决
if (exception != null)
channel.BasicNack(ea.DeliveryTag, false, false);
//是否进入内置死信队列
if (deadLetter && (!(result == true) || exception != null))
PublishToDead(
queue,
ea.Exchange,
ea.RoutingKey,
body,
exception == null ? numberOfRetries : numberOfRetries - 1,
exception);
};
//注册事件
if (registered != null)
consumer.Registered += registered;
//取消注册事件
if (unregistered != null)
consumer.Unregistered += unregistered;
//关闭事件
if (shutdown != null)
consumer.Shutdown += shutdown;
//手动确认
channel.BasicConsume(queue, false, consumer);
}
}
#endregion 订阅消息
#region 获取消息
///
/// 获取消息
///
///
/// 消费处理委托
public void Pull(
Action handler)
where T : class
{
var attribute = typeof(T).GetAttribute();
if (attribute == null)
throw new ArgumentException("RabbitMqAttribute Is Null!");
Pull(attribute.Exchange, attribute.Queue, attribute.RoutingKey, handler);
}
///
/// 获取消息
///
///
/// 交换机名称
/// 队列名称
/// 路由键
/// 消费处理委托
public void Pull(
string exchange,
string queue,
string routingKey,
Action handler)
where T : class
{
//获取管道
var channel = GetChannel(queue);
//判断交换机是否存在
if (!IsExchangeExist(channel, exchange))
channel = ExchangeDeclare(channel, exchange);
//判断队列是否存在
if (!IsQueueExist(channel, queue))
channel = QueueDeclare(channel, queue);
//绑定交换机和队列
QueueBind(channel, exchange, queue, routingKey);
var result = channel.BasicGet(queue, false);
if (result == null)
return;
var text = Encoding.UTF8.GetString(result.Body.ToArray());
var msg = text.ToObject();
try
{
handler(msg, result);
}
catch (Exception)
{
throw;
}
finally
{
channel.BasicAck(result.DeliveryTag, false);
}
}
///
/// 获取消息
///
///
/// 消费处理委托
public async Task PullAsync(
Func handler)
where T : class
{
var attribute = typeof(T).GetAttribute();
if (attribute == null)
throw new ArgumentException("RabbitMqAttribute Is Null!");
await PullAsync(attribute.Exchange, attribute.Queue, attribute.RoutingKey, handler);
}
///
/// 获取消息
///
///
/// 交换机名称
/// 队列名称
/// 路由键
/// 消费处理委托
public async Task PullAsync(
string exchange,
string queue,
string routingKey,
Func handler)
where T : class
{
//获取管道
var channel = GetChannel(queue);
//判断交换机是否存在
if (!IsExchangeExist(channel, exchange))
channel = ExchangeDeclare(channel, exchange);
//判断队列是否存在
if (!IsQueueExist(channel, queue))
channel = QueueDeclare(channel, queue);
//绑定交换机和队列
QueueBind(channel, exchange, queue, routingKey);
var result = channel.BasicGet(queue, false);
if (result == null)
return;
var text = Encoding.UTF8.GetString(result.Body.ToArray());
var msg = text.ToObject();
try
{
if (handler != null)
await handler(msg, result);
}
catch (Exception)
{
throw;
}
finally
{
channel.BasicAck(result.DeliveryTag, false);
}
}
///
/// 获取消息数量
///
/// 管道
/// 队列名称
///
public uint GetMessageCount(
IModel channel,
string queue)
{
return (channel ?? GetChannel(queue)).MessageCount(queue);
}
#endregion 获取消息
#region 释放资源
///
/// 执行与释放或重置非托管资源关联的应用程序定义的任务。
///
public void Dispose()
{
foreach (var item in _channelDic)
{
item.Value?.Dispose();
}
foreach (var item in _queueHelperDic)
{
item.Value?.Dispose();
}
_connection?.Dispose();
_channelDic.Clear();
_queueHelperDic.Clear();
}
#endregion 释放资源
}
///
/// RabbitMq连接配置
///
public class MqConfig
{
///
/// 是否启用
///
public bool Enabled { get; set; }
///
/// 主机名
///
public string HostName { get; set; } = "localhost";
///
/// 心跳时间
///
public TimeSpan RequestedHeartbeat { get; set; } = TimeSpan.FromSeconds(10);
///
/// 自动重连
///
public bool AutomaticRecoveryEnabled { get; set; } = true;
///
/// 重连时间
///
public TimeSpan NetworkRecoveryInterval { get; set; } = TimeSpan.FromSeconds(10);
///
/// 用户名
///
public string UserName { get; set; } = "guest";
///
/// 密码
///
public string Password { get; set; } = "guest";
///
/// 端口号
///
public int Port { get; set; } = 5672;
///
/// 虚拟主机
///
public string VirtualHost { get; set; } = "/";
}
///
/// 自定义的RabbitMq队列信息实体特性
///
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false)]
public class RabbitMqAttribute : Attribute
{
///
/// 交换机
///
public string Exchange { get; set; }
///
/// 交换机类型(广播fanout,队列direct)
///
public string ExchangeType { get; set; }
///
/// 队列
///
public string Queue { get; set; }
///
/// 路由键
///
public string RoutingKey { get; set; }
///
/// 是否持久化,默认true
///
public bool Durable { get; set; } = true;
///
/// 预取数量,默认1
///
public ushort PrefetchCount { get; set; } = 1;
///
/// 异常重试次数,默认5次
///
public int RetryCount { get; set; } = 5;
///
/// 消息过期时间,过期后队列中消息自动被删除,单位ms
///
public int MessageTTL { get; set; }
///
/// 消息头部对象json字符串
///
public string Header { get; set; }
///
/// 队列消费者数量,默认1
///
public int ConsumerCount { get; set; } = 1;
///
/// 队列过期时间,过期后队列自动被删除,单位ms
///
public int AutoExpire { get; set; }
///
/// 队列最大长度
///
public int MaxLength { get; set; }
///
/// 队列占用的最大空间
///
public int MaxLengthBytes { get; set; }
///
/// 队列溢出行为,可选值:drop-head或者reject-publish,默认drop-head
///
public string OverflowBehaviour { get; set; }
///
/// 是否启用系统内置死信队列处理逻辑,默认true;注意:此处与原生的死信无关
///
public bool DeadLetter { get; set; } = true;
///
/// 死信交换机
///
public string DeadLetterExchange { get; set; }
///
/// 死信路由键
///
public string DeadLetterRoutingKey { get; set; }
///
/// 队列最大优先级,数值越大优先级越高,范围1-255,建议取值1-10
///
public int MaximumPriority { get; set; }
///
/// 队列惰性模式,可选值:default或者lazy
///
public string LazyMode { get; set; }
///
/// 主定位器,集群配置
///
public string MasterLocator { get; set; }
}
///
/// 内置死信队列实体
///
public class DeadLetterQueue
{
///
/// 消息内容
///
public string Body { get; set; }
///
/// 交换机
///
public string Exchange { get; set; }
///
/// 队列
///
public string Queue { get; set; }
///
/// 路由键
///
public string RoutingKey { get; set; }
///
/// 重试次数
///
public int RetryCount { get; set; }
///
/// 异常消息
///
public string ExceptionMsg { get; set; }
///
/// 异常
///
public Exception Exception { get; set; }
///
/// 创建时间
///
public DateTime CreateDateTime { get; set; } = DateTime.Now;
}
///
/// 内存队列消息
///
public class QueueMessage
{
///
/// 交换机名称
///
public string Exchange { get; set; }
///
/// 队列名称
///
public string Queue { get; set; }
///
/// 路由键
///
public string RoutingKey { get; set; }
///
/// 消息内容
///
public IEnumerable Body { get; set; }
///
/// 交换机类型
///
public string ExchangeType { get; set; }
///
/// 持久化
///
public bool Durable { get; set; }
///
/// 消息发送确认
///
public bool Confirm { get; set; }
///
/// 单个消息过期时间,单位ms
///
public string Expiration { get; set; }
///
/// 单个消息优先级,数值越大优先级越高,取值范围:0-9
///
public byte? Priority { get; set; }
///
/// 队列参数
///
public IDictionary QueueArguments { get; set; }
///
/// 交换机参数
///
public IDictionary ExchangeArguments { get; set; }
///
/// 消息头部
///
public IDictionary Headers { get; set; }
}
}