使用.NET Channels构建轻量级内存中消息总线
摘要
假设你正在构建一个模块化单体软件,这是一种软件架构,不同的组件被组织成松散耦合的模块。或者你可能需要异步处理数据。你将需要一个工具或服务来实现这一点。
假设你正在构建一个模块化单体软件,这是一种软件架构,不同的组件被组织成松散耦合的模块。或者你可能需要异步处理数据。你将需要一个工具或服务来实现这一点。
在现代软件架构中,消息传递扮演着关键角色,使松散耦合的组件能够进行通信和协调。
内存中消息总线在高性能和低延迟是关键要求时特别有用。
在今天的问题中,我们将:
- 创建所需的消息抽象
- 使用channels构建一个内存中消息总线
- 实现一个集成事件处理器后台作业
- 演示如何异步发布和消费消息
何时使用内存中消息总线
我必须首先说明,内存中消息总线远非万能药。正如你即将了解到的,使用它有很多注意事项。
但首先,让我们从使用内存中消息总线的优点开始:
- 因为它在内存中工作,你可以拥有一个非常低延迟的消息系统
- 你可以实现组件之间的异步(非阻塞)通信
然而,这种方法有一些缺点:
- 如果应用程序进程崩溃,可能会丢失消息
- 它只在单个进程内工作,因此在分布式系统中不实用
内存中消息总线的一个实际用例是在构建模块化单体软件时。你可以使用集成事件实现模块之间的通信。当你需要将一些模块提取到一个单独的服务时,你可以用一个分布式的替换内存中的总线。
定义消息抽象
我们需要一些抽象来构建我们简单的消息系统。从客户端的角度来看,我们真正需要的只有两件事。一个是发布消息的抽象,另一个是定义消息处理器。
IEventBus
接口暴露了 PublishAsync
方法。这是我们用来发布消息的方法。还定义了一个泛型约束,只允许传入一个 IIntegrationEvent
实例。
public interface IEventBus
{
Task PublishAsync<T>(
T integrationEvent,
CancellationToken cancellationToken = default)
where T : class, IIntegrationEvent;
}
我想用实际的 IIntegrationEvent
抽象,所以我将使用 MediatR 来支持 pub-sub。IIntegrationEvent
接口将继承自 INotification
。这使我们能够使用 INotificationHandler<T>
轻松定义 IIntegrationEvent
处理器。此外,IIntegrationEvent
有一个标识符,所以我们可以跟踪它的执行。
抽象的 IntegrationEvent
作为具体实现的基类。
using MediatR;
public interface IIntegrationEvent : INotification
{
Guid Id { get; init; }
}
public abstract record IntegrationEvent(Guid Id) : IIntegrationEvent;
使用Channels的简单内存队列
System.Threading.Channels
命名空间提供了数据结构,用于异步在生产者和消费者之间传递消息。Channels实现了生产者/消费者模式。生产者异步产生数据,消费者异步消费这些数据。这是构建松散耦合系统的基本模式之一。
采用 .NET Channels 的主要动机之一是它们出色的性能特征。与传统消息队列不同,Channels 完全在内存中操作。这有一个缺点,即如果应用崩溃,可能会导致消息丢失。
InMemoryMessageQueue
使用 Channel.CreateUnbounded
创建了一个无界的通道。这意味着通道可以有任意数量的读写者。它还暴露了一个 ChannelReader
和 ChannelWriter
,允许消费者发布和消费消息。
internal sealed class InMemoryMessageQueue
{
private readonly Channel<IIntegrationEvent> _channel =
Channel.CreateUnbounded<IIntegrationEvent>();
public ChannelReader<IIntegrationEvent> Reader => _channel.Reader;
public ChannelWriter<IIntegrationEvent> Writer => _channel.Writer;
}
你还需要将 InMemoryMessageQueue
作为单例注册到依赖注入中:
builder.Services.AddSingleton<InMemoryMessageQueue>();
实现EventBus
现在,利用channels,IEventBus
的实现就很简单了。EventBus
类使用 InMemoryMessageQueue
来访问 ChannelWriter
并将事件写入通道。
internal sealed class EventBus(InMemoryMessageQueue queue) : IEventBus
{
public async Task PublishAsync<T>(
T integrationEvent,
CancellationToken cancellationToken = default)
where T : class, IIntegrationEvent
{
await queue.Writer.WriteAsync(integrationEvent, cancellationToken);
}
}
我们将以无状态的方式注册 EventBus
为单例服务,因为它无状态:
builder.Services.AddSingleton<IEventBus, EventBus>();
消费集成事件
有了实现生产者的 EventBus
,我们需要一种方式来消费发布的 IIntegrationEvent
。我们可以使用内置的 IHostedService
抽象实现一个简单的后台服务。
IntegrationEventProcessorJob
依赖于 InMemoryMessageQueue
,但这次是为了读取(消费)消息。我们将使用 ChannelReader.ReadAllAsync
方法来获取一个 IAsyncEnumerable
。这允许我们异步消费 Channel
中的所有消息。
IPublisher
来自 MediatR,帮助我们将 IIntegrationEvent
与相应的处理器连接起来。如果你想向事件处理程序注入作用域服务,重要的是要从自定义作用域解析它。
internal sealed class IntegrationEventProcessorJob(
InMemoryMessageQueue queue,
IServiceScopeFactory serviceScopeFactory,
ILogger<IntegrationEventProcessorJob> logger)
: BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await foreach (IIntegrationEvent integrationEvent in
queue.Reader.ReadAllAsync(stoppingToken))
{
try
{
using IServiceScope scope = serviceScopeFactory.CreateScope();
IPublisher publisher = scope.ServiceProvider
.GetRequiredService<IPublisher>();
await publisher.Publish(integrationEvent, stoppingToken);
}
catch (Exception ex)
{
logger.LogError(
ex,
"Something went wrong! {IntegrationEventId}",
integrationEvent.Id);
}
}
}
}
别忘了注册托管服务:
builder.Services.AddHostedService<IntegrationEventProcessorJob>();
使用内存中消息总线
有了所有必要的抽象,我们最终可以使用内存中消息总线了。
IEventBus
服务将消息写入 Channel
并立即返回。这允许你以非阻塞方式发布消息,这可以提高性能。
internal sealed class RegisterUserCommandHandler(
IUserRepository userRepository,
IEventBus eventBus)
: ICommandHandler<RegisterUserCommand>
{
public async Task<User> Handle(
RegisterUserCommand command,
CancellationToken cancellationToken)
{
// 首先,注册用户。
User user = CreateFromCommand(command);
userRepository.Insert(user);
// 现在我们可以发布事件了。
await eventBus.PublishAsync(
new UserRegisteredIntegrationEvent(user.Id),
cancellationToken);
return user;
}
}
这解决了生产者方面,但我们还需要为 UserRegisteredIntegrationEvent
消息创建一个消费者。因为我在这个实现中使用了 MediatR,这部分被极大地简化了。
我们需要为处理集成事件 UserRegisteredIntegrationEvent
定义一个 INotificationHandler
实现。这将是 UserRegisteredIntegrationEventHandler
。
当后台作业从 Channel
读取 UserRegisteredIntegrationEvent
时,它将发布消息并执行处理程序。
internal sealed class UserRegisteredIntegrationEventHandler
: INotificationHandler<UserRegisteredIntegrationEvent>
{
public async Task Handle(
UserRegisteredIntegrationEvent event,
CancellationToken cancellationToken)
{
// 异步处理事件。
}
}
改进点
虽然我们的基本内存中消息总线是可行的,但有几个领域我们可以改进:
- 弹性 - 当我们遇到异常时,我们可以引入重试,这将提高消息总线的可靠性。
- 幂等性 - 问问自己是否想要处理相同的消息两次。幂等消费者模式 优雅地解决了这个问题。
- 死信队列 - 有时,我们可能无法正确处理消息。引入一个这些消息的持久存储是个好主意。这被称为死信队列,它允许我们在以后进行故障排查。
我们已经讨论了使用 .NET Channels 构建内存中消息总线的关键方面。你可以通过实施改进来进一步扩展这一点,以获得更健壮的解决方案。
记住,这个实现只在一个进程内工作。如果你需要更可靠的解决方案,请考虑使用真正的消息代理。