Skip to content
Go back

在.NET中用Redis Pub/Sub实现简单高效的消息通信

Published:  at  12:00 AM

在.NET中用Redis Pub/Sub实现简单高效的消息通信 🚀

引言:Redis,不只是缓存!

当你在做.NET后端开发时,Redis可能早已是你缓存优化的常用工具。不过,Redis的能力远不止于此!它还内置了一个被低估的利器——Pub/Sub(发布/订阅)机制。通过Redis Channels,你可以轻松地在不同服务间实现实时消息推送,打造高效的分布式通信方案。今天,我们就来深入探讨如何在.NET项目中玩转Redis Pub/Sub,助你解决如缓存同步、实时通知等实际痛点。

Redis Pub/Sub=“轻量消息中枢”,让你的分布式系统更灵活、更有弹性!


1. Redis Channels是什么?——消息通信的高速公路 🛣️

Redis Channels是基于发布/订阅模式(Pub/Sub)的命名通信通道,每个通道都有唯一名字,比如notificationsupdates等。
发布者(Producer) 通过PUBLISH指令往通道发送消息;订阅者(Consumer) 通过SUBSCRIBE指令监听并消费消息。

结构示意图

Redis channel with publisher and three subscribers.

如图,一个Channel可以有多个发布者和多个订阅者,实现“一发多收”,非常适合广播类场景。

注意事项:


2. 哪些场景适合用Redis Pub/Sub?🤔

Redis Channels并不适合“丢消息就炸锅”的核心业务,但对于偶尔丢失消息可接受、追求实时性的场景,它是优雅且高效的选择:

⚠️ 如果你的业务对消息可靠性极高,请考虑Kafka、RabbitMQ等更专业的消息队列系统。


3. .NET项目实战:用StackExchange.Redis实现Pub/Sub 💻

让我们直接上手,用.NET主流库StackExchange.Redis实现一个最简的生产者+消费者模型。

1)安装依赖

Install-Package StackExchange.Redis

如果本地没装Redis,可以用Docker一键启动:

docker run -it -p 6379:6379 redis

2)实现Producer(消息生产者)

public class Producer(ILogger<Producer> logger) : BackgroundService
{
    private static readonly string ConnectionString = "localhost:6379";
    private static readonly ConnectionMultiplexer Connection =
        ConnectionMultiplexer.Connect(ConnectionString);

    private const string Channel = "messages";

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var subscriber = Connection.GetSubscriber();

        while (!stoppingToken.IsCancellationRequested)
        {
            var message = new Message(Guid.NewGuid(), DateTime.UtcNow);
            var json = JsonSerializer.Serialize(message);

            await subscriber.PublishAsync(Channel, json);

            logger.LogInformation(
                "Sending message: {Channel} - {@Message}", message);

            await Task.Delay(5000, stoppingToken);
        }
    }
}

3)实现Consumer(消息消费者)

public class Consumer(ILogger<Consumer> logger) : BackgroundService
{
    private static readonly string ConnectionString = "localhost:6379";
    private static readonly ConnectionMultiplexer Connection =
        ConnectionMultiplexer.Connect(ConnectionString);

    private const string Channel = "messages";

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var subscriber = Connection.GetSubscriber();

        await subscriber.SubscribeAsync(Channel, (channel, message) =>
        {
            var msg = JsonSerializer.Deserialize<Message>(message);

            logger.LogInformation(
                "Received message: {Channel} - {@Message}",
                channel,
                msg);
        });
    }
}

4)运行效果图

Pub/Sub demo.


4. 实践案例:用Pub/Sub做分布式缓存失效通知 🧹

在大型分布式系统中,常见的缓存策略是“本地内存缓存+全局Redis缓存”。
但数据库数据变更时,如何及时同步所有节点的本地缓存?

方案:用Redis Pub/Sub做缓存失效广播!

每个应用实例都运行一个后台服务,监听cache-invalidation通道:

public class CacheInvalidationBackgroundService(
    IServiceProvider serviceProvider)
    : BackgroundService
{
    public const string Channel = "cache-invalidation";

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await subscriber.SubscribeAsync(Channel, (channel, key) =>
        {
            var cache = serviceProvider.GetRequiredService<IMemoryCache>();
            cache.Remove(key);
            return Task.CompletedTask;
        });
    }
}

当数据库发生变更时,只需publish对应key到该Channel,各节点即刻同步清理内存缓存。
即使个别节点短暂掉线而未收到通知也无大碍,因为重启后缓存本就会失效。这种方式既简单又高效,极大提升了数据一致性体验!


5. 总结与延伸 📚

未来,如果你想进一步研究更复杂的分布式架构和消息总线设计,强烈推荐了解Modular Monolith Architecture等先进架构理念!


你的看法和实践?

你是否已经在生产环境用过Redis Pub/Sub?遇到哪些坑?
欢迎在评论区留言交流 👇 或把本文转发给同样关注.NET与分布式系统的朋友们!

如果你喜欢这样的技术深度分享,也别忘了点赞关注哦~ 🔥



Previous Post
HTTP/2 vs HTTP/3全方位技术对比详解 🚀
Next Post
AlphaEvolve:Gemini大模型驱动的进化式代码智能体,开启算法自动发现新纪元