2066 lines
60 KiB
C#
2066 lines
60 KiB
C#
![]() |
using RabbitMQ.Client;
|
|||
|
using RabbitMQ.Client.Events;
|
|||
|
using Serenity;
|
|||
|
using System;
|
|||
|
using System.Collections.Concurrent;
|
|||
|
using System.Collections.Generic;
|
|||
|
using System.Linq;
|
|||
|
using System.Text;
|
|||
|
using System.Threading.Tasks;
|
|||
|
|
|||
|
namespace HT.Cloud.Code
|
|||
|
{
|
|||
|
/// <summary>
|
|||
|
/// RabbitMq工具类
|
|||
|
/// </summary>
|
|||
|
public class RabbitMqHelper : IDisposable
|
|||
|
{
|
|||
|
#region 私有静态字段
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// RabbitMQ建议客户端线程之间不要共用Model,至少要保证共用Model的线程发送消息必须是串行的,但是建议尽量共用Connection。
|
|||
|
/// </summary>
|
|||
|
private static readonly ConcurrentDictionary<string, IModel> _channelDic = new();
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// QueueHelper缓存
|
|||
|
/// </summary>
|
|||
|
private static readonly ConcurrentDictionary<string, QueueHelper<QueueMessage>> _queueHelperDic = new();
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 用于缓存路由键数据
|
|||
|
/// </summary>
|
|||
|
private static readonly ConcurrentDictionary<string, (string exchange, string queue, string routeKey)> _routeDic = new();
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// RabbitMq连接
|
|||
|
/// </summary>
|
|||
|
private static IConnection _connection;
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 线程对象,线程锁使用
|
|||
|
/// </summary>
|
|||
|
private static readonly object _locker = new();
|
|||
|
|
|||
|
#endregion 私有静态字段
|
|||
|
|
|||
|
#region 公有属性
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 默认内置死信交换机,默认值:deadletter.default.router
|
|||
|
/// </summary>
|
|||
|
public string DefaultDeadLetterExchange { get; set; } = "deadletter.default.router";
|
|||
|
|
|||
|
#endregion 公有属性
|
|||
|
|
|||
|
#region 构造函数
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 构造函数
|
|||
|
/// </summary>
|
|||
|
/// <param name="config">RabbitMq配置</param>
|
|||
|
public RabbitMqHelper(MqConfig config)
|
|||
|
{
|
|||
|
if (config == null)
|
|||
|
throw new ArgumentNullException(nameof(config));
|
|||
|
|
|||
|
if (_connection == null)
|
|||
|
{
|
|||
|
lock (_locker)
|
|||
|
{
|
|||
|
if (_connection == null)
|
|||
|
_connection = new ConnectionFactory
|
|||
|
{
|
|||
|
//设置主机名
|
|||
|
HostName = config.HostName,
|
|||
|
|
|||
|
//虚拟主机
|
|||
|
VirtualHost = config.VirtualHost,
|
|||
|
|
|||
|
//设置心跳时间
|
|||
|
RequestedHeartbeat = config.RequestedHeartbeat,
|
|||
|
|
|||
|
//设置自动重连
|
|||
|
AutomaticRecoveryEnabled = config.AutomaticRecoveryEnabled,
|
|||
|
|
|||
|
//重连时间
|
|||
|
NetworkRecoveryInterval = config.NetworkRecoveryInterval,
|
|||
|
|
|||
|
//用户名
|
|||
|
UserName = config.UserName,
|
|||
|
|
|||
|
//密码
|
|||
|
Password = config.Password,
|
|||
|
|
|||
|
//配置端口
|
|||
|
Port=config.Port
|
|||
|
}.CreateConnection();
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 构造函数
|
|||
|
/// </summary>
|
|||
|
/// <param name="factory">RabbitMq连接工厂</param>
|
|||
|
public RabbitMqHelper(ConnectionFactory factory)
|
|||
|
{
|
|||
|
if (factory == null)
|
|||
|
throw new ArgumentNullException(nameof(factory));
|
|||
|
|
|||
|
if (_connection == null)
|
|||
|
{
|
|||
|
lock (_locker)
|
|||
|
{
|
|||
|
if (_connection == null)
|
|||
|
_connection = factory.CreateConnection();
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
#endregion 构造函数
|
|||
|
|
|||
|
#region 管道
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 获取管道
|
|||
|
/// </summary>
|
|||
|
/// <param name="queue">队列名称</param>
|
|||
|
/// <returns></returns>
|
|||
|
public IModel GetChannel(string queue = "default")
|
|||
|
{
|
|||
|
IModel channel = null;
|
|||
|
|
|||
|
if (!string.IsNullOrEmpty(queue) && _channelDic.ContainsKey(queue))
|
|||
|
channel = _channelDic[queue];
|
|||
|
|
|||
|
if (!string.IsNullOrEmpty(queue) && channel == null)
|
|||
|
{
|
|||
|
lock (_locker)
|
|||
|
{
|
|||
|
channel = _channelDic.GetOrAdd(queue, queue =>
|
|||
|
{
|
|||
|
var model = _connection.CreateModel();
|
|||
|
_channelDic[queue] = model;
|
|||
|
|
|||
|
return model;
|
|||
|
});
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
channel ??= _connection.CreateModel();
|
|||
|
|
|||
|
return EnsureOpened(channel);
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 确保管道是已打开状态
|
|||
|
/// </summary>
|
|||
|
/// <param name="channel">管道</param>
|
|||
|
/// <returns></returns>
|
|||
|
public IModel EnsureOpened(IModel channel)
|
|||
|
{
|
|||
|
channel ??= GetChannel();
|
|||
|
|
|||
|
if (channel.IsClosed)
|
|||
|
{
|
|||
|
var data = _channelDic.Where(x => x.Value == channel)?.FirstOrDefault();
|
|||
|
if (data != null && data.Value.Key != null)
|
|||
|
{
|
|||
|
//移除已关闭的管道
|
|||
|
_channelDic.TryRemove(data.Value.Key, out var model);
|
|||
|
|
|||
|
//重新获取管道
|
|||
|
channel = GetChannel(data.Value.Key);
|
|||
|
}
|
|||
|
else
|
|||
|
{
|
|||
|
channel = GetChannel();
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
return channel;
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 如果交换机或者队列不存在则移除缓存中异常关闭的管道
|
|||
|
/// </summary>
|
|||
|
/// <param name="channel"></param>
|
|||
|
public void RemoveIfNotExist(IModel channel)
|
|||
|
{
|
|||
|
var data = _channelDic.Where(x => x.Value == channel)?.FirstOrDefault();
|
|||
|
if (data != null && data.Value.Key != null)
|
|||
|
{
|
|||
|
//移除已关闭的管道
|
|||
|
_channelDic.TryRemove(data.Value.Key, out var model);
|
|||
|
channel = null;
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 获取QueueHelper
|
|||
|
/// </summary>
|
|||
|
/// <param name="queue"></param>
|
|||
|
/// <returns></returns>
|
|||
|
public QueueHelper<QueueMessage> GetQueueHelper(string queue)
|
|||
|
{
|
|||
|
if (!string.IsNullOrEmpty(queue))
|
|||
|
queue = "default";
|
|||
|
|
|||
|
if (_queueHelperDic.ContainsKey(queue))
|
|||
|
return _queueHelperDic[queue];
|
|||
|
|
|||
|
lock (_locker)
|
|||
|
{
|
|||
|
return _queueHelperDic.GetOrAdd(queue, queue =>
|
|||
|
{
|
|||
|
var queueHelper = new QueueHelper<QueueMessage>(x =>
|
|||
|
PublishRabbitMqMessage(
|
|||
|
x.Exchange,
|
|||
|
x.Queue,
|
|||
|
x.RoutingKey,
|
|||
|
x.Body,
|
|||
|
x.ExchangeType,
|
|||
|
x.Durable,
|
|||
|
x.Confirm,
|
|||
|
x.Expiration,
|
|||
|
x.Priority,
|
|||
|
x.QueueArguments,
|
|||
|
x.ExchangeArguments,
|
|||
|
x.Headers));
|
|||
|
|
|||
|
_queueHelperDic[queue] = queueHelper;
|
|||
|
|
|||
|
return queueHelper;
|
|||
|
});
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
#endregion 管道
|
|||
|
|
|||
|
#region 交换机
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 声明交换机,交换机可以重复声明,但是声明所使用的参数必须一致,否则会抛出异常。
|
|||
|
/// </summary>
|
|||
|
/// <param name="channel">管道</param>
|
|||
|
/// <param name="exchange">交换机名称</param>
|
|||
|
/// <param name="exchangeType">交换机类型:
|
|||
|
/// 1、Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全
|
|||
|
/// 匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的
|
|||
|
/// 消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog
|
|||
|
/// 2、Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都
|
|||
|
/// 会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout
|
|||
|
/// 交换机转发消息是最快的。
|
|||
|
/// 3、Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多
|
|||
|
/// 个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*”
|
|||
|
/// 只会匹配到“audit.irs”。</param>
|
|||
|
/// <param name="durable">持久化</param>
|
|||
|
/// <param name="autoDelete">自动删除</param>
|
|||
|
/// <param name="arguments">参数</param>
|
|||
|
/// <returns>管道</returns>
|
|||
|
public IModel ExchangeDeclare(
|
|||
|
IModel channel,
|
|||
|
string exchange,
|
|||
|
string exchangeType = ExchangeType.Direct,
|
|||
|
bool durable = true,
|
|||
|
bool autoDelete = false,
|
|||
|
IDictionary<string, object> arguments = null)
|
|||
|
{
|
|||
|
channel = EnsureOpened(channel);
|
|||
|
channel.ExchangeDeclare(exchange, exchangeType, durable, autoDelete, arguments);
|
|||
|
|
|||
|
return channel;
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 声明交换机,交换机可以重复声明,但是声明所使用的参数必须一致,否则会抛出异常。
|
|||
|
/// </summary>
|
|||
|
/// <param name="channel">管道</param>
|
|||
|
/// <param name="exchange">交换机名称</param>
|
|||
|
/// <param name="exchangeType">交换机类型:
|
|||
|
/// 1、Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全
|
|||
|
/// 匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的
|
|||
|
/// 消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog
|
|||
|
/// 2、Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都
|
|||
|
/// 会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout
|
|||
|
/// 交换机转发消息是最快的。
|
|||
|
/// 3、Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多
|
|||
|
/// 个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*”
|
|||
|
/// 只会匹配到“audit.irs”。</param>
|
|||
|
/// <param name="durable">持久化</param>
|
|||
|
/// <param name="autoDelete">自动删除</param>
|
|||
|
/// <param name="arguments">参数</param>
|
|||
|
/// <returns>管道</returns>
|
|||
|
public IModel ExchangeDeclareNoWait(
|
|||
|
IModel channel,
|
|||
|
string exchange,
|
|||
|
string exchangeType = ExchangeType.Direct,
|
|||
|
bool durable = true,
|
|||
|
bool autoDelete = false,
|
|||
|
IDictionary<string, object> arguments = null)
|
|||
|
{
|
|||
|
channel = EnsureOpened(channel);
|
|||
|
channel.ExchangeDeclareNoWait(exchange, exchangeType, durable, autoDelete, arguments);
|
|||
|
|
|||
|
return channel;
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 被动声明交换机,用于判断交换机是否存在,若存在则无异常,若不存在则抛出异常
|
|||
|
/// </summary>
|
|||
|
/// <param name="channel">管道</param>
|
|||
|
/// <param name="exchange">交换机名称</param>
|
|||
|
public void ExchangeDeclarePassive(IModel channel, string exchange)
|
|||
|
{
|
|||
|
EnsureOpened(channel).ExchangeDeclarePassive(exchange);
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 判断交换机是否存在
|
|||
|
/// </summary>
|
|||
|
/// <param name="channel">管道</param>
|
|||
|
/// <param name="exchange">交换机名称</param>
|
|||
|
/// <returns></returns>
|
|||
|
public bool IsExchangeExist(IModel channel, string exchange)
|
|||
|
{
|
|||
|
try
|
|||
|
{
|
|||
|
ExchangeDeclarePassive(channel, exchange);
|
|||
|
return true;
|
|||
|
}
|
|||
|
catch
|
|||
|
{
|
|||
|
RemoveIfNotExist(channel);
|
|||
|
return false;
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 删除交换机
|
|||
|
/// </summary>
|
|||
|
/// <param name="channel">管道</param>
|
|||
|
/// <param name="exchange">交换机名称</param>
|
|||
|
/// <param name="ifUnused">是否没有被使用</param>
|
|||
|
public void ExchangeDelete(
|
|||
|
IModel channel,
|
|||
|
string exchange,
|
|||
|
bool ifUnused = false)
|
|||
|
{
|
|||
|
RouteKeyRemoveByExchange(exchange);
|
|||
|
EnsureOpened(channel).ExchangeDelete(exchange, ifUnused);
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 删除交换机
|
|||
|
/// </summary>
|
|||
|
/// <param name="channel">管道</param>
|
|||
|
/// <param name="exchange">交换机名称</param>
|
|||
|
/// <param name="ifUnused">是否没有被使用</param>
|
|||
|
public void ExchangeDeleteNoWait(
|
|||
|
IModel channel,
|
|||
|
string exchange,
|
|||
|
bool ifUnused = false)
|
|||
|
{
|
|||
|
RouteKeyRemoveByExchange(exchange);
|
|||
|
EnsureOpened(channel).ExchangeDeleteNoWait(exchange, ifUnused);
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 绑定交换机
|
|||
|
/// </summary>
|
|||
|
/// <param name="channel">管道</param>
|
|||
|
/// <param name="destinationExchange">目标交换机</param>
|
|||
|
/// <param name="sourceExchange">源交换机</param>
|
|||
|
/// <param name="routingKey">路由键</param>
|
|||
|
/// <param name="arguments">参数</param>
|
|||
|
public void ExchangeBind(
|
|||
|
IModel channel,
|
|||
|
string destinationExchange,
|
|||
|
string sourceExchange,
|
|||
|
string routingKey,
|
|||
|
IDictionary<string, object> arguments = null)
|
|||
|
{
|
|||
|
EnsureOpened(channel).ExchangeBind(destinationExchange, sourceExchange, routingKey, arguments);
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 绑定交换机
|
|||
|
/// </summary>
|
|||
|
/// <param name="channel">管道</param>
|
|||
|
/// <param name="destinationExchange">目标交换机</param>
|
|||
|
/// <param name="sourceExchange">源交换机</param>
|
|||
|
/// <param name="routingKey">路由键</param>
|
|||
|
/// <param name="arguments">参数</param>
|
|||
|
public void ExchangeBindNoWait(
|
|||
|
IModel channel,
|
|||
|
string destinationExchange,
|
|||
|
string sourceExchange,
|
|||
|
string routingKey,
|
|||
|
IDictionary<string, object> arguments = null)
|
|||
|
{
|
|||
|
EnsureOpened(channel).ExchangeBindNoWait(destinationExchange, sourceExchange, routingKey, arguments);
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 解绑交换机
|
|||
|
/// </summary>
|
|||
|
/// <param name="channel">管道</param>
|
|||
|
/// <param name="destinationExchange">目标交换机</param>
|
|||
|
/// <param name="sourceExchange">源交换机</param>
|
|||
|
/// <param name="routingKey">路由键</param>
|
|||
|
/// <param name="arguments">参数</param>
|
|||
|
public void ExchangeUnbind(
|
|||
|
IModel channel,
|
|||
|
string destinationExchange,
|
|||
|
string sourceExchange,
|
|||
|
string routingKey,
|
|||
|
IDictionary<string, object> arguments = null)
|
|||
|
{
|
|||
|
EnsureOpened(channel).ExchangeUnbind(destinationExchange, sourceExchange, routingKey, arguments);
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 解绑交换机
|
|||
|
/// </summary>
|
|||
|
/// <param name="channel">管道</param>
|
|||
|
/// <param name="destinationExchange">目标交换机</param>
|
|||
|
/// <param name="sourceExchange">源交换机</param>
|
|||
|
/// <param name="routingKey">路由键</param>
|
|||
|
/// <param name="arguments">参数</param>
|
|||
|
public void ExchangeUnbindNoWait(
|
|||
|
IModel channel,
|
|||
|
string destinationExchange,
|
|||
|
string sourceExchange,
|
|||
|
string routingKey,
|
|||
|
IDictionary<string, object> arguments = null)
|
|||
|
{
|
|||
|
EnsureOpened(channel).ExchangeUnbindNoWait(destinationExchange, sourceExchange, routingKey, arguments);
|
|||
|
}
|
|||
|
|
|||
|
#endregion 交换机
|
|||
|
|
|||
|
#region 队列
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 声明队列,队列可以重复声明,但是声明所使用的参数必须一致,否则会抛出异常。
|
|||
|
/// </summary>
|
|||
|
/// <param name="channel">管道</param>
|
|||
|
/// <param name="queue">队列名称</param>
|
|||
|
/// <param name="durable">持久化</param>
|
|||
|
/// <param name="exclusive">排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,
|
|||
|
/// 并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可
|
|||
|
/// 以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连
|
|||
|
/// 接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者
|
|||
|
/// 客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。</param>
|
|||
|
/// <param name="autoDelete">自动删除</param>
|
|||
|
/// <param name="arguments">参数</param>
|
|||
|
/// <returns>管道</returns>
|
|||
|
public IModel QueueDeclare(
|
|||
|
IModel channel,
|
|||
|
string queue,
|
|||
|
bool durable = true,
|
|||
|
bool exclusive = false,
|
|||
|
bool autoDelete = false,
|
|||
|
IDictionary<string, object> arguments = null)
|
|||
|
{
|
|||
|
channel = EnsureOpened(channel);
|
|||
|
channel.QueueDeclare(queue, durable, exclusive, autoDelete, arguments);
|
|||
|
|
|||
|
return channel;
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 声明队列,队列可以重复声明,但是声明所使用的参数必须一致,否则会抛出异常。
|
|||
|
/// </summary>
|
|||
|
/// <param name="channel">管道</param>
|
|||
|
/// <param name="queue">队列名称</param>
|
|||
|
/// <param name="durable">持久化</param>
|
|||
|
/// <param name="exclusive">排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,
|
|||
|
/// 并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可
|
|||
|
/// 以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连
|
|||
|
/// 接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者
|
|||
|
/// 客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。</param>
|
|||
|
/// <param name="autoDelete">自动删除</param>
|
|||
|
/// <param name="arguments">参数</param>
|
|||
|
/// <returns>管道</returns>
|
|||
|
public IModel QueueDeclareNoWait(
|
|||
|
IModel channel,
|
|||
|
string queue,
|
|||
|
bool durable = true,
|
|||
|
bool exclusive = false,
|
|||
|
bool autoDelete = false,
|
|||
|
IDictionary<string, object> arguments = null)
|
|||
|
{
|
|||
|
channel = EnsureOpened(channel);
|
|||
|
channel.QueueDeclareNoWait(queue, durable, exclusive, autoDelete, arguments);
|
|||
|
|
|||
|
return channel;
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 被动声明队列,用于判断队列是否存在,若队列存在则无异常,若不存在则抛异常
|
|||
|
/// </summary>
|
|||
|
/// <param name="channel">管道</param>
|
|||
|
/// <param name="queue">队列名称</param>
|
|||
|
public void QueueDeclarePassive(IModel channel, string queue)
|
|||
|
{
|
|||
|
EnsureOpened(channel).QueueDeclarePassive(queue);
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 判断队列是否存在
|
|||
|
/// </summary>
|
|||
|
/// <param name="channel">管道</param>
|
|||
|
/// <param name="queue">队列名称</param>
|
|||
|
/// <returns></returns>
|
|||
|
public bool IsQueueExist(IModel channel, string queue)
|
|||
|
{
|
|||
|
try
|
|||
|
{
|
|||
|
QueueDeclarePassive(channel, queue);
|
|||
|
return true;
|
|||
|
}
|
|||
|
catch
|
|||
|
{
|
|||
|
RemoveIfNotExist(channel);
|
|||
|
return false;
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 删除队列
|
|||
|
/// </summary>
|
|||
|
/// <param name="channel">管道</param>
|
|||
|
/// <param name="queue">队列名称</param>
|
|||
|
/// <param name="ifUnused">是否没有被使用</param>
|
|||
|
/// <param name="ifEmpty">是否为空</param>
|
|||
|
/// <returns></returns>
|
|||
|
public uint QueueDelete(
|
|||
|
IModel channel,
|
|||
|
string queue,
|
|||
|
bool ifUnused = false,
|
|||
|
bool ifEmpty = false)
|
|||
|
{
|
|||
|
RouteKeyRemoveByQueue(queue);
|
|||
|
return EnsureOpened(channel).QueueDelete(queue, ifUnused, ifEmpty);
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 删除队列
|
|||
|
/// </summary>
|
|||
|
/// <param name="channel">管道</param>
|
|||
|
/// <param name="queue">队列名称</param>
|
|||
|
/// <param name="ifUnused">是否没有被使用</param>
|
|||
|
/// <param name="ifEmpty">是否为空</param>
|
|||
|
public void QueueDeleteNoWait(
|
|||
|
IModel channel,
|
|||
|
string queue,
|
|||
|
bool ifUnused = false,
|
|||
|
bool ifEmpty = false)
|
|||
|
{
|
|||
|
RouteKeyRemoveByQueue(queue);
|
|||
|
EnsureOpened(channel).QueueDeleteNoWait(queue, ifUnused, ifEmpty);
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 绑定队列
|
|||
|
/// </summary>
|
|||
|
/// <param name="channel">管道</param>
|
|||
|
/// <param name="exchange">交换机名称</param>
|
|||
|
/// <param name="queue">队列名称</param>
|
|||
|
/// <param name="routingKey">路由键</param>
|
|||
|
/// <param name="arguments">参数</param>
|
|||
|
public void QueueBind(
|
|||
|
IModel channel,
|
|||
|
string exchange,
|
|||
|
string queue,
|
|||
|
string routingKey,
|
|||
|
IDictionary<string, object> arguments = null)
|
|||
|
{
|
|||
|
if (IsRouteKeyExist(exchange, queue, routingKey))
|
|||
|
return;
|
|||
|
|
|||
|
EnsureOpened(channel).QueueBind(queue, exchange, routingKey, arguments);
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 绑定队列
|
|||
|
/// </summary>
|
|||
|
/// <param name="channel">管道</param>
|
|||
|
/// <param name="exchange">交换机名称</param>
|
|||
|
/// <param name="queue">队列名称</param>
|
|||
|
/// <param name="routingKey">路由键</param>
|
|||
|
/// <param name="arguments">参数</param>
|
|||
|
public void QueueBindNoWait(
|
|||
|
IModel channel,
|
|||
|
string exchange,
|
|||
|
string queue,
|
|||
|
string routingKey,
|
|||
|
IDictionary<string, object> arguments = null)
|
|||
|
{
|
|||
|
if (IsRouteKeyExist(exchange, queue, routingKey))
|
|||
|
return;
|
|||
|
|
|||
|
EnsureOpened(channel).QueueBindNoWait(queue, exchange, routingKey, arguments);
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 解绑队列
|
|||
|
/// </summary>
|
|||
|
/// <param name="channel">管道</param>
|
|||
|
/// <param name="exchange">交换机名称</param>
|
|||
|
/// <param name="queue">队列名称</param>
|
|||
|
/// <param name="routingKey">路由键</param>
|
|||
|
/// <param name="arguments">参数</param>
|
|||
|
public void QueueUnbind(
|
|||
|
IModel channel,
|
|||
|
string exchange,
|
|||
|
string queue,
|
|||
|
string routingKey,
|
|||
|
IDictionary<string, object> arguments = null)
|
|||
|
{
|
|||
|
RouteKeyRemove(exchange, queue, routingKey);
|
|||
|
EnsureOpened(channel).QueueUnbind(queue, exchange, routingKey, arguments);
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 清除队列
|
|||
|
/// </summary>
|
|||
|
/// <param name="channel">管道</param>
|
|||
|
/// <param name="queue">队列名称</param>
|
|||
|
public void QueuePurge(IModel channel, string queue)
|
|||
|
{
|
|||
|
EnsureOpened(channel).QueuePurge(queue);
|
|||
|
}
|
|||
|
|
|||
|
#endregion 队列
|
|||
|
|
|||
|
#region 路由
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 判断路由键是否存在,仅在内存中作判断
|
|||
|
/// </summary>
|
|||
|
/// <param name="exchange">交换机</param>
|
|||
|
/// <param name="queue">队列</param>
|
|||
|
/// <param name="routeKey">路由键</param>
|
|||
|
/// <returns></returns>
|
|||
|
public bool IsRouteKeyExist(string exchange, string queue, string routeKey)
|
|||
|
{
|
|||
|
var key = $"{exchange}.{queue}.{routeKey}";
|
|||
|
|
|||
|
if (_routeDic.ContainsKey(key))
|
|||
|
return true;
|
|||
|
else
|
|||
|
{
|
|||
|
lock (_locker)
|
|||
|
{
|
|||
|
if (!_routeDic.ContainsKey(key))
|
|||
|
{
|
|||
|
_routeDic.TryAdd(key, (exchange, queue, routeKey));
|
|||
|
return false;
|
|||
|
}
|
|||
|
return true;
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 移除路由键缓存
|
|||
|
/// </summary>
|
|||
|
/// <param name="exchange"></param>
|
|||
|
/// <param name="queue"></param>
|
|||
|
/// <param name="routeKey"></param>
|
|||
|
public void RouteKeyRemove(string exchange, string queue, string routeKey)
|
|||
|
{
|
|||
|
var key = $"{exchange}.{queue}.{routeKey}";
|
|||
|
if (_routeDic.ContainsKey(key))
|
|||
|
{
|
|||
|
lock (_locker)
|
|||
|
{
|
|||
|
if (_routeDic.ContainsKey(key))
|
|||
|
_routeDic.TryRemove(key, out var _);
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 移除路由键缓存
|
|||
|
/// </summary>
|
|||
|
/// <param name="queue">队列</param>
|
|||
|
public void RouteKeyRemoveByQueue(string queue)
|
|||
|
{
|
|||
|
if (!_routeDic.IsEmpty && _routeDic.Values.Any(x => x.queue == queue))
|
|||
|
{
|
|||
|
lock (_locker)
|
|||
|
{
|
|||
|
if (!_routeDic.IsEmpty && _routeDic.Values.Any(x => x.queue == queue))
|
|||
|
{
|
|||
|
var keys = _routeDic.Keys.Where(x => x.Split('.')[1] == queue);
|
|||
|
foreach (var key in keys)
|
|||
|
{
|
|||
|
_routeDic.TryRemove(key, out var _);
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 移除路由键缓存
|
|||
|
/// </summary>
|
|||
|
/// <param name="exchange">交换机</param>
|
|||
|
public void RouteKeyRemoveByExchange(string exchange)
|
|||
|
{
|
|||
|
if (!_routeDic.IsEmpty && _routeDic.Values.Any(x => x.exchange == exchange))
|
|||
|
{
|
|||
|
lock (_locker)
|
|||
|
{
|
|||
|
if (!_routeDic.IsEmpty && _routeDic.Values.Any(x => x.exchange == exchange))
|
|||
|
{
|
|||
|
var keys = _routeDic.Keys.Where(x => x.Split('.')[0] == exchange);
|
|||
|
foreach (var key in keys)
|
|||
|
{
|
|||
|
_routeDic.TryRemove(key, out var _);
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
#endregion 路由
|
|||
|
|
|||
|
#region 发布消息
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 发布消息
|
|||
|
/// </summary>
|
|||
|
/// <param name="command">消息指令</param>
|
|||
|
/// <param name="confirm">消息发送确认</param>
|
|||
|
/// <param name="expiration">单个消息过期时间,单位ms</param>
|
|||
|
/// <param name="priority">单个消息优先级,数值越大优先级越高,取值范围:0-9</param>
|
|||
|
/// <returns></returns>
|
|||
|
public bool Publish<T>(
|
|||
|
T command,
|
|||
|
bool confirm = false,
|
|||
|
string expiration = null,
|
|||
|
byte? priority = null)
|
|||
|
where T : class
|
|||
|
{
|
|||
|
var attribute = typeof(T).GetAttribute<RabbitMqAttribute>();
|
|||
|
if (attribute == null)
|
|||
|
throw new ArgumentException("RabbitMqAttribute Is Null!");
|
|||
|
|
|||
|
return Publish(attribute, command, confirm, expiration, priority);
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 发布消息
|
|||
|
/// </summary>
|
|||
|
/// <param name="attribute">RabbitMq特性配置</param>
|
|||
|
/// <param name="command">消息指令</param>
|
|||
|
/// <param name="confirm">消息发送确认</param>
|
|||
|
/// <param name="expiration">单个消息过期时间,单位ms</param>
|
|||
|
/// <param name="priority">单个消息优先级,数值越大优先级越高,取值范围:0-9</param>
|
|||
|
/// <returns></returns>
|
|||
|
public bool Publish<T>(
|
|||
|
RabbitMqAttribute attribute,
|
|||
|
T command,
|
|||
|
bool confirm = false,
|
|||
|
string expiration = null,
|
|||
|
byte? priority = null)
|
|||
|
where T : class
|
|||
|
{
|
|||
|
//消息内容
|
|||
|
var body = command.ToJson();
|
|||
|
|
|||
|
//自定义队列参数
|
|||
|
var arguments = new Dictionary<string, object>();
|
|||
|
|
|||
|
//设置队列消息过期时间,指整个队列的所有消息
|
|||
|
if (attribute.MessageTTL > 0)
|
|||
|
arguments["x-message-ttl"] = attribute.MessageTTL;
|
|||
|
|
|||
|
//设置队列过期时间
|
|||
|
if (attribute.AutoExpire > 0)
|
|||
|
arguments["x-expires"] = attribute.AutoExpire;
|
|||
|
|
|||
|
//设置队列最大长度
|
|||
|
if (attribute.MaxLength > 0)
|
|||
|
arguments["x-max-length"] = attribute.MaxLength;
|
|||
|
|
|||
|
//设置队列占用最大空间
|
|||
|
if (attribute.MaxLengthBytes > 0)
|
|||
|
arguments["x-max-length-bytes"] = attribute.MaxLengthBytes;
|
|||
|
|
|||
|
//设置队列溢出行为
|
|||
|
if (attribute.OverflowBehaviour == "drop-head" || attribute.OverflowBehaviour == "reject-publish")
|
|||
|
arguments["x-overflow"] = attribute.OverflowBehaviour;
|
|||
|
|
|||
|
//设置死信交换机
|
|||
|
if (!attribute.DeadLetterExchange.IsNullOrEmpty())
|
|||
|
arguments["x-dead-letter-exchange"] = attribute.DeadLetterExchange;
|
|||
|
|
|||
|
//设置死信路由键
|
|||
|
if (!attribute.DeadLetterRoutingKey.IsNullOrEmpty())
|
|||
|
arguments["x-dead-letter-routing-key"] = attribute.DeadLetterRoutingKey;
|
|||
|
|
|||
|
//设置队列优先级
|
|||
|
if (attribute.MaximumPriority > 0 && attribute.MaximumPriority <= 10)
|
|||
|
arguments["x-max-priority"] = attribute.MaximumPriority;
|
|||
|
|
|||
|
//设置队列惰性模式
|
|||
|
if (attribute.LazyMode == "default" || attribute.LazyMode == "lazy")
|
|||
|
arguments["x-queue-mode"] = attribute.LazyMode;
|
|||
|
|
|||
|
//设置集群配置
|
|||
|
if (!attribute.MasterLocator.IsNullOrEmpty())
|
|||
|
arguments["x-queue-master-locator"] = attribute.MasterLocator;
|
|||
|
|
|||
|
//发送消息
|
|||
|
return Publish(
|
|||
|
attribute.Exchange,
|
|||
|
attribute.Queue,
|
|||
|
attribute.RoutingKey,
|
|||
|
body,
|
|||
|
attribute.ExchangeType,
|
|||
|
attribute.Durable,
|
|||
|
confirm,
|
|||
|
expiration,
|
|||
|
priority,
|
|||
|
arguments,
|
|||
|
null,
|
|||
|
attribute.Header?.ToObject<Dictionary<string, object>>());
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 发布消息
|
|||
|
/// </summary>
|
|||
|
/// <param name="command">消息指令</param>
|
|||
|
/// <param name="confirm">消息发送确认</param>
|
|||
|
/// <param name="expiration">单个消息过期时间,单位ms</param>
|
|||
|
/// <param name="priority">单个消息优先级,数值越大优先级越高,取值范围:0-9</param>
|
|||
|
/// <returns></returns>
|
|||
|
public bool Publish<T>(
|
|||
|
IEnumerable<T> command,
|
|||
|
bool confirm = false,
|
|||
|
string expiration = null,
|
|||
|
byte? priority = null)
|
|||
|
where T : class
|
|||
|
{
|
|||
|
var attribute = typeof(T).GetAttribute<RabbitMqAttribute>();
|
|||
|
if (attribute == null)
|
|||
|
throw new ArgumentException("RabbitMqAttribute Is Null!");
|
|||
|
|
|||
|
return Publish(attribute, command, confirm, expiration, priority);
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 发布消息
|
|||
|
/// </summary>
|
|||
|
/// <param name="attribute">RabbitMq特性配置</param>
|
|||
|
/// <param name="command">消息指令</param>
|
|||
|
/// <param name="confirm">消息发送确认</param>
|
|||
|
/// <param name="expiration">单个消息过期时间,单位ms</param>
|
|||
|
/// <param name="priority">单个消息优先级,数值越大优先级越高,取值范围:0-9</param>
|
|||
|
/// <returns></returns>
|
|||
|
public bool Publish<T>(
|
|||
|
RabbitMqAttribute attribute,
|
|||
|
IEnumerable<T> command,
|
|||
|
bool confirm = false,
|
|||
|
string expiration = null,
|
|||
|
byte? priority = null)
|
|||
|
where T : class
|
|||
|
{
|
|||
|
//消息内容
|
|||
|
var body = command.Select(x => x.ToJson());
|
|||
|
|
|||
|
//自定义队列参数
|
|||
|
var arguments = new Dictionary<string, object>();
|
|||
|
|
|||
|
//设置队列消息过期时间,指整个队列的所有消息
|
|||
|
if (attribute.MessageTTL > 0)
|
|||
|
arguments["x-message-ttl"] = attribute.MessageTTL;
|
|||
|
|
|||
|
//设置队列过期时间
|
|||
|
if (attribute.AutoExpire > 0)
|
|||
|
arguments["x-expires"] = attribute.AutoExpire;
|
|||
|
|
|||
|
//设置队列最大长度
|
|||
|
if (attribute.MaxLength > 0)
|
|||
|
arguments["x-max-length"] = attribute.MaxLength;
|
|||
|
|
|||
|
//设置队列占用最大空间
|
|||
|
if (attribute.MaxLengthBytes > 0)
|
|||
|
arguments["x-max-length-bytes"] = attribute.MaxLengthBytes;
|
|||
|
|
|||
|
//设置队列溢出行为
|
|||
|
if (attribute.OverflowBehaviour == "drop-head" || attribute.OverflowBehaviour == "reject-publish")
|
|||
|
arguments["x-overflow"] = attribute.OverflowBehaviour;
|
|||
|
|
|||
|
//设置死信交换机
|
|||
|
if (!attribute.DeadLetterExchange.IsNullOrEmpty())
|
|||
|
arguments["x-dead-letter-exchange"] = attribute.DeadLetterExchange;
|
|||
|
|
|||
|
//设置死信路由键
|
|||
|
if (!attribute.DeadLetterRoutingKey.IsNullOrEmpty())
|
|||
|
arguments["x-dead-letter-routing-key"] = attribute.DeadLetterRoutingKey;
|
|||
|
|
|||
|
//设置队列优先级
|
|||
|
if (attribute.MaximumPriority > 0 && attribute.MaximumPriority <= 10)
|
|||
|
arguments["x-max-priority"] = attribute.MaximumPriority;
|
|||
|
|
|||
|
//设置队列惰性模式
|
|||
|
if (attribute.LazyMode == "default" || attribute.LazyMode == "lazy")
|
|||
|
arguments["x-queue-mode"] = attribute.LazyMode;
|
|||
|
|
|||
|
//设置集群配置
|
|||
|
if (!attribute.MasterLocator.IsNullOrEmpty())
|
|||
|
arguments["x-queue-master-locator"] = attribute.MasterLocator;
|
|||
|
|
|||
|
//发送消息
|
|||
|
return Publish(
|
|||
|
attribute.Exchange,
|
|||
|
attribute.Queue,
|
|||
|
attribute.RoutingKey,
|
|||
|
body,
|
|||
|
attribute.ExchangeType,
|
|||
|
attribute.Durable,
|
|||
|
confirm,
|
|||
|
expiration,
|
|||
|
priority,
|
|||
|
arguments,
|
|||
|
null,
|
|||
|
attribute.Header?.ToObject<Dictionary<string, object>>());
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 发布消息
|
|||
|
/// </summary>
|
|||
|
/// <param name="exchange">交换机名称</param>
|
|||
|
/// <param name="queue">队列名称</param>
|
|||
|
/// <param name="routingKey">路由键</param>
|
|||
|
/// <param name="body">消息内容</param>
|
|||
|
/// <param name="exchangeType">交换机类型</param>
|
|||
|
/// <param name="durable">持久化</param>
|
|||
|
/// <param name="confirm">消息发送确认</param>
|
|||
|
/// <param name="expiration">单个消息过期时间,单位ms</param>
|
|||
|
/// <param name="priority">单个消息优先级,数值越大优先级越高,取值范围:0-9</param>
|
|||
|
/// <param name="queueArguments">队列参数</param>
|
|||
|
/// <param name="exchangeArguments">交换机参数</param>
|
|||
|
/// <param name="headers">消息头部</param>
|
|||
|
/// <returns></returns>
|
|||
|
public bool Publish(
|
|||
|
string exchange,
|
|||
|
string queue,
|
|||
|
string routingKey,
|
|||
|
string body,
|
|||
|
string exchangeType = ExchangeType.Direct,
|
|||
|
bool durable = true,
|
|||
|
bool confirm = false,
|
|||
|
string expiration = null,
|
|||
|
byte? priority = null,
|
|||
|
IDictionary<string, object> queueArguments = null,
|
|||
|
IDictionary<string, object> exchangeArguments = null,
|
|||
|
IDictionary<string, object> headers = null)
|
|||
|
{
|
|||
|
return Publish(
|
|||
|
exchange,
|
|||
|
queue,
|
|||
|
routingKey,
|
|||
|
new[] { body },
|
|||
|
exchangeType,
|
|||
|
durable,
|
|||
|
confirm,
|
|||
|
expiration,
|
|||
|
priority,
|
|||
|
queueArguments,
|
|||
|
exchangeArguments,
|
|||
|
headers);
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 发布消息
|
|||
|
/// </summary>
|
|||
|
/// <param name="exchange">交换机名称</param>
|
|||
|
/// <param name="queue">队列名称</param>
|
|||
|
/// <param name="routingKey">路由键</param>
|
|||
|
/// <param name="body">消息内容</param>
|
|||
|
/// <param name="exchangeType">交换机类型</param>
|
|||
|
/// <param name="durable">持久化</param>
|
|||
|
/// <param name="confirm">消息发送确认</param>
|
|||
|
/// <param name="expiration">单个消息过期时间,单位ms</param>
|
|||
|
/// <param name="priority">单个消息优先级,数值越大优先级越高,取值范围:0-9</param>
|
|||
|
/// <param name="queueArguments">队列参数</param>
|
|||
|
/// <param name="exchangeArguments">交换机参数</param>
|
|||
|
/// <param name="headers">消息头部</param>
|
|||
|
/// <returns></returns>
|
|||
|
public bool Publish(
|
|||
|
string exchange,
|
|||
|
string queue,
|
|||
|
string routingKey,
|
|||
|
IEnumerable<string> body,
|
|||
|
string exchangeType = ExchangeType.Direct,
|
|||
|
bool durable = true,
|
|||
|
bool confirm = false,
|
|||
|
string expiration = null,
|
|||
|
byte? priority = null,
|
|||
|
IDictionary<string, object> queueArguments = null,
|
|||
|
IDictionary<string, object> exchangeArguments = null,
|
|||
|
IDictionary<string, object> headers = null)
|
|||
|
{
|
|||
|
return GetQueueHelper(queue).Enqueue(
|
|||
|
new QueueMessage
|
|||
|
{
|
|||
|
Exchange = exchange,
|
|||
|
Queue = queue,
|
|||
|
RoutingKey = routingKey,
|
|||
|
Body = body,
|
|||
|
ExchangeType = exchangeType,
|
|||
|
Durable = durable,
|
|||
|
Confirm = confirm,
|
|||
|
Expiration = expiration,
|
|||
|
Priority = priority,
|
|||
|
QueueArguments = queueArguments,
|
|||
|
ExchangeArguments = exchangeArguments,
|
|||
|
Headers = headers
|
|||
|
});
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 发布消息到RabbitMq,注意此方法使用IModel线程不安全,建议使用Publish方法,若要使用需要保证单线程调用
|
|||
|
/// </summary>
|
|||
|
/// <param name="exchange">交换机名称</param>
|
|||
|
/// <param name="queue">队列名称</param>
|
|||
|
/// <param name="routingKey">路由键</param>
|
|||
|
/// <param name="body">消息内容</param>
|
|||
|
/// <param name="exchangeType">交换机类型</param>
|
|||
|
/// <param name="durable">持久化</param>
|
|||
|
/// <param name="confirm">消息发送确认</param>
|
|||
|
/// <param name="expiration">单个消息过期时间,单位ms</param>
|
|||
|
/// <param name="priority">单个消息优先级,数值越大优先级越高,取值范围:0-9</param>
|
|||
|
/// <param name="queueArguments">队列参数</param>
|
|||
|
/// <param name="exchangeArguments">交换机参数</param>
|
|||
|
/// <param name="headers">消息头部</param>
|
|||
|
/// <returns></returns>
|
|||
|
public bool PublishRabbitMqMessage(
|
|||
|
string exchange,
|
|||
|
string queue,
|
|||
|
string routingKey,
|
|||
|
IEnumerable<string> body,
|
|||
|
string exchangeType = ExchangeType.Direct,
|
|||
|
bool durable = true,
|
|||
|
bool confirm = false,
|
|||
|
string expiration = null,
|
|||
|
byte? priority = null,
|
|||
|
IDictionary<string, object> queueArguments = null,
|
|||
|
IDictionary<string, object> exchangeArguments = null,
|
|||
|
IDictionary<string, object> headers = null)
|
|||
|
{
|
|||
|
//获取管道
|
|||
|
var channel = GetChannel(queue);
|
|||
|
|
|||
|
//判断交换机是否存在
|
|||
|
if (!IsExchangeExist(channel, exchange))
|
|||
|
channel = ExchangeDeclare(channel, exchange, exchangeType, durable, arguments: exchangeArguments);
|
|||
|
|
|||
|
//判断队列是否存在
|
|||
|
if (!IsQueueExist(channel, queue))
|
|||
|
channel = QueueDeclare(channel, queue, durable, arguments: queueArguments);
|
|||
|
|
|||
|
//绑定交换机和队列
|
|||
|
QueueBind(channel, exchange, queue, routingKey);
|
|||
|
|
|||
|
//声明消息属性
|
|||
|
var props = channel.CreateBasicProperties();
|
|||
|
|
|||
|
//持久化
|
|||
|
props.Persistent = durable;
|
|||
|
|
|||
|
//单个消息过期时间
|
|||
|
if (!expiration.IsNullOrEmpty())
|
|||
|
props.Expiration = expiration;
|
|||
|
|
|||
|
//单个消息优先级
|
|||
|
if (priority >= 0 && priority <= 9)
|
|||
|
props.Priority = priority.Value;
|
|||
|
|
|||
|
//消息头部
|
|||
|
if (headers != null)
|
|||
|
props.Headers = headers;
|
|||
|
|
|||
|
//是否启用消息发送确认机制
|
|||
|
if (confirm)
|
|||
|
channel.ConfirmSelect();
|
|||
|
|
|||
|
//发送消息
|
|||
|
foreach (var item in body)
|
|||
|
{
|
|||
|
var text = string.IsNullOrEmpty(item) ? null : Encoding.UTF8.GetBytes(item);
|
|||
|
channel.BasicPublish(exchange, routingKey, props, text);
|
|||
|
}
|
|||
|
|
|||
|
//消息发送失败处理
|
|||
|
if (confirm && !channel.WaitForConfirms())
|
|||
|
return false;
|
|||
|
|
|||
|
return true;
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 发布消息到内置死信队列
|
|||
|
/// </summary>
|
|||
|
/// <param name="queue">队列名称</param>
|
|||
|
/// <param name="exchange">交换机名称</param>
|
|||
|
/// <param name="routingKey">路由键</param>
|
|||
|
/// <param name="body">消息内容</param>
|
|||
|
/// <param name="retryCount">重试次数</param>
|
|||
|
/// <param name="exception">异常</param>
|
|||
|
/// <returns></returns>
|
|||
|
private bool PublishToDead(
|
|||
|
string queue,
|
|||
|
string exchange,
|
|||
|
string routingKey,
|
|||
|
string body,
|
|||
|
int retryCount,
|
|||
|
Exception exception)
|
|||
|
{
|
|||
|
//内置死信队列、交换机、路由键
|
|||
|
var deadLetterQueue = $"{queue.ToLower()}.{(exception != null ? "error" : "fail")}";
|
|||
|
var deadLetterExchange = DefaultDeadLetterExchange;
|
|||
|
var deadLetterRoutingKey = $"{queue.ToLower()}.deadletter";
|
|||
|
|
|||
|
//内置死信队列内容
|
|||
|
var deadLetterBody = new DeadLetterQueue
|
|||
|
{
|
|||
|
Body = body,
|
|||
|
CreateDateTime = DateTime.Now,
|
|||
|
Exception = exception,
|
|||
|
ExceptionMsg = exception?.Message,
|
|||
|
Queue = queue,
|
|||
|
RoutingKey = routingKey,
|
|||
|
Exchange = exchange,
|
|||
|
RetryCount = retryCount
|
|||
|
};
|
|||
|
|
|||
|
//发送内置死信消息,注意此处与原生的死信消息无关
|
|||
|
return Publish(deadLetterExchange, deadLetterQueue, deadLetterRoutingKey, deadLetterBody.ToJson());
|
|||
|
}
|
|||
|
|
|||
|
#endregion 发布消息
|
|||
|
|
|||
|
#region 订阅消息
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 订阅消息
|
|||
|
/// </summary>
|
|||
|
/// <typeparam name="T"></typeparam>
|
|||
|
/// <param name="subscriber">消费处理委托</param>
|
|||
|
/// <param name="handler">异常处理委托</param>
|
|||
|
/// <param name="registered">注册事件</param>
|
|||
|
/// <param name="unregistered">取消注册事件</param>
|
|||
|
/// <param name="shutdown">关闭事件</param>
|
|||
|
public void Subscribe<T>(
|
|||
|
Func<T, BasicDeliverEventArgs, bool> subscriber,
|
|||
|
Action<string, int, Exception> handler,
|
|||
|
EventHandler<ConsumerEventArgs> registered = null,
|
|||
|
EventHandler<ConsumerEventArgs> unregistered = null,
|
|||
|
EventHandler<ShutdownEventArgs> shutdown = null)
|
|||
|
where T : class
|
|||
|
{
|
|||
|
var attribute = typeof(T).GetAttribute<RabbitMqAttribute>();
|
|||
|
if (attribute == null)
|
|||
|
throw new ArgumentException("RabbitMqAttribute Is Null!");
|
|||
|
|
|||
|
Subscribe(attribute, subscriber, handler, registered, unregistered, shutdown);
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 订阅消息
|
|||
|
/// </summary>
|
|||
|
/// <typeparam name="T"></typeparam>
|
|||
|
/// <param name="attribute">RabbitMq特性配置</param>
|
|||
|
/// <param name="subscriber">消费处理委托</param>
|
|||
|
/// <param name="handler">异常处理委托</param>
|
|||
|
/// <param name="registered">注册事件</param>
|
|||
|
/// <param name="unregistered">取消注册事件</param>
|
|||
|
/// <param name="shutdown">关闭事件</param>
|
|||
|
public void Subscribe<T>(
|
|||
|
RabbitMqAttribute attribute,
|
|||
|
Func<T, BasicDeliverEventArgs, bool> subscriber,
|
|||
|
Action<string, int, Exception> handler,
|
|||
|
EventHandler<ConsumerEventArgs> registered = null,
|
|||
|
EventHandler<ConsumerEventArgs> unregistered = null,
|
|||
|
EventHandler<ShutdownEventArgs> shutdown = null)
|
|||
|
where T : class
|
|||
|
{
|
|||
|
//自定义参数
|
|||
|
var arguments = new Dictionary<string, object>();
|
|||
|
|
|||
|
//设置队列消息过期时间,指整个队列的所有消息
|
|||
|
if (attribute.MessageTTL > 0)
|
|||
|
arguments["x-message-ttl"] = attribute.MessageTTL;
|
|||
|
|
|||
|
//设置队列过期时间
|
|||
|
if (attribute.AutoExpire > 0)
|
|||
|
arguments["x-expires"] = attribute.AutoExpire;
|
|||
|
|
|||
|
//设置队列最大长度
|
|||
|
if (attribute.MaxLength > 0)
|
|||
|
arguments["x-max-length"] = attribute.MaxLength;
|
|||
|
|
|||
|
//设置队列占用最大空间
|
|||
|
if (attribute.MaxLengthBytes > 0)
|
|||
|
arguments["x-max-length-bytes"] = attribute.MaxLengthBytes;
|
|||
|
|
|||
|
//设置队列溢出行为
|
|||
|
if (attribute.OverflowBehaviour == "drop-head" || attribute.OverflowBehaviour == "reject-publish")
|
|||
|
arguments["x-overflow"] = attribute.OverflowBehaviour;
|
|||
|
|
|||
|
//设置死信交换机
|
|||
|
if (!attribute.DeadLetterExchange.IsNullOrEmpty())
|
|||
|
arguments["x-dead-letter-exchange"] = attribute.DeadLetterExchange;
|
|||
|
|
|||
|
//设置死信路由键
|
|||
|
if (!attribute.DeadLetterRoutingKey.IsNullOrEmpty())
|
|||
|
arguments["x-dead-letter-routing-key"] = attribute.DeadLetterRoutingKey;
|
|||
|
|
|||
|
//设置队列优先级
|
|||
|
if (attribute.MaximumPriority > 0 && attribute.MaximumPriority <= 10)
|
|||
|
arguments["x-max-priority"] = attribute.MaximumPriority;
|
|||
|
|
|||
|
//设置队列惰性模式
|
|||
|
if (attribute.LazyMode == "default" || attribute.LazyMode == "lazy")
|
|||
|
arguments["x-queue-mode"] = attribute.LazyMode;
|
|||
|
|
|||
|
//设置集群配置
|
|||
|
if (!attribute.MasterLocator.IsNullOrEmpty())
|
|||
|
arguments["x-queue-master-locator"] = attribute.MasterLocator;
|
|||
|
|
|||
|
//订阅消息
|
|||
|
Subscribe(
|
|||
|
attribute.Exchange,
|
|||
|
attribute.Queue,
|
|||
|
attribute.RoutingKey,
|
|||
|
subscriber,
|
|||
|
handler,
|
|||
|
attribute.RetryCount,
|
|||
|
attribute.PrefetchCount,
|
|||
|
attribute.DeadLetter,
|
|||
|
attribute.ExchangeType,
|
|||
|
attribute.Durable,
|
|||
|
attribute.ConsumerCount,
|
|||
|
arguments,
|
|||
|
registered: registered,
|
|||
|
unregistered: unregistered,
|
|||
|
shutdown: shutdown);
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 订阅消息
|
|||
|
/// </summary>
|
|||
|
/// <typeparam name="T"></typeparam>
|
|||
|
/// <param name="subscriber">消费处理委托</param>
|
|||
|
/// <param name="handler">异常处理委托</param>
|
|||
|
/// <param name="registered">注册事件</param>
|
|||
|
/// <param name="unregistered">取消注册事件</param>
|
|||
|
/// <param name="shutdown">关闭事件</param>
|
|||
|
public void Subscribe<T>(
|
|||
|
Func<T, BasicDeliverEventArgs, Task<bool>> subscriber,
|
|||
|
Func<string, int, Exception, Task> handler,
|
|||
|
EventHandler<ConsumerEventArgs> registered = null,
|
|||
|
EventHandler<ConsumerEventArgs> unregistered = null,
|
|||
|
EventHandler<ShutdownEventArgs> shutdown = null)
|
|||
|
where T : class
|
|||
|
{
|
|||
|
var attribute = typeof(T).GetAttribute<RabbitMqAttribute>();
|
|||
|
if (attribute == null)
|
|||
|
throw new ArgumentException("RabbitMqAttribute Is Null!");
|
|||
|
|
|||
|
Subscribe(attribute, subscriber, handler, registered, unregistered, shutdown);
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 订阅消息
|
|||
|
/// </summary>
|
|||
|
/// <typeparam name="T"></typeparam>
|
|||
|
/// <param name="attribute">RabbitMq特性配置</param>
|
|||
|
/// <param name="subscriber">消费处理委托</param>
|
|||
|
/// <param name="handler">异常处理委托</param>
|
|||
|
/// <param name="registered">注册事件</param>
|
|||
|
/// <param name="unregistered">取消注册事件</param>
|
|||
|
/// <param name="shutdown">关闭事件</param>
|
|||
|
public void Subscribe<T>(
|
|||
|
RabbitMqAttribute attribute,
|
|||
|
Func<T, BasicDeliverEventArgs, Task<bool>> subscriber,
|
|||
|
Func<string, int, Exception, Task> handler,
|
|||
|
EventHandler<ConsumerEventArgs> registered = null,
|
|||
|
EventHandler<ConsumerEventArgs> unregistered = null,
|
|||
|
EventHandler<ShutdownEventArgs> shutdown = null)
|
|||
|
where T : class
|
|||
|
{
|
|||
|
//自定义参数
|
|||
|
var arguments = new Dictionary<string, object>();
|
|||
|
|
|||
|
//设置队列消息过期时间,指整个队列的所有消息
|
|||
|
if (attribute.MessageTTL > 0)
|
|||
|
arguments["x-message-ttl"] = attribute.MessageTTL;
|
|||
|
|
|||
|
//设置队列过期时间
|
|||
|
if (attribute.AutoExpire > 0)
|
|||
|
arguments["x-expires"] = attribute.AutoExpire;
|
|||
|
|
|||
|
//设置队列最大长度
|
|||
|
if (attribute.MaxLength > 0)
|
|||
|
arguments["x-max-length"] = attribute.MaxLength;
|
|||
|
|
|||
|
//设置队列占用最大空间
|
|||
|
if (attribute.MaxLengthBytes > 0)
|
|||
|
arguments["x-max-length-bytes"] = attribute.MaxLengthBytes;
|
|||
|
|
|||
|
//设置队列溢出行为
|
|||
|
if (attribute.OverflowBehaviour == "drop-head" || attribute.OverflowBehaviour == "reject-publish")
|
|||
|
arguments["x-overflow"] = attribute.OverflowBehaviour;
|
|||
|
|
|||
|
//设置死信交换机
|
|||
|
if (!attribute.DeadLetterExchange.IsNullOrEmpty())
|
|||
|
arguments["x-dead-letter-exchange"] = attribute.DeadLetterExchange;
|
|||
|
|
|||
|
//设置死信路由键
|
|||
|
if (!attribute.DeadLetterRoutingKey.IsNullOrEmpty())
|
|||
|
arguments["x-dead-letter-routing-key"] = attribute.DeadLetterRoutingKey;
|
|||
|
|
|||
|
//设置队列优先级
|
|||
|
if (attribute.MaximumPriority > 0 && attribute.MaximumPriority <= 10)
|
|||
|
arguments["x-max-priority"] = attribute.MaximumPriority;
|
|||
|
|
|||
|
//设置队列惰性模式
|
|||
|
if (attribute.LazyMode == "default" || attribute.LazyMode == "lazy")
|
|||
|
arguments["x-queue-mode"] = attribute.LazyMode;
|
|||
|
|
|||
|
//设置集群配置
|
|||
|
if (!attribute.MasterLocator.IsNullOrEmpty())
|
|||
|
arguments["x-queue-master-locator"] = attribute.MasterLocator;
|
|||
|
|
|||
|
//订阅消息
|
|||
|
Subscribe(
|
|||
|
attribute.Exchange,
|
|||
|
attribute.Queue,
|
|||
|
attribute.RoutingKey,
|
|||
|
subscriber,
|
|||
|
handler,
|
|||
|
attribute.RetryCount,
|
|||
|
attribute.PrefetchCount,
|
|||
|
attribute.DeadLetter,
|
|||
|
attribute.ExchangeType,
|
|||
|
attribute.Durable,
|
|||
|
attribute.ConsumerCount,
|
|||
|
arguments,
|
|||
|
registered: registered,
|
|||
|
unregistered: unregistered,
|
|||
|
shutdown: shutdown);
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 订阅消息
|
|||
|
/// </summary>
|
|||
|
/// <typeparam name="T"></typeparam>
|
|||
|
/// <param name="exchange">交换机名称</param>
|
|||
|
/// <param name="queue">队列名称</param>
|
|||
|
/// <param name="routingKey">路由键</param>
|
|||
|
/// <param name="subscriber">消费处理委托</param>
|
|||
|
/// <param name="handler">异常处理委托</param>
|
|||
|
/// <param name="retryCount">重试次数</param>
|
|||
|
/// <param name="prefetchCount">预取数量</param>
|
|||
|
/// <param name="deadLetter">是否进入内置死信队列,此处与原生死信无关</param>
|
|||
|
/// <param name="exchangeType">交换机类型</param>
|
|||
|
/// <param name="durable">持久化</param>
|
|||
|
/// <param name="consumerCount">消费者数量</param>
|
|||
|
/// <param name="queueArguments">队列参数</param>
|
|||
|
/// <param name="exchangeArguments">交换机参数</param>
|
|||
|
/// <param name="registered">注册事件</param>
|
|||
|
/// <param name="unregistered">取消注册事件</param>
|
|||
|
/// <param name="shutdown">关闭事件</param>
|
|||
|
public void Subscribe<T>(
|
|||
|
string exchange,
|
|||
|
string queue,
|
|||
|
string routingKey,
|
|||
|
Func<T, BasicDeliverEventArgs, bool> subscriber,
|
|||
|
Action<string, int, Exception> handler,
|
|||
|
int retryCount = 5,
|
|||
|
ushort prefetchCount = 1,
|
|||
|
bool deadLetter = true,
|
|||
|
string exchangeType = ExchangeType.Direct,
|
|||
|
bool durable = true,
|
|||
|
int consumerCount = 1,
|
|||
|
IDictionary<string, object> queueArguments = null,
|
|||
|
IDictionary<string, object> exchangeArguments = null,
|
|||
|
EventHandler<ConsumerEventArgs> registered = null,
|
|||
|
EventHandler<ConsumerEventArgs> unregistered = null,
|
|||
|
EventHandler<ShutdownEventArgs> shutdown = null)
|
|||
|
where T : class
|
|||
|
{
|
|||
|
//获取管道
|
|||
|
var channel = GetChannel(queue);
|
|||
|
|
|||
|
//判断交换机是否存在
|
|||
|
if (!IsExchangeExist(channel, exchange))
|
|||
|
channel = ExchangeDeclare(channel, exchange, exchangeType, durable, arguments: exchangeArguments);
|
|||
|
|
|||
|
//判断队列是否存在
|
|||
|
if (!IsQueueExist(channel, queue))
|
|||
|
channel = QueueDeclare(channel, queue, durable, arguments: queueArguments);
|
|||
|
|
|||
|
//绑定交换机和队列
|
|||
|
QueueBind(channel, exchange, queue, routingKey);
|
|||
|
|
|||
|
//设置每次预取数量
|
|||
|
channel.BasicQos(0, prefetchCount, false);
|
|||
|
|
|||
|
//根据设置的消费者数量创建消费者
|
|||
|
for (int i = 0; i < consumerCount; i++)
|
|||
|
{
|
|||
|
//创建消费者
|
|||
|
var consumer = new EventingBasicConsumer(channel);
|
|||
|
|
|||
|
//接收消息事件
|
|||
|
consumer.Received += (sender, ea) =>
|
|||
|
{
|
|||
|
var text = Encoding.UTF8.GetString(ea.Body.ToArray());
|
|||
|
var body = text;
|
|||
|
var numberOfRetries = 0;
|
|||
|
Exception exception = null;
|
|||
|
bool? result = false;
|
|||
|
|
|||
|
while (numberOfRetries <= retryCount)
|
|||
|
{
|
|||
|
try
|
|||
|
{
|
|||
|
var msg = body.ToObject<T>();
|
|||
|
|
|||
|
result = subscriber?.Invoke(msg, ea);
|
|||
|
|
|||
|
if (result == true)
|
|||
|
channel.BasicAck(ea.DeliveryTag, false);
|
|||
|
else
|
|||
|
channel.BasicNack(ea.DeliveryTag, false, false);
|
|||
|
|
|||
|
//异常置空
|
|||
|
exception = null;
|
|||
|
break;
|
|||
|
}
|
|||
|
catch (Exception ex)
|
|||
|
{
|
|||
|
exception = ex;
|
|||
|
handler?.Invoke(body, numberOfRetries, ex);
|
|||
|
numberOfRetries++;
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
//重试后异常仍未解决
|
|||
|
if (exception != null)
|
|||
|
channel.BasicNack(ea.DeliveryTag, false, false);
|
|||
|
|
|||
|
//是否进入内置死信队列
|
|||
|
if (deadLetter && (!(result == true) || exception != null))
|
|||
|
PublishToDead(
|
|||
|
queue,
|
|||
|
ea.Exchange,
|
|||
|
ea.RoutingKey,
|
|||
|
body,
|
|||
|
exception == null ? numberOfRetries : numberOfRetries - 1,
|
|||
|
exception);
|
|||
|
};
|
|||
|
|
|||
|
//注册事件
|
|||
|
if (registered != null)
|
|||
|
consumer.Registered += registered;
|
|||
|
|
|||
|
//取消注册事件
|
|||
|
if (unregistered != null)
|
|||
|
consumer.Unregistered += unregistered;
|
|||
|
|
|||
|
//关闭事件
|
|||
|
if (shutdown != null)
|
|||
|
consumer.Shutdown += shutdown;
|
|||
|
|
|||
|
//手动确认
|
|||
|
channel.BasicConsume(queue, false, consumer);
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 订阅消息
|
|||
|
/// </summary>
|
|||
|
/// <typeparam name="T"></typeparam>
|
|||
|
/// <param name="exchange">交换机名称</param>
|
|||
|
/// <param name="queue">队列名称</param>
|
|||
|
/// <param name="routingKey">路由键</param>
|
|||
|
/// <param name="subscriber">消费处理委托</param>
|
|||
|
/// <param name="handler">异常处理委托</param>
|
|||
|
/// <param name="retryCount">重试次数</param>
|
|||
|
/// <param name="prefetchCount">预取数量</param>
|
|||
|
/// <param name="deadLetter">是否进入内置死信队列,此处与原生死信无关</param>
|
|||
|
/// <param name="exchangeType">交换机类型</param>
|
|||
|
/// <param name="durable">持久化</param>
|
|||
|
/// <param name="consumerCount">消费者数量</param>
|
|||
|
/// <param name="queueArguments">队列参数</param>
|
|||
|
/// <param name="exchangeArguments">交换机参数</param>
|
|||
|
/// <param name="registered">注册事件</param>
|
|||
|
/// <param name="unregistered">取消注册事件</param>
|
|||
|
/// <param name="shutdown">关闭事件</param>
|
|||
|
public void Subscribe<T>(
|
|||
|
string exchange,
|
|||
|
string queue,
|
|||
|
string routingKey,
|
|||
|
Func<T, BasicDeliverEventArgs, Task<bool>> subscriber,
|
|||
|
Func<string, int, Exception, Task> handler,
|
|||
|
int retryCount = 5,
|
|||
|
ushort prefetchCount = 1,
|
|||
|
bool deadLetter = true,
|
|||
|
string exchangeType = ExchangeType.Direct,
|
|||
|
bool durable = true,
|
|||
|
int consumerCount = 1,
|
|||
|
IDictionary<string, object> queueArguments = null,
|
|||
|
IDictionary<string, object> exchangeArguments = null,
|
|||
|
EventHandler<ConsumerEventArgs> registered = null,
|
|||
|
EventHandler<ConsumerEventArgs> unregistered = null,
|
|||
|
EventHandler<ShutdownEventArgs> shutdown = null)
|
|||
|
where T : class
|
|||
|
{
|
|||
|
//获取管道
|
|||
|
var channel = GetChannel(queue);
|
|||
|
|
|||
|
//判断交换机是否存在
|
|||
|
if (!IsExchangeExist(channel, exchange))
|
|||
|
channel = ExchangeDeclare(channel, exchange, exchangeType, durable, arguments: exchangeArguments);
|
|||
|
|
|||
|
//判断队列是否存在
|
|||
|
if (!IsQueueExist(channel, queue))
|
|||
|
channel = QueueDeclare(channel, queue, durable, arguments: queueArguments);
|
|||
|
|
|||
|
//绑定交换机和队列
|
|||
|
QueueBind(channel, exchange, queue, routingKey);
|
|||
|
|
|||
|
//设置每次预取数量
|
|||
|
channel.BasicQos(0, prefetchCount, false);
|
|||
|
|
|||
|
//根据设置的消费者数量创建消费者
|
|||
|
for (int i = 0; i < consumerCount; i++)
|
|||
|
{
|
|||
|
//创建消费者
|
|||
|
var consumer = new EventingBasicConsumer(channel);
|
|||
|
|
|||
|
//接收消息事件
|
|||
|
consumer.Received += async (sender, ea) =>
|
|||
|
{
|
|||
|
var text = Encoding.UTF8.GetString(ea.Body.ToArray());
|
|||
|
var body = text;
|
|||
|
var numberOfRetries = 0;
|
|||
|
Exception exception = null;
|
|||
|
bool? result = false;
|
|||
|
|
|||
|
while (numberOfRetries <= retryCount)
|
|||
|
{
|
|||
|
try
|
|||
|
{
|
|||
|
var msg = body.ToObject<T>();
|
|||
|
|
|||
|
if (subscriber != null)
|
|||
|
result = await subscriber(msg, ea);
|
|||
|
|
|||
|
if (result == true)
|
|||
|
channel.BasicAck(ea.DeliveryTag, false);
|
|||
|
else
|
|||
|
channel.BasicNack(ea.DeliveryTag, false, false);
|
|||
|
|
|||
|
//异常置空
|
|||
|
exception = null;
|
|||
|
break;
|
|||
|
}
|
|||
|
catch (Exception ex)
|
|||
|
{
|
|||
|
exception = ex;
|
|||
|
|
|||
|
if (handler != null)
|
|||
|
await handler(body, numberOfRetries, ex);
|
|||
|
|
|||
|
numberOfRetries++;
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
//重试后异常仍未解决
|
|||
|
if (exception != null)
|
|||
|
channel.BasicNack(ea.DeliveryTag, false, false);
|
|||
|
|
|||
|
//是否进入内置死信队列
|
|||
|
if (deadLetter && (!(result == true) || exception != null))
|
|||
|
PublishToDead(
|
|||
|
queue,
|
|||
|
ea.Exchange,
|
|||
|
ea.RoutingKey,
|
|||
|
body,
|
|||
|
exception == null ? numberOfRetries : numberOfRetries - 1,
|
|||
|
exception);
|
|||
|
};
|
|||
|
|
|||
|
//注册事件
|
|||
|
if (registered != null)
|
|||
|
consumer.Registered += registered;
|
|||
|
|
|||
|
//取消注册事件
|
|||
|
if (unregistered != null)
|
|||
|
consumer.Unregistered += unregistered;
|
|||
|
|
|||
|
//关闭事件
|
|||
|
if (shutdown != null)
|
|||
|
consumer.Shutdown += shutdown;
|
|||
|
|
|||
|
//手动确认
|
|||
|
channel.BasicConsume(queue, false, consumer);
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
#endregion 订阅消息
|
|||
|
|
|||
|
#region 获取消息
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 获取消息
|
|||
|
/// </summary>
|
|||
|
/// <typeparam name="T"></typeparam>
|
|||
|
/// <param name="handler">消费处理委托</param>
|
|||
|
public void Pull<T>(
|
|||
|
Action<T, BasicGetResult> handler)
|
|||
|
where T : class
|
|||
|
{
|
|||
|
var attribute = typeof(T).GetAttribute<RabbitMqAttribute>();
|
|||
|
if (attribute == null)
|
|||
|
throw new ArgumentException("RabbitMqAttribute Is Null!");
|
|||
|
|
|||
|
Pull(attribute.Exchange, attribute.Queue, attribute.RoutingKey, handler);
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 获取消息
|
|||
|
/// </summary>
|
|||
|
/// <typeparam name="T"></typeparam>
|
|||
|
/// <param name="exchange">交换机名称</param>
|
|||
|
/// <param name="queue">队列名称</param>
|
|||
|
/// <param name="routingKey">路由键</param>
|
|||
|
/// <param name="handler">消费处理委托</param>
|
|||
|
public void Pull<T>(
|
|||
|
string exchange,
|
|||
|
string queue,
|
|||
|
string routingKey,
|
|||
|
Action<T, BasicGetResult> handler)
|
|||
|
where T : class
|
|||
|
{
|
|||
|
//获取管道
|
|||
|
var channel = GetChannel(queue);
|
|||
|
|
|||
|
//判断交换机是否存在
|
|||
|
if (!IsExchangeExist(channel, exchange))
|
|||
|
channel = ExchangeDeclare(channel, exchange);
|
|||
|
|
|||
|
//判断队列是否存在
|
|||
|
if (!IsQueueExist(channel, queue))
|
|||
|
channel = QueueDeclare(channel, queue);
|
|||
|
|
|||
|
//绑定交换机和队列
|
|||
|
QueueBind(channel, exchange, queue, routingKey);
|
|||
|
|
|||
|
var result = channel.BasicGet(queue, false);
|
|||
|
if (result == null)
|
|||
|
return;
|
|||
|
var text = Encoding.UTF8.GetString(result.Body.ToArray());
|
|||
|
var msg = text.ToObject<T>();
|
|||
|
try
|
|||
|
{
|
|||
|
handler(msg, result);
|
|||
|
}
|
|||
|
catch (Exception)
|
|||
|
{
|
|||
|
throw;
|
|||
|
}
|
|||
|
finally
|
|||
|
{
|
|||
|
channel.BasicAck(result.DeliveryTag, false);
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 获取消息
|
|||
|
/// </summary>
|
|||
|
/// <typeparam name="T"></typeparam>
|
|||
|
/// <param name="handler">消费处理委托</param>
|
|||
|
public async Task PullAsync<T>(
|
|||
|
Func<T, BasicGetResult, Task> handler)
|
|||
|
where T : class
|
|||
|
{
|
|||
|
var attribute = typeof(T).GetAttribute<RabbitMqAttribute>();
|
|||
|
if (attribute == null)
|
|||
|
throw new ArgumentException("RabbitMqAttribute Is Null!");
|
|||
|
|
|||
|
await PullAsync(attribute.Exchange, attribute.Queue, attribute.RoutingKey, handler);
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 获取消息
|
|||
|
/// </summary>
|
|||
|
/// <typeparam name="T"></typeparam>
|
|||
|
/// <param name="exchange">交换机名称</param>
|
|||
|
/// <param name="queue">队列名称</param>
|
|||
|
/// <param name="routingKey">路由键</param>
|
|||
|
/// <param name="handler">消费处理委托</param>
|
|||
|
public async Task PullAsync<T>(
|
|||
|
string exchange,
|
|||
|
string queue,
|
|||
|
string routingKey,
|
|||
|
Func<T, BasicGetResult, Task> handler)
|
|||
|
where T : class
|
|||
|
{
|
|||
|
//获取管道
|
|||
|
var channel = GetChannel(queue);
|
|||
|
|
|||
|
//判断交换机是否存在
|
|||
|
if (!IsExchangeExist(channel, exchange))
|
|||
|
channel = ExchangeDeclare(channel, exchange);
|
|||
|
|
|||
|
//判断队列是否存在
|
|||
|
if (!IsQueueExist(channel, queue))
|
|||
|
channel = QueueDeclare(channel, queue);
|
|||
|
|
|||
|
//绑定交换机和队列
|
|||
|
QueueBind(channel, exchange, queue, routingKey);
|
|||
|
|
|||
|
var result = channel.BasicGet(queue, false);
|
|||
|
if (result == null)
|
|||
|
return;
|
|||
|
var text = Encoding.UTF8.GetString(result.Body.ToArray());
|
|||
|
var msg = text.ToObject<T>();
|
|||
|
try
|
|||
|
{
|
|||
|
if (handler != null)
|
|||
|
await handler(msg, result);
|
|||
|
}
|
|||
|
catch (Exception)
|
|||
|
{
|
|||
|
throw;
|
|||
|
}
|
|||
|
finally
|
|||
|
{
|
|||
|
channel.BasicAck(result.DeliveryTag, false);
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 获取消息数量
|
|||
|
/// </summary>
|
|||
|
/// <param name="channel">管道</param>
|
|||
|
/// <param name="queue">队列名称</param>
|
|||
|
/// <returns></returns>
|
|||
|
public uint GetMessageCount(
|
|||
|
IModel channel,
|
|||
|
string queue)
|
|||
|
{
|
|||
|
return (channel ?? GetChannel(queue)).MessageCount(queue);
|
|||
|
}
|
|||
|
|
|||
|
#endregion 获取消息
|
|||
|
|
|||
|
#region 释放资源
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 执行与释放或重置非托管资源关联的应用程序定义的任务。
|
|||
|
/// </summary>
|
|||
|
public void Dispose()
|
|||
|
{
|
|||
|
foreach (var item in _channelDic)
|
|||
|
{
|
|||
|
item.Value?.Dispose();
|
|||
|
}
|
|||
|
|
|||
|
foreach (var item in _queueHelperDic)
|
|||
|
{
|
|||
|
item.Value?.Dispose();
|
|||
|
}
|
|||
|
|
|||
|
_connection?.Dispose();
|
|||
|
_channelDic.Clear();
|
|||
|
_queueHelperDic.Clear();
|
|||
|
}
|
|||
|
|
|||
|
#endregion 释放资源
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// RabbitMq连接配置
|
|||
|
/// </summary>
|
|||
|
public class MqConfig
|
|||
|
{
|
|||
|
/// <summary>
|
|||
|
/// 是否启用
|
|||
|
/// </summary>
|
|||
|
public bool Enabled { get; set; }
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 主机名
|
|||
|
/// </summary>
|
|||
|
public string HostName { get; set; } = "localhost";
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 心跳时间
|
|||
|
/// </summary>
|
|||
|
public TimeSpan RequestedHeartbeat { get; set; } = TimeSpan.FromSeconds(10);
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 自动重连
|
|||
|
/// </summary>
|
|||
|
public bool AutomaticRecoveryEnabled { get; set; } = true;
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 重连时间
|
|||
|
/// </summary>
|
|||
|
public TimeSpan NetworkRecoveryInterval { get; set; } = TimeSpan.FromSeconds(10);
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 用户名
|
|||
|
/// </summary>
|
|||
|
public string UserName { get; set; } = "guest";
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 密码
|
|||
|
/// </summary>
|
|||
|
public string Password { get; set; } = "guest";
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 端口号
|
|||
|
/// </summary>
|
|||
|
public int Port { get; set; } = 5672;
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 虚拟主机
|
|||
|
/// </summary>
|
|||
|
public string VirtualHost { get; set; } = "/";
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 自定义的RabbitMq队列信息实体特性
|
|||
|
/// </summary>
|
|||
|
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false)]
|
|||
|
public class RabbitMqAttribute : Attribute
|
|||
|
{
|
|||
|
/// <summary>
|
|||
|
/// 交换机
|
|||
|
/// </summary>
|
|||
|
public string Exchange { get; set; }
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 交换机类型(广播fanout,队列direct)
|
|||
|
/// </summary>
|
|||
|
public string ExchangeType { get; set; }
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 队列
|
|||
|
/// </summary>
|
|||
|
public string Queue { get; set; }
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 路由键
|
|||
|
/// </summary>
|
|||
|
public string RoutingKey { get; set; }
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 是否持久化,默认true
|
|||
|
/// </summary>
|
|||
|
public bool Durable { get; set; } = true;
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 预取数量,默认1
|
|||
|
/// </summary>
|
|||
|
public ushort PrefetchCount { get; set; } = 1;
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 异常重试次数,默认5次
|
|||
|
/// </summary>
|
|||
|
public int RetryCount { get; set; } = 5;
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 消息过期时间,过期后队列中消息自动被删除,单位ms
|
|||
|
/// </summary>
|
|||
|
public int MessageTTL { get; set; }
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 消息头部对象json字符串
|
|||
|
/// </summary>
|
|||
|
public string Header { get; set; }
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 队列消费者数量,默认1
|
|||
|
/// </summary>
|
|||
|
public int ConsumerCount { get; set; } = 1;
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 队列过期时间,过期后队列自动被删除,单位ms
|
|||
|
/// </summary>
|
|||
|
public int AutoExpire { get; set; }
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 队列最大长度
|
|||
|
/// </summary>
|
|||
|
public int MaxLength { get; set; }
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 队列占用的最大空间
|
|||
|
/// </summary>
|
|||
|
public int MaxLengthBytes { get; set; }
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 队列溢出行为,可选值:drop-head或者reject-publish,默认drop-head
|
|||
|
/// </summary>
|
|||
|
public string OverflowBehaviour { get; set; }
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 是否启用系统内置死信队列处理逻辑,默认true;注意:此处与原生的死信无关
|
|||
|
/// </summary>
|
|||
|
public bool DeadLetter { get; set; } = true;
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 死信交换机
|
|||
|
/// </summary>
|
|||
|
public string DeadLetterExchange { get; set; }
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 死信路由键
|
|||
|
/// </summary>
|
|||
|
public string DeadLetterRoutingKey { get; set; }
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 队列最大优先级,数值越大优先级越高,范围1-255,建议取值1-10
|
|||
|
/// </summary>
|
|||
|
public int MaximumPriority { get; set; }
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 队列惰性模式,可选值:default或者lazy
|
|||
|
/// </summary>
|
|||
|
public string LazyMode { get; set; }
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 主定位器,集群配置
|
|||
|
/// </summary>
|
|||
|
public string MasterLocator { get; set; }
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 内置死信队列实体
|
|||
|
/// </summary>
|
|||
|
public class DeadLetterQueue
|
|||
|
{
|
|||
|
/// <summary>
|
|||
|
/// 消息内容
|
|||
|
/// </summary>
|
|||
|
public string Body { get; set; }
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 交换机
|
|||
|
/// </summary>
|
|||
|
public string Exchange { get; set; }
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 队列
|
|||
|
/// </summary>
|
|||
|
public string Queue { get; set; }
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 路由键
|
|||
|
/// </summary>
|
|||
|
public string RoutingKey { get; set; }
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 重试次数
|
|||
|
/// </summary>
|
|||
|
public int RetryCount { get; set; }
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 异常消息
|
|||
|
/// </summary>
|
|||
|
public string ExceptionMsg { get; set; }
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 异常
|
|||
|
/// </summary>
|
|||
|
public Exception Exception { get; set; }
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 创建时间
|
|||
|
/// </summary>
|
|||
|
public DateTime CreateDateTime { get; set; } = DateTime.Now;
|
|||
|
}
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 内存队列消息
|
|||
|
/// </summary>
|
|||
|
public class QueueMessage
|
|||
|
{
|
|||
|
/// <summary>
|
|||
|
/// 交换机名称
|
|||
|
/// </summary>
|
|||
|
public string Exchange { get; set; }
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 队列名称
|
|||
|
/// </summary>
|
|||
|
public string Queue { get; set; }
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 路由键
|
|||
|
/// </summary>
|
|||
|
public string RoutingKey { get; set; }
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 消息内容
|
|||
|
/// </summary>
|
|||
|
public IEnumerable<string> Body { get; set; }
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 交换机类型
|
|||
|
/// </summary>
|
|||
|
public string ExchangeType { get; set; }
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 持久化
|
|||
|
/// </summary>
|
|||
|
public bool Durable { get; set; }
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 消息发送确认
|
|||
|
/// </summary>
|
|||
|
public bool Confirm { get; set; }
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 单个消息过期时间,单位ms
|
|||
|
/// </summary>
|
|||
|
public string Expiration { get; set; }
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 单个消息优先级,数值越大优先级越高,取值范围:0-9
|
|||
|
/// </summary>
|
|||
|
public byte? Priority { get; set; }
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 队列参数
|
|||
|
/// </summary>
|
|||
|
public IDictionary<string, object> QueueArguments { get; set; }
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 交换机参数
|
|||
|
/// </summary>
|
|||
|
public IDictionary<string, object> ExchangeArguments { get; set; }
|
|||
|
|
|||
|
/// <summary>
|
|||
|
/// 消息头部
|
|||
|
/// </summary>
|
|||
|
public IDictionary<string, object> Headers { get; set; }
|
|||
|
}
|
|||
|
}
|