2066 lines
60 KiB
C#
2066 lines
60 KiB
C#
using RabbitMQ.Client;
|
||
using RabbitMQ.Client.Events;
|
||
using Serenity;
|
||
using System;
|
||
using System.Collections.Concurrent;
|
||
using System.Collections.Generic;
|
||
using System.Linq;
|
||
using System.Text;
|
||
using System.Threading.Tasks;
|
||
|
||
namespace HT.Cloud.Code
|
||
{
|
||
/// <summary>
|
||
/// RabbitMq工具类
|
||
/// </summary>
|
||
public class RabbitMqHelper : IDisposable
|
||
{
|
||
#region 私有静态字段
|
||
|
||
/// <summary>
|
||
/// RabbitMQ建议客户端线程之间不要共用Model,至少要保证共用Model的线程发送消息必须是串行的,但是建议尽量共用Connection。
|
||
/// </summary>
|
||
private static readonly ConcurrentDictionary<string, IModel> _channelDic = new();
|
||
|
||
/// <summary>
|
||
/// QueueHelper缓存
|
||
/// </summary>
|
||
private static readonly ConcurrentDictionary<string, QueueHelper<QueueMessage>> _queueHelperDic = new();
|
||
|
||
/// <summary>
|
||
/// 用于缓存路由键数据
|
||
/// </summary>
|
||
private static readonly ConcurrentDictionary<string, (string exchange, string queue, string routeKey)> _routeDic = new();
|
||
|
||
/// <summary>
|
||
/// RabbitMq连接
|
||
/// </summary>
|
||
private static IConnection _connection;
|
||
|
||
/// <summary>
|
||
/// 线程对象,线程锁使用
|
||
/// </summary>
|
||
private static readonly object _locker = new();
|
||
|
||
#endregion 私有静态字段
|
||
|
||
#region 公有属性
|
||
|
||
/// <summary>
|
||
/// 默认内置死信交换机,默认值:deadletter.default.router
|
||
/// </summary>
|
||
public string DefaultDeadLetterExchange { get; set; } = "deadletter.default.router";
|
||
|
||
#endregion 公有属性
|
||
|
||
#region 构造函数
|
||
|
||
/// <summary>
|
||
/// 构造函数
|
||
/// </summary>
|
||
/// <param name="config">RabbitMq配置</param>
|
||
public RabbitMqHelper(MqConfig config)
|
||
{
|
||
if (config == null)
|
||
throw new ArgumentNullException(nameof(config));
|
||
|
||
if (_connection == null)
|
||
{
|
||
lock (_locker)
|
||
{
|
||
if (_connection == null)
|
||
_connection = new ConnectionFactory
|
||
{
|
||
//设置主机名
|
||
HostName = config.HostName,
|
||
|
||
//虚拟主机
|
||
VirtualHost = config.VirtualHost,
|
||
|
||
//设置心跳时间
|
||
RequestedHeartbeat = config.RequestedHeartbeat,
|
||
|
||
//设置自动重连
|
||
AutomaticRecoveryEnabled = config.AutomaticRecoveryEnabled,
|
||
|
||
//重连时间
|
||
NetworkRecoveryInterval = config.NetworkRecoveryInterval,
|
||
|
||
//用户名
|
||
UserName = config.UserName,
|
||
|
||
//密码
|
||
Password = config.Password,
|
||
|
||
//配置端口
|
||
Port=config.Port
|
||
}.CreateConnection();
|
||
}
|
||
}
|
||
}
|
||
|
||
/// <summary>
|
||
/// 构造函数
|
||
/// </summary>
|
||
/// <param name="factory">RabbitMq连接工厂</param>
|
||
public RabbitMqHelper(ConnectionFactory factory)
|
||
{
|
||
if (factory == null)
|
||
throw new ArgumentNullException(nameof(factory));
|
||
|
||
if (_connection == null)
|
||
{
|
||
lock (_locker)
|
||
{
|
||
if (_connection == null)
|
||
_connection = factory.CreateConnection();
|
||
}
|
||
}
|
||
}
|
||
|
||
#endregion 构造函数
|
||
|
||
#region 管道
|
||
|
||
/// <summary>
|
||
/// 获取管道
|
||
/// </summary>
|
||
/// <param name="queue">队列名称</param>
|
||
/// <returns></returns>
|
||
public IModel GetChannel(string queue = "default")
|
||
{
|
||
IModel channel = null;
|
||
|
||
if (!string.IsNullOrEmpty(queue) && _channelDic.ContainsKey(queue))
|
||
channel = _channelDic[queue];
|
||
|
||
if (!string.IsNullOrEmpty(queue) && channel == null)
|
||
{
|
||
lock (_locker)
|
||
{
|
||
channel = _channelDic.GetOrAdd(queue, queue =>
|
||
{
|
||
var model = _connection.CreateModel();
|
||
_channelDic[queue] = model;
|
||
|
||
return model;
|
||
});
|
||
}
|
||
}
|
||
|
||
channel ??= _connection.CreateModel();
|
||
|
||
return EnsureOpened(channel);
|
||
}
|
||
|
||
/// <summary>
|
||
/// 确保管道是已打开状态
|
||
/// </summary>
|
||
/// <param name="channel">管道</param>
|
||
/// <returns></returns>
|
||
public IModel EnsureOpened(IModel channel)
|
||
{
|
||
channel ??= GetChannel();
|
||
|
||
if (channel.IsClosed)
|
||
{
|
||
var data = _channelDic.Where(x => x.Value == channel)?.FirstOrDefault();
|
||
if (data != null && data.Value.Key != null)
|
||
{
|
||
//移除已关闭的管道
|
||
_channelDic.TryRemove(data.Value.Key, out var model);
|
||
|
||
//重新获取管道
|
||
channel = GetChannel(data.Value.Key);
|
||
}
|
||
else
|
||
{
|
||
channel = GetChannel();
|
||
}
|
||
}
|
||
|
||
return channel;
|
||
}
|
||
|
||
/// <summary>
|
||
/// 如果交换机或者队列不存在则移除缓存中异常关闭的管道
|
||
/// </summary>
|
||
/// <param name="channel"></param>
|
||
public void RemoveIfNotExist(IModel channel)
|
||
{
|
||
var data = _channelDic.Where(x => x.Value == channel)?.FirstOrDefault();
|
||
if (data != null && data.Value.Key != null)
|
||
{
|
||
//移除已关闭的管道
|
||
_channelDic.TryRemove(data.Value.Key, out var model);
|
||
channel = null;
|
||
}
|
||
}
|
||
|
||
/// <summary>
|
||
/// 获取QueueHelper
|
||
/// </summary>
|
||
/// <param name="queue"></param>
|
||
/// <returns></returns>
|
||
public QueueHelper<QueueMessage> GetQueueHelper(string queue)
|
||
{
|
||
if (!string.IsNullOrEmpty(queue))
|
||
queue = "default";
|
||
|
||
if (_queueHelperDic.ContainsKey(queue))
|
||
return _queueHelperDic[queue];
|
||
|
||
lock (_locker)
|
||
{
|
||
return _queueHelperDic.GetOrAdd(queue, queue =>
|
||
{
|
||
var queueHelper = new QueueHelper<QueueMessage>(x =>
|
||
PublishRabbitMqMessage(
|
||
x.Exchange,
|
||
x.Queue,
|
||
x.RoutingKey,
|
||
x.Body,
|
||
x.ExchangeType,
|
||
x.Durable,
|
||
x.Confirm,
|
||
x.Expiration,
|
||
x.Priority,
|
||
x.QueueArguments,
|
||
x.ExchangeArguments,
|
||
x.Headers));
|
||
|
||
_queueHelperDic[queue] = queueHelper;
|
||
|
||
return queueHelper;
|
||
});
|
||
}
|
||
}
|
||
|
||
#endregion 管道
|
||
|
||
#region 交换机
|
||
|
||
/// <summary>
|
||
/// 声明交换机,交换机可以重复声明,但是声明所使用的参数必须一致,否则会抛出异常。
|
||
/// </summary>
|
||
/// <param name="channel">管道</param>
|
||
/// <param name="exchange">交换机名称</param>
|
||
/// <param name="exchangeType">交换机类型:
|
||
/// 1、Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全
|
||
/// 匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的
|
||
/// 消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog
|
||
/// 2、Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都
|
||
/// 会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout
|
||
/// 交换机转发消息是最快的。
|
||
/// 3、Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多
|
||
/// 个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*”
|
||
/// 只会匹配到“audit.irs”。</param>
|
||
/// <param name="durable">持久化</param>
|
||
/// <param name="autoDelete">自动删除</param>
|
||
/// <param name="arguments">参数</param>
|
||
/// <returns>管道</returns>
|
||
public IModel ExchangeDeclare(
|
||
IModel channel,
|
||
string exchange,
|
||
string exchangeType = ExchangeType.Direct,
|
||
bool durable = true,
|
||
bool autoDelete = false,
|
||
IDictionary<string, object> arguments = null)
|
||
{
|
||
channel = EnsureOpened(channel);
|
||
channel.ExchangeDeclare(exchange, exchangeType, durable, autoDelete, arguments);
|
||
|
||
return channel;
|
||
}
|
||
|
||
/// <summary>
|
||
/// 声明交换机,交换机可以重复声明,但是声明所使用的参数必须一致,否则会抛出异常。
|
||
/// </summary>
|
||
/// <param name="channel">管道</param>
|
||
/// <param name="exchange">交换机名称</param>
|
||
/// <param name="exchangeType">交换机类型:
|
||
/// 1、Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全
|
||
/// 匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的
|
||
/// 消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog
|
||
/// 2、Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都
|
||
/// 会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout
|
||
/// 交换机转发消息是最快的。
|
||
/// 3、Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多
|
||
/// 个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*”
|
||
/// 只会匹配到“audit.irs”。</param>
|
||
/// <param name="durable">持久化</param>
|
||
/// <param name="autoDelete">自动删除</param>
|
||
/// <param name="arguments">参数</param>
|
||
/// <returns>管道</returns>
|
||
public IModel ExchangeDeclareNoWait(
|
||
IModel channel,
|
||
string exchange,
|
||
string exchangeType = ExchangeType.Direct,
|
||
bool durable = true,
|
||
bool autoDelete = false,
|
||
IDictionary<string, object> arguments = null)
|
||
{
|
||
channel = EnsureOpened(channel);
|
||
channel.ExchangeDeclareNoWait(exchange, exchangeType, durable, autoDelete, arguments);
|
||
|
||
return channel;
|
||
}
|
||
|
||
/// <summary>
|
||
/// 被动声明交换机,用于判断交换机是否存在,若存在则无异常,若不存在则抛出异常
|
||
/// </summary>
|
||
/// <param name="channel">管道</param>
|
||
/// <param name="exchange">交换机名称</param>
|
||
public void ExchangeDeclarePassive(IModel channel, string exchange)
|
||
{
|
||
EnsureOpened(channel).ExchangeDeclarePassive(exchange);
|
||
}
|
||
|
||
/// <summary>
|
||
/// 判断交换机是否存在
|
||
/// </summary>
|
||
/// <param name="channel">管道</param>
|
||
/// <param name="exchange">交换机名称</param>
|
||
/// <returns></returns>
|
||
public bool IsExchangeExist(IModel channel, string exchange)
|
||
{
|
||
try
|
||
{
|
||
ExchangeDeclarePassive(channel, exchange);
|
||
return true;
|
||
}
|
||
catch
|
||
{
|
||
RemoveIfNotExist(channel);
|
||
return false;
|
||
}
|
||
}
|
||
|
||
/// <summary>
|
||
/// 删除交换机
|
||
/// </summary>
|
||
/// <param name="channel">管道</param>
|
||
/// <param name="exchange">交换机名称</param>
|
||
/// <param name="ifUnused">是否没有被使用</param>
|
||
public void ExchangeDelete(
|
||
IModel channel,
|
||
string exchange,
|
||
bool ifUnused = false)
|
||
{
|
||
RouteKeyRemoveByExchange(exchange);
|
||
EnsureOpened(channel).ExchangeDelete(exchange, ifUnused);
|
||
}
|
||
|
||
/// <summary>
|
||
/// 删除交换机
|
||
/// </summary>
|
||
/// <param name="channel">管道</param>
|
||
/// <param name="exchange">交换机名称</param>
|
||
/// <param name="ifUnused">是否没有被使用</param>
|
||
public void ExchangeDeleteNoWait(
|
||
IModel channel,
|
||
string exchange,
|
||
bool ifUnused = false)
|
||
{
|
||
RouteKeyRemoveByExchange(exchange);
|
||
EnsureOpened(channel).ExchangeDeleteNoWait(exchange, ifUnused);
|
||
}
|
||
|
||
/// <summary>
|
||
/// 绑定交换机
|
||
/// </summary>
|
||
/// <param name="channel">管道</param>
|
||
/// <param name="destinationExchange">目标交换机</param>
|
||
/// <param name="sourceExchange">源交换机</param>
|
||
/// <param name="routingKey">路由键</param>
|
||
/// <param name="arguments">参数</param>
|
||
public void ExchangeBind(
|
||
IModel channel,
|
||
string destinationExchange,
|
||
string sourceExchange,
|
||
string routingKey,
|
||
IDictionary<string, object> arguments = null)
|
||
{
|
||
EnsureOpened(channel).ExchangeBind(destinationExchange, sourceExchange, routingKey, arguments);
|
||
}
|
||
|
||
/// <summary>
|
||
/// 绑定交换机
|
||
/// </summary>
|
||
/// <param name="channel">管道</param>
|
||
/// <param name="destinationExchange">目标交换机</param>
|
||
/// <param name="sourceExchange">源交换机</param>
|
||
/// <param name="routingKey">路由键</param>
|
||
/// <param name="arguments">参数</param>
|
||
public void ExchangeBindNoWait(
|
||
IModel channel,
|
||
string destinationExchange,
|
||
string sourceExchange,
|
||
string routingKey,
|
||
IDictionary<string, object> arguments = null)
|
||
{
|
||
EnsureOpened(channel).ExchangeBindNoWait(destinationExchange, sourceExchange, routingKey, arguments);
|
||
}
|
||
|
||
/// <summary>
|
||
/// 解绑交换机
|
||
/// </summary>
|
||
/// <param name="channel">管道</param>
|
||
/// <param name="destinationExchange">目标交换机</param>
|
||
/// <param name="sourceExchange">源交换机</param>
|
||
/// <param name="routingKey">路由键</param>
|
||
/// <param name="arguments">参数</param>
|
||
public void ExchangeUnbind(
|
||
IModel channel,
|
||
string destinationExchange,
|
||
string sourceExchange,
|
||
string routingKey,
|
||
IDictionary<string, object> arguments = null)
|
||
{
|
||
EnsureOpened(channel).ExchangeUnbind(destinationExchange, sourceExchange, routingKey, arguments);
|
||
}
|
||
|
||
/// <summary>
|
||
/// 解绑交换机
|
||
/// </summary>
|
||
/// <param name="channel">管道</param>
|
||
/// <param name="destinationExchange">目标交换机</param>
|
||
/// <param name="sourceExchange">源交换机</param>
|
||
/// <param name="routingKey">路由键</param>
|
||
/// <param name="arguments">参数</param>
|
||
public void ExchangeUnbindNoWait(
|
||
IModel channel,
|
||
string destinationExchange,
|
||
string sourceExchange,
|
||
string routingKey,
|
||
IDictionary<string, object> arguments = null)
|
||
{
|
||
EnsureOpened(channel).ExchangeUnbindNoWait(destinationExchange, sourceExchange, routingKey, arguments);
|
||
}
|
||
|
||
#endregion 交换机
|
||
|
||
#region 队列
|
||
|
||
/// <summary>
|
||
/// 声明队列,队列可以重复声明,但是声明所使用的参数必须一致,否则会抛出异常。
|
||
/// </summary>
|
||
/// <param name="channel">管道</param>
|
||
/// <param name="queue">队列名称</param>
|
||
/// <param name="durable">持久化</param>
|
||
/// <param name="exclusive">排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,
|
||
/// 并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可
|
||
/// 以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连
|
||
/// 接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者
|
||
/// 客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。</param>
|
||
/// <param name="autoDelete">自动删除</param>
|
||
/// <param name="arguments">参数</param>
|
||
/// <returns>管道</returns>
|
||
public IModel QueueDeclare(
|
||
IModel channel,
|
||
string queue,
|
||
bool durable = true,
|
||
bool exclusive = false,
|
||
bool autoDelete = false,
|
||
IDictionary<string, object> arguments = null)
|
||
{
|
||
channel = EnsureOpened(channel);
|
||
channel.QueueDeclare(queue, durable, exclusive, autoDelete, arguments);
|
||
|
||
return channel;
|
||
}
|
||
|
||
/// <summary>
|
||
/// 声明队列,队列可以重复声明,但是声明所使用的参数必须一致,否则会抛出异常。
|
||
/// </summary>
|
||
/// <param name="channel">管道</param>
|
||
/// <param name="queue">队列名称</param>
|
||
/// <param name="durable">持久化</param>
|
||
/// <param name="exclusive">排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,
|
||
/// 并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可
|
||
/// 以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连
|
||
/// 接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者
|
||
/// 客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。</param>
|
||
/// <param name="autoDelete">自动删除</param>
|
||
/// <param name="arguments">参数</param>
|
||
/// <returns>管道</returns>
|
||
public IModel QueueDeclareNoWait(
|
||
IModel channel,
|
||
string queue,
|
||
bool durable = true,
|
||
bool exclusive = false,
|
||
bool autoDelete = false,
|
||
IDictionary<string, object> arguments = null)
|
||
{
|
||
channel = EnsureOpened(channel);
|
||
channel.QueueDeclareNoWait(queue, durable, exclusive, autoDelete, arguments);
|
||
|
||
return channel;
|
||
}
|
||
|
||
/// <summary>
|
||
/// 被动声明队列,用于判断队列是否存在,若队列存在则无异常,若不存在则抛异常
|
||
/// </summary>
|
||
/// <param name="channel">管道</param>
|
||
/// <param name="queue">队列名称</param>
|
||
public void QueueDeclarePassive(IModel channel, string queue)
|
||
{
|
||
EnsureOpened(channel).QueueDeclarePassive(queue);
|
||
}
|
||
|
||
/// <summary>
|
||
/// 判断队列是否存在
|
||
/// </summary>
|
||
/// <param name="channel">管道</param>
|
||
/// <param name="queue">队列名称</param>
|
||
/// <returns></returns>
|
||
public bool IsQueueExist(IModel channel, string queue)
|
||
{
|
||
try
|
||
{
|
||
QueueDeclarePassive(channel, queue);
|
||
return true;
|
||
}
|
||
catch
|
||
{
|
||
RemoveIfNotExist(channel);
|
||
return false;
|
||
}
|
||
}
|
||
|
||
/// <summary>
|
||
/// 删除队列
|
||
/// </summary>
|
||
/// <param name="channel">管道</param>
|
||
/// <param name="queue">队列名称</param>
|
||
/// <param name="ifUnused">是否没有被使用</param>
|
||
/// <param name="ifEmpty">是否为空</param>
|
||
/// <returns></returns>
|
||
public uint QueueDelete(
|
||
IModel channel,
|
||
string queue,
|
||
bool ifUnused = false,
|
||
bool ifEmpty = false)
|
||
{
|
||
RouteKeyRemoveByQueue(queue);
|
||
return EnsureOpened(channel).QueueDelete(queue, ifUnused, ifEmpty);
|
||
}
|
||
|
||
/// <summary>
|
||
/// 删除队列
|
||
/// </summary>
|
||
/// <param name="channel">管道</param>
|
||
/// <param name="queue">队列名称</param>
|
||
/// <param name="ifUnused">是否没有被使用</param>
|
||
/// <param name="ifEmpty">是否为空</param>
|
||
public void QueueDeleteNoWait(
|
||
IModel channel,
|
||
string queue,
|
||
bool ifUnused = false,
|
||
bool ifEmpty = false)
|
||
{
|
||
RouteKeyRemoveByQueue(queue);
|
||
EnsureOpened(channel).QueueDeleteNoWait(queue, ifUnused, ifEmpty);
|
||
}
|
||
|
||
/// <summary>
|
||
/// 绑定队列
|
||
/// </summary>
|
||
/// <param name="channel">管道</param>
|
||
/// <param name="exchange">交换机名称</param>
|
||
/// <param name="queue">队列名称</param>
|
||
/// <param name="routingKey">路由键</param>
|
||
/// <param name="arguments">参数</param>
|
||
public void QueueBind(
|
||
IModel channel,
|
||
string exchange,
|
||
string queue,
|
||
string routingKey,
|
||
IDictionary<string, object> arguments = null)
|
||
{
|
||
if (IsRouteKeyExist(exchange, queue, routingKey))
|
||
return;
|
||
|
||
EnsureOpened(channel).QueueBind(queue, exchange, routingKey, arguments);
|
||
}
|
||
|
||
/// <summary>
|
||
/// 绑定队列
|
||
/// </summary>
|
||
/// <param name="channel">管道</param>
|
||
/// <param name="exchange">交换机名称</param>
|
||
/// <param name="queue">队列名称</param>
|
||
/// <param name="routingKey">路由键</param>
|
||
/// <param name="arguments">参数</param>
|
||
public void QueueBindNoWait(
|
||
IModel channel,
|
||
string exchange,
|
||
string queue,
|
||
string routingKey,
|
||
IDictionary<string, object> arguments = null)
|
||
{
|
||
if (IsRouteKeyExist(exchange, queue, routingKey))
|
||
return;
|
||
|
||
EnsureOpened(channel).QueueBindNoWait(queue, exchange, routingKey, arguments);
|
||
}
|
||
|
||
/// <summary>
|
||
/// 解绑队列
|
||
/// </summary>
|
||
/// <param name="channel">管道</param>
|
||
/// <param name="exchange">交换机名称</param>
|
||
/// <param name="queue">队列名称</param>
|
||
/// <param name="routingKey">路由键</param>
|
||
/// <param name="arguments">参数</param>
|
||
public void QueueUnbind(
|
||
IModel channel,
|
||
string exchange,
|
||
string queue,
|
||
string routingKey,
|
||
IDictionary<string, object> arguments = null)
|
||
{
|
||
RouteKeyRemove(exchange, queue, routingKey);
|
||
EnsureOpened(channel).QueueUnbind(queue, exchange, routingKey, arguments);
|
||
}
|
||
|
||
/// <summary>
|
||
/// 清除队列
|
||
/// </summary>
|
||
/// <param name="channel">管道</param>
|
||
/// <param name="queue">队列名称</param>
|
||
public void QueuePurge(IModel channel, string queue)
|
||
{
|
||
EnsureOpened(channel).QueuePurge(queue);
|
||
}
|
||
|
||
#endregion 队列
|
||
|
||
#region 路由
|
||
|
||
/// <summary>
|
||
/// 判断路由键是否存在,仅在内存中作判断
|
||
/// </summary>
|
||
/// <param name="exchange">交换机</param>
|
||
/// <param name="queue">队列</param>
|
||
/// <param name="routeKey">路由键</param>
|
||
/// <returns></returns>
|
||
public bool IsRouteKeyExist(string exchange, string queue, string routeKey)
|
||
{
|
||
var key = $"{exchange}.{queue}.{routeKey}";
|
||
|
||
if (_routeDic.ContainsKey(key))
|
||
return true;
|
||
else
|
||
{
|
||
lock (_locker)
|
||
{
|
||
if (!_routeDic.ContainsKey(key))
|
||
{
|
||
_routeDic.TryAdd(key, (exchange, queue, routeKey));
|
||
return false;
|
||
}
|
||
return true;
|
||
}
|
||
}
|
||
}
|
||
|
||
/// <summary>
|
||
/// 移除路由键缓存
|
||
/// </summary>
|
||
/// <param name="exchange"></param>
|
||
/// <param name="queue"></param>
|
||
/// <param name="routeKey"></param>
|
||
public void RouteKeyRemove(string exchange, string queue, string routeKey)
|
||
{
|
||
var key = $"{exchange}.{queue}.{routeKey}";
|
||
if (_routeDic.ContainsKey(key))
|
||
{
|
||
lock (_locker)
|
||
{
|
||
if (_routeDic.ContainsKey(key))
|
||
_routeDic.TryRemove(key, out var _);
|
||
}
|
||
}
|
||
}
|
||
|
||
/// <summary>
|
||
/// 移除路由键缓存
|
||
/// </summary>
|
||
/// <param name="queue">队列</param>
|
||
public void RouteKeyRemoveByQueue(string queue)
|
||
{
|
||
if (!_routeDic.IsEmpty && _routeDic.Values.Any(x => x.queue == queue))
|
||
{
|
||
lock (_locker)
|
||
{
|
||
if (!_routeDic.IsEmpty && _routeDic.Values.Any(x => x.queue == queue))
|
||
{
|
||
var keys = _routeDic.Keys.Where(x => x.Split('.')[1] == queue);
|
||
foreach (var key in keys)
|
||
{
|
||
_routeDic.TryRemove(key, out var _);
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
/// <summary>
|
||
/// 移除路由键缓存
|
||
/// </summary>
|
||
/// <param name="exchange">交换机</param>
|
||
public void RouteKeyRemoveByExchange(string exchange)
|
||
{
|
||
if (!_routeDic.IsEmpty && _routeDic.Values.Any(x => x.exchange == exchange))
|
||
{
|
||
lock (_locker)
|
||
{
|
||
if (!_routeDic.IsEmpty && _routeDic.Values.Any(x => x.exchange == exchange))
|
||
{
|
||
var keys = _routeDic.Keys.Where(x => x.Split('.')[0] == exchange);
|
||
foreach (var key in keys)
|
||
{
|
||
_routeDic.TryRemove(key, out var _);
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
#endregion 路由
|
||
|
||
#region 发布消息
|
||
|
||
/// <summary>
|
||
/// 发布消息
|
||
/// </summary>
|
||
/// <param name="command">消息指令</param>
|
||
/// <param name="confirm">消息发送确认</param>
|
||
/// <param name="expiration">单个消息过期时间,单位ms</param>
|
||
/// <param name="priority">单个消息优先级,数值越大优先级越高,取值范围:0-9</param>
|
||
/// <returns></returns>
|
||
public bool Publish<T>(
|
||
T command,
|
||
bool confirm = false,
|
||
string expiration = null,
|
||
byte? priority = null)
|
||
where T : class
|
||
{
|
||
var attribute = typeof(T).GetAttribute<RabbitMqAttribute>();
|
||
if (attribute == null)
|
||
throw new ArgumentException("RabbitMqAttribute Is Null!");
|
||
|
||
return Publish(attribute, command, confirm, expiration, priority);
|
||
}
|
||
|
||
/// <summary>
|
||
/// 发布消息
|
||
/// </summary>
|
||
/// <param name="attribute">RabbitMq特性配置</param>
|
||
/// <param name="command">消息指令</param>
|
||
/// <param name="confirm">消息发送确认</param>
|
||
/// <param name="expiration">单个消息过期时间,单位ms</param>
|
||
/// <param name="priority">单个消息优先级,数值越大优先级越高,取值范围:0-9</param>
|
||
/// <returns></returns>
|
||
public bool Publish<T>(
|
||
RabbitMqAttribute attribute,
|
||
T command,
|
||
bool confirm = false,
|
||
string expiration = null,
|
||
byte? priority = null)
|
||
where T : class
|
||
{
|
||
//消息内容
|
||
var body = command.ToJson();
|
||
|
||
//自定义队列参数
|
||
var arguments = new Dictionary<string, object>();
|
||
|
||
//设置队列消息过期时间,指整个队列的所有消息
|
||
if (attribute.MessageTTL > 0)
|
||
arguments["x-message-ttl"] = attribute.MessageTTL;
|
||
|
||
//设置队列过期时间
|
||
if (attribute.AutoExpire > 0)
|
||
arguments["x-expires"] = attribute.AutoExpire;
|
||
|
||
//设置队列最大长度
|
||
if (attribute.MaxLength > 0)
|
||
arguments["x-max-length"] = attribute.MaxLength;
|
||
|
||
//设置队列占用最大空间
|
||
if (attribute.MaxLengthBytes > 0)
|
||
arguments["x-max-length-bytes"] = attribute.MaxLengthBytes;
|
||
|
||
//设置队列溢出行为
|
||
if (attribute.OverflowBehaviour == "drop-head" || attribute.OverflowBehaviour == "reject-publish")
|
||
arguments["x-overflow"] = attribute.OverflowBehaviour;
|
||
|
||
//设置死信交换机
|
||
if (!attribute.DeadLetterExchange.IsNullOrEmpty())
|
||
arguments["x-dead-letter-exchange"] = attribute.DeadLetterExchange;
|
||
|
||
//设置死信路由键
|
||
if (!attribute.DeadLetterRoutingKey.IsNullOrEmpty())
|
||
arguments["x-dead-letter-routing-key"] = attribute.DeadLetterRoutingKey;
|
||
|
||
//设置队列优先级
|
||
if (attribute.MaximumPriority > 0 && attribute.MaximumPriority <= 10)
|
||
arguments["x-max-priority"] = attribute.MaximumPriority;
|
||
|
||
//设置队列惰性模式
|
||
if (attribute.LazyMode == "default" || attribute.LazyMode == "lazy")
|
||
arguments["x-queue-mode"] = attribute.LazyMode;
|
||
|
||
//设置集群配置
|
||
if (!attribute.MasterLocator.IsNullOrEmpty())
|
||
arguments["x-queue-master-locator"] = attribute.MasterLocator;
|
||
|
||
//发送消息
|
||
return Publish(
|
||
attribute.Exchange,
|
||
attribute.Queue,
|
||
attribute.RoutingKey,
|
||
body,
|
||
attribute.ExchangeType,
|
||
attribute.Durable,
|
||
confirm,
|
||
expiration,
|
||
priority,
|
||
arguments,
|
||
null,
|
||
attribute.Header?.ToObject<Dictionary<string, object>>());
|
||
}
|
||
|
||
/// <summary>
|
||
/// 发布消息
|
||
/// </summary>
|
||
/// <param name="command">消息指令</param>
|
||
/// <param name="confirm">消息发送确认</param>
|
||
/// <param name="expiration">单个消息过期时间,单位ms</param>
|
||
/// <param name="priority">单个消息优先级,数值越大优先级越高,取值范围:0-9</param>
|
||
/// <returns></returns>
|
||
public bool Publish<T>(
|
||
IEnumerable<T> command,
|
||
bool confirm = false,
|
||
string expiration = null,
|
||
byte? priority = null)
|
||
where T : class
|
||
{
|
||
var attribute = typeof(T).GetAttribute<RabbitMqAttribute>();
|
||
if (attribute == null)
|
||
throw new ArgumentException("RabbitMqAttribute Is Null!");
|
||
|
||
return Publish(attribute, command, confirm, expiration, priority);
|
||
}
|
||
|
||
/// <summary>
|
||
/// 发布消息
|
||
/// </summary>
|
||
/// <param name="attribute">RabbitMq特性配置</param>
|
||
/// <param name="command">消息指令</param>
|
||
/// <param name="confirm">消息发送确认</param>
|
||
/// <param name="expiration">单个消息过期时间,单位ms</param>
|
||
/// <param name="priority">单个消息优先级,数值越大优先级越高,取值范围:0-9</param>
|
||
/// <returns></returns>
|
||
public bool Publish<T>(
|
||
RabbitMqAttribute attribute,
|
||
IEnumerable<T> command,
|
||
bool confirm = false,
|
||
string expiration = null,
|
||
byte? priority = null)
|
||
where T : class
|
||
{
|
||
//消息内容
|
||
var body = command.Select(x => x.ToJson());
|
||
|
||
//自定义队列参数
|
||
var arguments = new Dictionary<string, object>();
|
||
|
||
//设置队列消息过期时间,指整个队列的所有消息
|
||
if (attribute.MessageTTL > 0)
|
||
arguments["x-message-ttl"] = attribute.MessageTTL;
|
||
|
||
//设置队列过期时间
|
||
if (attribute.AutoExpire > 0)
|
||
arguments["x-expires"] = attribute.AutoExpire;
|
||
|
||
//设置队列最大长度
|
||
if (attribute.MaxLength > 0)
|
||
arguments["x-max-length"] = attribute.MaxLength;
|
||
|
||
//设置队列占用最大空间
|
||
if (attribute.MaxLengthBytes > 0)
|
||
arguments["x-max-length-bytes"] = attribute.MaxLengthBytes;
|
||
|
||
//设置队列溢出行为
|
||
if (attribute.OverflowBehaviour == "drop-head" || attribute.OverflowBehaviour == "reject-publish")
|
||
arguments["x-overflow"] = attribute.OverflowBehaviour;
|
||
|
||
//设置死信交换机
|
||
if (!attribute.DeadLetterExchange.IsNullOrEmpty())
|
||
arguments["x-dead-letter-exchange"] = attribute.DeadLetterExchange;
|
||
|
||
//设置死信路由键
|
||
if (!attribute.DeadLetterRoutingKey.IsNullOrEmpty())
|
||
arguments["x-dead-letter-routing-key"] = attribute.DeadLetterRoutingKey;
|
||
|
||
//设置队列优先级
|
||
if (attribute.MaximumPriority > 0 && attribute.MaximumPriority <= 10)
|
||
arguments["x-max-priority"] = attribute.MaximumPriority;
|
||
|
||
//设置队列惰性模式
|
||
if (attribute.LazyMode == "default" || attribute.LazyMode == "lazy")
|
||
arguments["x-queue-mode"] = attribute.LazyMode;
|
||
|
||
//设置集群配置
|
||
if (!attribute.MasterLocator.IsNullOrEmpty())
|
||
arguments["x-queue-master-locator"] = attribute.MasterLocator;
|
||
|
||
//发送消息
|
||
return Publish(
|
||
attribute.Exchange,
|
||
attribute.Queue,
|
||
attribute.RoutingKey,
|
||
body,
|
||
attribute.ExchangeType,
|
||
attribute.Durable,
|
||
confirm,
|
||
expiration,
|
||
priority,
|
||
arguments,
|
||
null,
|
||
attribute.Header?.ToObject<Dictionary<string, object>>());
|
||
}
|
||
|
||
/// <summary>
|
||
/// 发布消息
|
||
/// </summary>
|
||
/// <param name="exchange">交换机名称</param>
|
||
/// <param name="queue">队列名称</param>
|
||
/// <param name="routingKey">路由键</param>
|
||
/// <param name="body">消息内容</param>
|
||
/// <param name="exchangeType">交换机类型</param>
|
||
/// <param name="durable">持久化</param>
|
||
/// <param name="confirm">消息发送确认</param>
|
||
/// <param name="expiration">单个消息过期时间,单位ms</param>
|
||
/// <param name="priority">单个消息优先级,数值越大优先级越高,取值范围:0-9</param>
|
||
/// <param name="queueArguments">队列参数</param>
|
||
/// <param name="exchangeArguments">交换机参数</param>
|
||
/// <param name="headers">消息头部</param>
|
||
/// <returns></returns>
|
||
public bool Publish(
|
||
string exchange,
|
||
string queue,
|
||
string routingKey,
|
||
string body,
|
||
string exchangeType = ExchangeType.Direct,
|
||
bool durable = true,
|
||
bool confirm = false,
|
||
string expiration = null,
|
||
byte? priority = null,
|
||
IDictionary<string, object> queueArguments = null,
|
||
IDictionary<string, object> exchangeArguments = null,
|
||
IDictionary<string, object> headers = null)
|
||
{
|
||
return Publish(
|
||
exchange,
|
||
queue,
|
||
routingKey,
|
||
new[] { body },
|
||
exchangeType,
|
||
durable,
|
||
confirm,
|
||
expiration,
|
||
priority,
|
||
queueArguments,
|
||
exchangeArguments,
|
||
headers);
|
||
}
|
||
|
||
/// <summary>
|
||
/// 发布消息
|
||
/// </summary>
|
||
/// <param name="exchange">交换机名称</param>
|
||
/// <param name="queue">队列名称</param>
|
||
/// <param name="routingKey">路由键</param>
|
||
/// <param name="body">消息内容</param>
|
||
/// <param name="exchangeType">交换机类型</param>
|
||
/// <param name="durable">持久化</param>
|
||
/// <param name="confirm">消息发送确认</param>
|
||
/// <param name="expiration">单个消息过期时间,单位ms</param>
|
||
/// <param name="priority">单个消息优先级,数值越大优先级越高,取值范围:0-9</param>
|
||
/// <param name="queueArguments">队列参数</param>
|
||
/// <param name="exchangeArguments">交换机参数</param>
|
||
/// <param name="headers">消息头部</param>
|
||
/// <returns></returns>
|
||
public bool Publish(
|
||
string exchange,
|
||
string queue,
|
||
string routingKey,
|
||
IEnumerable<string> body,
|
||
string exchangeType = ExchangeType.Direct,
|
||
bool durable = true,
|
||
bool confirm = false,
|
||
string expiration = null,
|
||
byte? priority = null,
|
||
IDictionary<string, object> queueArguments = null,
|
||
IDictionary<string, object> exchangeArguments = null,
|
||
IDictionary<string, object> headers = null)
|
||
{
|
||
return GetQueueHelper(queue).Enqueue(
|
||
new QueueMessage
|
||
{
|
||
Exchange = exchange,
|
||
Queue = queue,
|
||
RoutingKey = routingKey,
|
||
Body = body,
|
||
ExchangeType = exchangeType,
|
||
Durable = durable,
|
||
Confirm = confirm,
|
||
Expiration = expiration,
|
||
Priority = priority,
|
||
QueueArguments = queueArguments,
|
||
ExchangeArguments = exchangeArguments,
|
||
Headers = headers
|
||
});
|
||
}
|
||
|
||
/// <summary>
|
||
/// 发布消息到RabbitMq,注意此方法使用IModel线程不安全,建议使用Publish方法,若要使用需要保证单线程调用
|
||
/// </summary>
|
||
/// <param name="exchange">交换机名称</param>
|
||
/// <param name="queue">队列名称</param>
|
||
/// <param name="routingKey">路由键</param>
|
||
/// <param name="body">消息内容</param>
|
||
/// <param name="exchangeType">交换机类型</param>
|
||
/// <param name="durable">持久化</param>
|
||
/// <param name="confirm">消息发送确认</param>
|
||
/// <param name="expiration">单个消息过期时间,单位ms</param>
|
||
/// <param name="priority">单个消息优先级,数值越大优先级越高,取值范围:0-9</param>
|
||
/// <param name="queueArguments">队列参数</param>
|
||
/// <param name="exchangeArguments">交换机参数</param>
|
||
/// <param name="headers">消息头部</param>
|
||
/// <returns></returns>
|
||
public bool PublishRabbitMqMessage(
|
||
string exchange,
|
||
string queue,
|
||
string routingKey,
|
||
IEnumerable<string> body,
|
||
string exchangeType = ExchangeType.Direct,
|
||
bool durable = true,
|
||
bool confirm = false,
|
||
string expiration = null,
|
||
byte? priority = null,
|
||
IDictionary<string, object> queueArguments = null,
|
||
IDictionary<string, object> exchangeArguments = null,
|
||
IDictionary<string, object> headers = null)
|
||
{
|
||
//获取管道
|
||
var channel = GetChannel(queue);
|
||
|
||
//判断交换机是否存在
|
||
if (!IsExchangeExist(channel, exchange))
|
||
channel = ExchangeDeclare(channel, exchange, exchangeType, durable, arguments: exchangeArguments);
|
||
|
||
//判断队列是否存在
|
||
if (!IsQueueExist(channel, queue))
|
||
channel = QueueDeclare(channel, queue, durable, arguments: queueArguments);
|
||
|
||
//绑定交换机和队列
|
||
QueueBind(channel, exchange, queue, routingKey);
|
||
|
||
//声明消息属性
|
||
var props = channel.CreateBasicProperties();
|
||
|
||
//持久化
|
||
props.Persistent = durable;
|
||
|
||
//单个消息过期时间
|
||
if (!expiration.IsNullOrEmpty())
|
||
props.Expiration = expiration;
|
||
|
||
//单个消息优先级
|
||
if (priority >= 0 && priority <= 9)
|
||
props.Priority = priority.Value;
|
||
|
||
//消息头部
|
||
if (headers != null)
|
||
props.Headers = headers;
|
||
|
||
//是否启用消息发送确认机制
|
||
if (confirm)
|
||
channel.ConfirmSelect();
|
||
|
||
//发送消息
|
||
foreach (var item in body)
|
||
{
|
||
var text = string.IsNullOrEmpty(item) ? null : Encoding.UTF8.GetBytes(item);
|
||
channel.BasicPublish(exchange, routingKey, props, text);
|
||
}
|
||
|
||
//消息发送失败处理
|
||
if (confirm && !channel.WaitForConfirms())
|
||
return false;
|
||
|
||
return true;
|
||
}
|
||
|
||
/// <summary>
|
||
/// 发布消息到内置死信队列
|
||
/// </summary>
|
||
/// <param name="queue">队列名称</param>
|
||
/// <param name="exchange">交换机名称</param>
|
||
/// <param name="routingKey">路由键</param>
|
||
/// <param name="body">消息内容</param>
|
||
/// <param name="retryCount">重试次数</param>
|
||
/// <param name="exception">异常</param>
|
||
/// <returns></returns>
|
||
private bool PublishToDead(
|
||
string queue,
|
||
string exchange,
|
||
string routingKey,
|
||
string body,
|
||
int retryCount,
|
||
Exception exception)
|
||
{
|
||
//内置死信队列、交换机、路由键
|
||
var deadLetterQueue = $"{queue.ToLower()}.{(exception != null ? "error" : "fail")}";
|
||
var deadLetterExchange = DefaultDeadLetterExchange;
|
||
var deadLetterRoutingKey = $"{queue.ToLower()}.deadletter";
|
||
|
||
//内置死信队列内容
|
||
var deadLetterBody = new DeadLetterQueue
|
||
{
|
||
Body = body,
|
||
CreateDateTime = DateTime.Now,
|
||
Exception = exception,
|
||
ExceptionMsg = exception?.Message,
|
||
Queue = queue,
|
||
RoutingKey = routingKey,
|
||
Exchange = exchange,
|
||
RetryCount = retryCount
|
||
};
|
||
|
||
//发送内置死信消息,注意此处与原生的死信消息无关
|
||
return Publish(deadLetterExchange, deadLetterQueue, deadLetterRoutingKey, deadLetterBody.ToJson());
|
||
}
|
||
|
||
#endregion 发布消息
|
||
|
||
#region 订阅消息
|
||
|
||
/// <summary>
|
||
/// 订阅消息
|
||
/// </summary>
|
||
/// <typeparam name="T"></typeparam>
|
||
/// <param name="subscriber">消费处理委托</param>
|
||
/// <param name="handler">异常处理委托</param>
|
||
/// <param name="registered">注册事件</param>
|
||
/// <param name="unregistered">取消注册事件</param>
|
||
/// <param name="shutdown">关闭事件</param>
|
||
public void Subscribe<T>(
|
||
Func<T, BasicDeliverEventArgs, bool> subscriber,
|
||
Action<string, int, Exception> handler,
|
||
EventHandler<ConsumerEventArgs> registered = null,
|
||
EventHandler<ConsumerEventArgs> unregistered = null,
|
||
EventHandler<ShutdownEventArgs> shutdown = null)
|
||
where T : class
|
||
{
|
||
var attribute = typeof(T).GetAttribute<RabbitMqAttribute>();
|
||
if (attribute == null)
|
||
throw new ArgumentException("RabbitMqAttribute Is Null!");
|
||
|
||
Subscribe(attribute, subscriber, handler, registered, unregistered, shutdown);
|
||
}
|
||
|
||
/// <summary>
|
||
/// 订阅消息
|
||
/// </summary>
|
||
/// <typeparam name="T"></typeparam>
|
||
/// <param name="attribute">RabbitMq特性配置</param>
|
||
/// <param name="subscriber">消费处理委托</param>
|
||
/// <param name="handler">异常处理委托</param>
|
||
/// <param name="registered">注册事件</param>
|
||
/// <param name="unregistered">取消注册事件</param>
|
||
/// <param name="shutdown">关闭事件</param>
|
||
public void Subscribe<T>(
|
||
RabbitMqAttribute attribute,
|
||
Func<T, BasicDeliverEventArgs, bool> subscriber,
|
||
Action<string, int, Exception> handler,
|
||
EventHandler<ConsumerEventArgs> registered = null,
|
||
EventHandler<ConsumerEventArgs> unregistered = null,
|
||
EventHandler<ShutdownEventArgs> shutdown = null)
|
||
where T : class
|
||
{
|
||
//自定义参数
|
||
var arguments = new Dictionary<string, object>();
|
||
|
||
//设置队列消息过期时间,指整个队列的所有消息
|
||
if (attribute.MessageTTL > 0)
|
||
arguments["x-message-ttl"] = attribute.MessageTTL;
|
||
|
||
//设置队列过期时间
|
||
if (attribute.AutoExpire > 0)
|
||
arguments["x-expires"] = attribute.AutoExpire;
|
||
|
||
//设置队列最大长度
|
||
if (attribute.MaxLength > 0)
|
||
arguments["x-max-length"] = attribute.MaxLength;
|
||
|
||
//设置队列占用最大空间
|
||
if (attribute.MaxLengthBytes > 0)
|
||
arguments["x-max-length-bytes"] = attribute.MaxLengthBytes;
|
||
|
||
//设置队列溢出行为
|
||
if (attribute.OverflowBehaviour == "drop-head" || attribute.OverflowBehaviour == "reject-publish")
|
||
arguments["x-overflow"] = attribute.OverflowBehaviour;
|
||
|
||
//设置死信交换机
|
||
if (!attribute.DeadLetterExchange.IsNullOrEmpty())
|
||
arguments["x-dead-letter-exchange"] = attribute.DeadLetterExchange;
|
||
|
||
//设置死信路由键
|
||
if (!attribute.DeadLetterRoutingKey.IsNullOrEmpty())
|
||
arguments["x-dead-letter-routing-key"] = attribute.DeadLetterRoutingKey;
|
||
|
||
//设置队列优先级
|
||
if (attribute.MaximumPriority > 0 && attribute.MaximumPriority <= 10)
|
||
arguments["x-max-priority"] = attribute.MaximumPriority;
|
||
|
||
//设置队列惰性模式
|
||
if (attribute.LazyMode == "default" || attribute.LazyMode == "lazy")
|
||
arguments["x-queue-mode"] = attribute.LazyMode;
|
||
|
||
//设置集群配置
|
||
if (!attribute.MasterLocator.IsNullOrEmpty())
|
||
arguments["x-queue-master-locator"] = attribute.MasterLocator;
|
||
|
||
//订阅消息
|
||
Subscribe(
|
||
attribute.Exchange,
|
||
attribute.Queue,
|
||
attribute.RoutingKey,
|
||
subscriber,
|
||
handler,
|
||
attribute.RetryCount,
|
||
attribute.PrefetchCount,
|
||
attribute.DeadLetter,
|
||
attribute.ExchangeType,
|
||
attribute.Durable,
|
||
attribute.ConsumerCount,
|
||
arguments,
|
||
registered: registered,
|
||
unregistered: unregistered,
|
||
shutdown: shutdown);
|
||
}
|
||
|
||
/// <summary>
|
||
/// 订阅消息
|
||
/// </summary>
|
||
/// <typeparam name="T"></typeparam>
|
||
/// <param name="subscriber">消费处理委托</param>
|
||
/// <param name="handler">异常处理委托</param>
|
||
/// <param name="registered">注册事件</param>
|
||
/// <param name="unregistered">取消注册事件</param>
|
||
/// <param name="shutdown">关闭事件</param>
|
||
public void Subscribe<T>(
|
||
Func<T, BasicDeliverEventArgs, Task<bool>> subscriber,
|
||
Func<string, int, Exception, Task> handler,
|
||
EventHandler<ConsumerEventArgs> registered = null,
|
||
EventHandler<ConsumerEventArgs> unregistered = null,
|
||
EventHandler<ShutdownEventArgs> shutdown = null)
|
||
where T : class
|
||
{
|
||
var attribute = typeof(T).GetAttribute<RabbitMqAttribute>();
|
||
if (attribute == null)
|
||
throw new ArgumentException("RabbitMqAttribute Is Null!");
|
||
|
||
Subscribe(attribute, subscriber, handler, registered, unregistered, shutdown);
|
||
}
|
||
|
||
/// <summary>
|
||
/// 订阅消息
|
||
/// </summary>
|
||
/// <typeparam name="T"></typeparam>
|
||
/// <param name="attribute">RabbitMq特性配置</param>
|
||
/// <param name="subscriber">消费处理委托</param>
|
||
/// <param name="handler">异常处理委托</param>
|
||
/// <param name="registered">注册事件</param>
|
||
/// <param name="unregistered">取消注册事件</param>
|
||
/// <param name="shutdown">关闭事件</param>
|
||
public void Subscribe<T>(
|
||
RabbitMqAttribute attribute,
|
||
Func<T, BasicDeliverEventArgs, Task<bool>> subscriber,
|
||
Func<string, int, Exception, Task> handler,
|
||
EventHandler<ConsumerEventArgs> registered = null,
|
||
EventHandler<ConsumerEventArgs> unregistered = null,
|
||
EventHandler<ShutdownEventArgs> shutdown = null)
|
||
where T : class
|
||
{
|
||
//自定义参数
|
||
var arguments = new Dictionary<string, object>();
|
||
|
||
//设置队列消息过期时间,指整个队列的所有消息
|
||
if (attribute.MessageTTL > 0)
|
||
arguments["x-message-ttl"] = attribute.MessageTTL;
|
||
|
||
//设置队列过期时间
|
||
if (attribute.AutoExpire > 0)
|
||
arguments["x-expires"] = attribute.AutoExpire;
|
||
|
||
//设置队列最大长度
|
||
if (attribute.MaxLength > 0)
|
||
arguments["x-max-length"] = attribute.MaxLength;
|
||
|
||
//设置队列占用最大空间
|
||
if (attribute.MaxLengthBytes > 0)
|
||
arguments["x-max-length-bytes"] = attribute.MaxLengthBytes;
|
||
|
||
//设置队列溢出行为
|
||
if (attribute.OverflowBehaviour == "drop-head" || attribute.OverflowBehaviour == "reject-publish")
|
||
arguments["x-overflow"] = attribute.OverflowBehaviour;
|
||
|
||
//设置死信交换机
|
||
if (!attribute.DeadLetterExchange.IsNullOrEmpty())
|
||
arguments["x-dead-letter-exchange"] = attribute.DeadLetterExchange;
|
||
|
||
//设置死信路由键
|
||
if (!attribute.DeadLetterRoutingKey.IsNullOrEmpty())
|
||
arguments["x-dead-letter-routing-key"] = attribute.DeadLetterRoutingKey;
|
||
|
||
//设置队列优先级
|
||
if (attribute.MaximumPriority > 0 && attribute.MaximumPriority <= 10)
|
||
arguments["x-max-priority"] = attribute.MaximumPriority;
|
||
|
||
//设置队列惰性模式
|
||
if (attribute.LazyMode == "default" || attribute.LazyMode == "lazy")
|
||
arguments["x-queue-mode"] = attribute.LazyMode;
|
||
|
||
//设置集群配置
|
||
if (!attribute.MasterLocator.IsNullOrEmpty())
|
||
arguments["x-queue-master-locator"] = attribute.MasterLocator;
|
||
|
||
//订阅消息
|
||
Subscribe(
|
||
attribute.Exchange,
|
||
attribute.Queue,
|
||
attribute.RoutingKey,
|
||
subscriber,
|
||
handler,
|
||
attribute.RetryCount,
|
||
attribute.PrefetchCount,
|
||
attribute.DeadLetter,
|
||
attribute.ExchangeType,
|
||
attribute.Durable,
|
||
attribute.ConsumerCount,
|
||
arguments,
|
||
registered: registered,
|
||
unregistered: unregistered,
|
||
shutdown: shutdown);
|
||
}
|
||
|
||
/// <summary>
|
||
/// 订阅消息
|
||
/// </summary>
|
||
/// <typeparam name="T"></typeparam>
|
||
/// <param name="exchange">交换机名称</param>
|
||
/// <param name="queue">队列名称</param>
|
||
/// <param name="routingKey">路由键</param>
|
||
/// <param name="subscriber">消费处理委托</param>
|
||
/// <param name="handler">异常处理委托</param>
|
||
/// <param name="retryCount">重试次数</param>
|
||
/// <param name="prefetchCount">预取数量</param>
|
||
/// <param name="deadLetter">是否进入内置死信队列,此处与原生死信无关</param>
|
||
/// <param name="exchangeType">交换机类型</param>
|
||
/// <param name="durable">持久化</param>
|
||
/// <param name="consumerCount">消费者数量</param>
|
||
/// <param name="queueArguments">队列参数</param>
|
||
/// <param name="exchangeArguments">交换机参数</param>
|
||
/// <param name="registered">注册事件</param>
|
||
/// <param name="unregistered">取消注册事件</param>
|
||
/// <param name="shutdown">关闭事件</param>
|
||
public void Subscribe<T>(
|
||
string exchange,
|
||
string queue,
|
||
string routingKey,
|
||
Func<T, BasicDeliverEventArgs, bool> subscriber,
|
||
Action<string, int, Exception> handler,
|
||
int retryCount = 5,
|
||
ushort prefetchCount = 1,
|
||
bool deadLetter = true,
|
||
string exchangeType = ExchangeType.Direct,
|
||
bool durable = true,
|
||
int consumerCount = 1,
|
||
IDictionary<string, object> queueArguments = null,
|
||
IDictionary<string, object> exchangeArguments = null,
|
||
EventHandler<ConsumerEventArgs> registered = null,
|
||
EventHandler<ConsumerEventArgs> unregistered = null,
|
||
EventHandler<ShutdownEventArgs> shutdown = null)
|
||
where T : class
|
||
{
|
||
//获取管道
|
||
var channel = GetChannel(queue);
|
||
|
||
//判断交换机是否存在
|
||
if (!IsExchangeExist(channel, exchange))
|
||
channel = ExchangeDeclare(channel, exchange, exchangeType, durable, arguments: exchangeArguments);
|
||
|
||
//判断队列是否存在
|
||
if (!IsQueueExist(channel, queue))
|
||
channel = QueueDeclare(channel, queue, durable, arguments: queueArguments);
|
||
|
||
//绑定交换机和队列
|
||
QueueBind(channel, exchange, queue, routingKey);
|
||
|
||
//设置每次预取数量
|
||
channel.BasicQos(0, prefetchCount, false);
|
||
|
||
//根据设置的消费者数量创建消费者
|
||
for (int i = 0; i < consumerCount; i++)
|
||
{
|
||
//创建消费者
|
||
var consumer = new EventingBasicConsumer(channel);
|
||
|
||
//接收消息事件
|
||
consumer.Received += (sender, ea) =>
|
||
{
|
||
var text = Encoding.UTF8.GetString(ea.Body.ToArray());
|
||
var body = text;
|
||
var numberOfRetries = 0;
|
||
Exception exception = null;
|
||
bool? result = false;
|
||
|
||
while (numberOfRetries <= retryCount)
|
||
{
|
||
try
|
||
{
|
||
var msg = body.ToObject<T>();
|
||
|
||
result = subscriber?.Invoke(msg, ea);
|
||
|
||
if (result == true)
|
||
channel.BasicAck(ea.DeliveryTag, false);
|
||
else
|
||
channel.BasicNack(ea.DeliveryTag, false, false);
|
||
|
||
//异常置空
|
||
exception = null;
|
||
break;
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
exception = ex;
|
||
handler?.Invoke(body, numberOfRetries, ex);
|
||
numberOfRetries++;
|
||
}
|
||
}
|
||
|
||
//重试后异常仍未解决
|
||
if (exception != null)
|
||
channel.BasicNack(ea.DeliveryTag, false, false);
|
||
|
||
//是否进入内置死信队列
|
||
if (deadLetter && (!(result == true) || exception != null))
|
||
PublishToDead(
|
||
queue,
|
||
ea.Exchange,
|
||
ea.RoutingKey,
|
||
body,
|
||
exception == null ? numberOfRetries : numberOfRetries - 1,
|
||
exception);
|
||
};
|
||
|
||
//注册事件
|
||
if (registered != null)
|
||
consumer.Registered += registered;
|
||
|
||
//取消注册事件
|
||
if (unregistered != null)
|
||
consumer.Unregistered += unregistered;
|
||
|
||
//关闭事件
|
||
if (shutdown != null)
|
||
consumer.Shutdown += shutdown;
|
||
|
||
//手动确认
|
||
channel.BasicConsume(queue, false, consumer);
|
||
}
|
||
}
|
||
|
||
/// <summary>
|
||
/// 订阅消息
|
||
/// </summary>
|
||
/// <typeparam name="T"></typeparam>
|
||
/// <param name="exchange">交换机名称</param>
|
||
/// <param name="queue">队列名称</param>
|
||
/// <param name="routingKey">路由键</param>
|
||
/// <param name="subscriber">消费处理委托</param>
|
||
/// <param name="handler">异常处理委托</param>
|
||
/// <param name="retryCount">重试次数</param>
|
||
/// <param name="prefetchCount">预取数量</param>
|
||
/// <param name="deadLetter">是否进入内置死信队列,此处与原生死信无关</param>
|
||
/// <param name="exchangeType">交换机类型</param>
|
||
/// <param name="durable">持久化</param>
|
||
/// <param name="consumerCount">消费者数量</param>
|
||
/// <param name="queueArguments">队列参数</param>
|
||
/// <param name="exchangeArguments">交换机参数</param>
|
||
/// <param name="registered">注册事件</param>
|
||
/// <param name="unregistered">取消注册事件</param>
|
||
/// <param name="shutdown">关闭事件</param>
|
||
public void Subscribe<T>(
|
||
string exchange,
|
||
string queue,
|
||
string routingKey,
|
||
Func<T, BasicDeliverEventArgs, Task<bool>> subscriber,
|
||
Func<string, int, Exception, Task> handler,
|
||
int retryCount = 5,
|
||
ushort prefetchCount = 1,
|
||
bool deadLetter = true,
|
||
string exchangeType = ExchangeType.Direct,
|
||
bool durable = true,
|
||
int consumerCount = 1,
|
||
IDictionary<string, object> queueArguments = null,
|
||
IDictionary<string, object> exchangeArguments = null,
|
||
EventHandler<ConsumerEventArgs> registered = null,
|
||
EventHandler<ConsumerEventArgs> unregistered = null,
|
||
EventHandler<ShutdownEventArgs> shutdown = null)
|
||
where T : class
|
||
{
|
||
//获取管道
|
||
var channel = GetChannel(queue);
|
||
|
||
//判断交换机是否存在
|
||
if (!IsExchangeExist(channel, exchange))
|
||
channel = ExchangeDeclare(channel, exchange, exchangeType, durable, arguments: exchangeArguments);
|
||
|
||
//判断队列是否存在
|
||
if (!IsQueueExist(channel, queue))
|
||
channel = QueueDeclare(channel, queue, durable, arguments: queueArguments);
|
||
|
||
//绑定交换机和队列
|
||
QueueBind(channel, exchange, queue, routingKey);
|
||
|
||
//设置每次预取数量
|
||
channel.BasicQos(0, prefetchCount, false);
|
||
|
||
//根据设置的消费者数量创建消费者
|
||
for (int i = 0; i < consumerCount; i++)
|
||
{
|
||
//创建消费者
|
||
var consumer = new EventingBasicConsumer(channel);
|
||
|
||
//接收消息事件
|
||
consumer.Received += async (sender, ea) =>
|
||
{
|
||
var text = Encoding.UTF8.GetString(ea.Body.ToArray());
|
||
var body = text;
|
||
var numberOfRetries = 0;
|
||
Exception exception = null;
|
||
bool? result = false;
|
||
|
||
while (numberOfRetries <= retryCount)
|
||
{
|
||
try
|
||
{
|
||
var msg = body.ToObject<T>();
|
||
|
||
if (subscriber != null)
|
||
result = await subscriber(msg, ea);
|
||
|
||
if (result == true)
|
||
channel.BasicAck(ea.DeliveryTag, false);
|
||
else
|
||
channel.BasicNack(ea.DeliveryTag, false, false);
|
||
|
||
//异常置空
|
||
exception = null;
|
||
break;
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
exception = ex;
|
||
|
||
if (handler != null)
|
||
await handler(body, numberOfRetries, ex);
|
||
|
||
numberOfRetries++;
|
||
}
|
||
}
|
||
|
||
//重试后异常仍未解决
|
||
if (exception != null)
|
||
channel.BasicNack(ea.DeliveryTag, false, false);
|
||
|
||
//是否进入内置死信队列
|
||
if (deadLetter && (!(result == true) || exception != null))
|
||
PublishToDead(
|
||
queue,
|
||
ea.Exchange,
|
||
ea.RoutingKey,
|
||
body,
|
||
exception == null ? numberOfRetries : numberOfRetries - 1,
|
||
exception);
|
||
};
|
||
|
||
//注册事件
|
||
if (registered != null)
|
||
consumer.Registered += registered;
|
||
|
||
//取消注册事件
|
||
if (unregistered != null)
|
||
consumer.Unregistered += unregistered;
|
||
|
||
//关闭事件
|
||
if (shutdown != null)
|
||
consumer.Shutdown += shutdown;
|
||
|
||
//手动确认
|
||
channel.BasicConsume(queue, false, consumer);
|
||
}
|
||
}
|
||
|
||
#endregion 订阅消息
|
||
|
||
#region 获取消息
|
||
|
||
/// <summary>
|
||
/// 获取消息
|
||
/// </summary>
|
||
/// <typeparam name="T"></typeparam>
|
||
/// <param name="handler">消费处理委托</param>
|
||
public void Pull<T>(
|
||
Action<T, BasicGetResult> handler)
|
||
where T : class
|
||
{
|
||
var attribute = typeof(T).GetAttribute<RabbitMqAttribute>();
|
||
if (attribute == null)
|
||
throw new ArgumentException("RabbitMqAttribute Is Null!");
|
||
|
||
Pull(attribute.Exchange, attribute.Queue, attribute.RoutingKey, handler);
|
||
}
|
||
|
||
/// <summary>
|
||
/// 获取消息
|
||
/// </summary>
|
||
/// <typeparam name="T"></typeparam>
|
||
/// <param name="exchange">交换机名称</param>
|
||
/// <param name="queue">队列名称</param>
|
||
/// <param name="routingKey">路由键</param>
|
||
/// <param name="handler">消费处理委托</param>
|
||
public void Pull<T>(
|
||
string exchange,
|
||
string queue,
|
||
string routingKey,
|
||
Action<T, BasicGetResult> handler)
|
||
where T : class
|
||
{
|
||
//获取管道
|
||
var channel = GetChannel(queue);
|
||
|
||
//判断交换机是否存在
|
||
if (!IsExchangeExist(channel, exchange))
|
||
channel = ExchangeDeclare(channel, exchange);
|
||
|
||
//判断队列是否存在
|
||
if (!IsQueueExist(channel, queue))
|
||
channel = QueueDeclare(channel, queue);
|
||
|
||
//绑定交换机和队列
|
||
QueueBind(channel, exchange, queue, routingKey);
|
||
|
||
var result = channel.BasicGet(queue, false);
|
||
if (result == null)
|
||
return;
|
||
var text = Encoding.UTF8.GetString(result.Body.ToArray());
|
||
var msg = text.ToObject<T>();
|
||
try
|
||
{
|
||
handler(msg, result);
|
||
}
|
||
catch (Exception)
|
||
{
|
||
throw;
|
||
}
|
||
finally
|
||
{
|
||
channel.BasicAck(result.DeliveryTag, false);
|
||
}
|
||
}
|
||
|
||
/// <summary>
|
||
/// 获取消息
|
||
/// </summary>
|
||
/// <typeparam name="T"></typeparam>
|
||
/// <param name="handler">消费处理委托</param>
|
||
public async Task PullAsync<T>(
|
||
Func<T, BasicGetResult, Task> handler)
|
||
where T : class
|
||
{
|
||
var attribute = typeof(T).GetAttribute<RabbitMqAttribute>();
|
||
if (attribute == null)
|
||
throw new ArgumentException("RabbitMqAttribute Is Null!");
|
||
|
||
await PullAsync(attribute.Exchange, attribute.Queue, attribute.RoutingKey, handler);
|
||
}
|
||
|
||
/// <summary>
|
||
/// 获取消息
|
||
/// </summary>
|
||
/// <typeparam name="T"></typeparam>
|
||
/// <param name="exchange">交换机名称</param>
|
||
/// <param name="queue">队列名称</param>
|
||
/// <param name="routingKey">路由键</param>
|
||
/// <param name="handler">消费处理委托</param>
|
||
public async Task PullAsync<T>(
|
||
string exchange,
|
||
string queue,
|
||
string routingKey,
|
||
Func<T, BasicGetResult, Task> handler)
|
||
where T : class
|
||
{
|
||
//获取管道
|
||
var channel = GetChannel(queue);
|
||
|
||
//判断交换机是否存在
|
||
if (!IsExchangeExist(channel, exchange))
|
||
channel = ExchangeDeclare(channel, exchange);
|
||
|
||
//判断队列是否存在
|
||
if (!IsQueueExist(channel, queue))
|
||
channel = QueueDeclare(channel, queue);
|
||
|
||
//绑定交换机和队列
|
||
QueueBind(channel, exchange, queue, routingKey);
|
||
|
||
var result = channel.BasicGet(queue, false);
|
||
if (result == null)
|
||
return;
|
||
var text = Encoding.UTF8.GetString(result.Body.ToArray());
|
||
var msg = text.ToObject<T>();
|
||
try
|
||
{
|
||
if (handler != null)
|
||
await handler(msg, result);
|
||
}
|
||
catch (Exception)
|
||
{
|
||
throw;
|
||
}
|
||
finally
|
||
{
|
||
channel.BasicAck(result.DeliveryTag, false);
|
||
}
|
||
}
|
||
|
||
/// <summary>
|
||
/// 获取消息数量
|
||
/// </summary>
|
||
/// <param name="channel">管道</param>
|
||
/// <param name="queue">队列名称</param>
|
||
/// <returns></returns>
|
||
public uint GetMessageCount(
|
||
IModel channel,
|
||
string queue)
|
||
{
|
||
return (channel ?? GetChannel(queue)).MessageCount(queue);
|
||
}
|
||
|
||
#endregion 获取消息
|
||
|
||
#region 释放资源
|
||
|
||
/// <summary>
|
||
/// 执行与释放或重置非托管资源关联的应用程序定义的任务。
|
||
/// </summary>
|
||
public void Dispose()
|
||
{
|
||
foreach (var item in _channelDic)
|
||
{
|
||
item.Value?.Dispose();
|
||
}
|
||
|
||
foreach (var item in _queueHelperDic)
|
||
{
|
||
item.Value?.Dispose();
|
||
}
|
||
|
||
_connection?.Dispose();
|
||
_channelDic.Clear();
|
||
_queueHelperDic.Clear();
|
||
}
|
||
|
||
#endregion 释放资源
|
||
}
|
||
|
||
/// <summary>
|
||
/// RabbitMq连接配置
|
||
/// </summary>
|
||
public class MqConfig
|
||
{
|
||
/// <summary>
|
||
/// 是否启用
|
||
/// </summary>
|
||
public bool Enabled { get; set; }
|
||
|
||
/// <summary>
|
||
/// 主机名
|
||
/// </summary>
|
||
public string HostName { get; set; } = "localhost";
|
||
|
||
/// <summary>
|
||
/// 心跳时间
|
||
/// </summary>
|
||
public TimeSpan RequestedHeartbeat { get; set; } = TimeSpan.FromSeconds(10);
|
||
|
||
/// <summary>
|
||
/// 自动重连
|
||
/// </summary>
|
||
public bool AutomaticRecoveryEnabled { get; set; } = true;
|
||
|
||
/// <summary>
|
||
/// 重连时间
|
||
/// </summary>
|
||
public TimeSpan NetworkRecoveryInterval { get; set; } = TimeSpan.FromSeconds(10);
|
||
|
||
/// <summary>
|
||
/// 用户名
|
||
/// </summary>
|
||
public string UserName { get; set; } = "guest";
|
||
|
||
/// <summary>
|
||
/// 密码
|
||
/// </summary>
|
||
public string Password { get; set; } = "guest";
|
||
|
||
/// <summary>
|
||
/// 端口号
|
||
/// </summary>
|
||
public int Port { get; set; } = 5672;
|
||
|
||
/// <summary>
|
||
/// 虚拟主机
|
||
/// </summary>
|
||
public string VirtualHost { get; set; } = "/";
|
||
}
|
||
|
||
/// <summary>
|
||
/// 自定义的RabbitMq队列信息实体特性
|
||
/// </summary>
|
||
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false)]
|
||
public class RabbitMqAttribute : Attribute
|
||
{
|
||
/// <summary>
|
||
/// 交换机
|
||
/// </summary>
|
||
public string Exchange { get; set; }
|
||
|
||
/// <summary>
|
||
/// 交换机类型(广播fanout,队列direct)
|
||
/// </summary>
|
||
public string ExchangeType { get; set; }
|
||
|
||
/// <summary>
|
||
/// 队列
|
||
/// </summary>
|
||
public string Queue { get; set; }
|
||
|
||
/// <summary>
|
||
/// 路由键
|
||
/// </summary>
|
||
public string RoutingKey { get; set; }
|
||
|
||
/// <summary>
|
||
/// 是否持久化,默认true
|
||
/// </summary>
|
||
public bool Durable { get; set; } = true;
|
||
|
||
/// <summary>
|
||
/// 预取数量,默认1
|
||
/// </summary>
|
||
public ushort PrefetchCount { get; set; } = 1;
|
||
|
||
/// <summary>
|
||
/// 异常重试次数,默认5次
|
||
/// </summary>
|
||
public int RetryCount { get; set; } = 5;
|
||
|
||
/// <summary>
|
||
/// 消息过期时间,过期后队列中消息自动被删除,单位ms
|
||
/// </summary>
|
||
public int MessageTTL { get; set; }
|
||
|
||
/// <summary>
|
||
/// 消息头部对象json字符串
|
||
/// </summary>
|
||
public string Header { get; set; }
|
||
|
||
/// <summary>
|
||
/// 队列消费者数量,默认1
|
||
/// </summary>
|
||
public int ConsumerCount { get; set; } = 1;
|
||
|
||
/// <summary>
|
||
/// 队列过期时间,过期后队列自动被删除,单位ms
|
||
/// </summary>
|
||
public int AutoExpire { get; set; }
|
||
|
||
/// <summary>
|
||
/// 队列最大长度
|
||
/// </summary>
|
||
public int MaxLength { get; set; }
|
||
|
||
/// <summary>
|
||
/// 队列占用的最大空间
|
||
/// </summary>
|
||
public int MaxLengthBytes { get; set; }
|
||
|
||
/// <summary>
|
||
/// 队列溢出行为,可选值:drop-head或者reject-publish,默认drop-head
|
||
/// </summary>
|
||
public string OverflowBehaviour { get; set; }
|
||
|
||
/// <summary>
|
||
/// 是否启用系统内置死信队列处理逻辑,默认true;注意:此处与原生的死信无关
|
||
/// </summary>
|
||
public bool DeadLetter { get; set; } = true;
|
||
|
||
/// <summary>
|
||
/// 死信交换机
|
||
/// </summary>
|
||
public string DeadLetterExchange { get; set; }
|
||
|
||
/// <summary>
|
||
/// 死信路由键
|
||
/// </summary>
|
||
public string DeadLetterRoutingKey { get; set; }
|
||
|
||
/// <summary>
|
||
/// 队列最大优先级,数值越大优先级越高,范围1-255,建议取值1-10
|
||
/// </summary>
|
||
public int MaximumPriority { get; set; }
|
||
|
||
/// <summary>
|
||
/// 队列惰性模式,可选值:default或者lazy
|
||
/// </summary>
|
||
public string LazyMode { get; set; }
|
||
|
||
/// <summary>
|
||
/// 主定位器,集群配置
|
||
/// </summary>
|
||
public string MasterLocator { get; set; }
|
||
}
|
||
|
||
/// <summary>
|
||
/// 内置死信队列实体
|
||
/// </summary>
|
||
public class DeadLetterQueue
|
||
{
|
||
/// <summary>
|
||
/// 消息内容
|
||
/// </summary>
|
||
public string Body { get; set; }
|
||
|
||
/// <summary>
|
||
/// 交换机
|
||
/// </summary>
|
||
public string Exchange { get; set; }
|
||
|
||
/// <summary>
|
||
/// 队列
|
||
/// </summary>
|
||
public string Queue { get; set; }
|
||
|
||
/// <summary>
|
||
/// 路由键
|
||
/// </summary>
|
||
public string RoutingKey { get; set; }
|
||
|
||
/// <summary>
|
||
/// 重试次数
|
||
/// </summary>
|
||
public int RetryCount { get; set; }
|
||
|
||
/// <summary>
|
||
/// 异常消息
|
||
/// </summary>
|
||
public string ExceptionMsg { get; set; }
|
||
|
||
/// <summary>
|
||
/// 异常
|
||
/// </summary>
|
||
public Exception Exception { get; set; }
|
||
|
||
/// <summary>
|
||
/// 创建时间
|
||
/// </summary>
|
||
public DateTime CreateDateTime { get; set; } = DateTime.Now;
|
||
}
|
||
|
||
/// <summary>
|
||
/// 内存队列消息
|
||
/// </summary>
|
||
public class QueueMessage
|
||
{
|
||
/// <summary>
|
||
/// 交换机名称
|
||
/// </summary>
|
||
public string Exchange { get; set; }
|
||
|
||
/// <summary>
|
||
/// 队列名称
|
||
/// </summary>
|
||
public string Queue { get; set; }
|
||
|
||
/// <summary>
|
||
/// 路由键
|
||
/// </summary>
|
||
public string RoutingKey { get; set; }
|
||
|
||
/// <summary>
|
||
/// 消息内容
|
||
/// </summary>
|
||
public IEnumerable<string> Body { get; set; }
|
||
|
||
/// <summary>
|
||
/// 交换机类型
|
||
/// </summary>
|
||
public string ExchangeType { get; set; }
|
||
|
||
/// <summary>
|
||
/// 持久化
|
||
/// </summary>
|
||
public bool Durable { get; set; }
|
||
|
||
/// <summary>
|
||
/// 消息发送确认
|
||
/// </summary>
|
||
public bool Confirm { get; set; }
|
||
|
||
/// <summary>
|
||
/// 单个消息过期时间,单位ms
|
||
/// </summary>
|
||
public string Expiration { get; set; }
|
||
|
||
/// <summary>
|
||
/// 单个消息优先级,数值越大优先级越高,取值范围:0-9
|
||
/// </summary>
|
||
public byte? Priority { get; set; }
|
||
|
||
/// <summary>
|
||
/// 队列参数
|
||
/// </summary>
|
||
public IDictionary<string, object> QueueArguments { get; set; }
|
||
|
||
/// <summary>
|
||
/// 交换机参数
|
||
/// </summary>
|
||
public IDictionary<string, object> ExchangeArguments { get; set; }
|
||
|
||
/// <summary>
|
||
/// 消息头部
|
||
/// </summary>
|
||
public IDictionary<string, object> Headers { get; set; }
|
||
}
|
||
} |