C#通道实战指南:有界VS无界,高并发场景下的正确选择

作者:微信公众号:【架构师老卢】
8-7 8:23
15

构建可靠、可扩展和高性能的 .NET 应用程序,关键在于如何处理并发和数据处理。C# 通道 (Channels) 提供了一种新颖、现代的方法,用于在 .NET 中构建安全、异步和高吞吐量的管道。

通道允许你创建内存中的生产者-消费者队列,这些队列可以自然地跨异步工作流和后台服务进行扩展。然而,一个关键的架构决策是在有界 (bounded) 和无界 (unbounded) 通道之间做出选择。

在今天的文章中,我们将探讨:

  • C# 通道是什么?
  • 有界通道 VS 无界通道
  • 使用通道进行后台处理
  • 在 ASP.NET Core 实际应用中使用通道
  • 使用通道的最佳实践和技巧

让我们开始吧!

C# 通道是什么? 在构建 .NET 应用时,你经常需要将数据从代码的一部分发送到另一部分。

过去,开发人员为此目的使用诸如 Queue<T>ConcurrentQueue<T>BlockingCollection<T> 等结构。他们将队列封装到类中,并用它们来管理数据流。

然而,这样的实现有一个显著的缺点:紧密的代码耦合。

C# 通道解决了这个问题。它们实现了生产者-消费者模式。一个类生产数据,另一个类消费数据,彼此无需知晓对方。

C# 通道来自 System.Threading.Channels 命名空间。通道使得在生产者与消费者之间发送数据变得简单,帮助你避免常见的线程问题。

一个通道有两个部分:

  • 写入器 (Writer):将数据推入通道。
  • 读取器 (Reader):从通道拉取数据。

读取和写入都可以在不同的线程上发生,通道确保线程安全。它们让你可以在任何地方使用异步代码,因此你的应用可以处理大量数据而不会阻塞线程或加锁。

下面是一个在 C# 中使用通道的基本示例。我们异步地生产数字并从通道中消费它们:

using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<int>();

// 生产者
_ = Task.Run(async () =>
{
    for (var i = 0; i < 10; i++)
    {
        await channel.Writer.WriteAsync(i);
        Console.WriteLine($"Produced: {i}");
        await Task.Delay(100); // 模拟工作耗时
    }
    channel.Writer.Complete();
});

// 消费者
await foreach (var item in channel.Reader.ReadAllAsync())
{
    Console.WriteLine($"Consumed: {item}");
    await Task.Delay(150); // 模拟处理耗时
}

Console.WriteLine("Processing complete.");

其工作原理如下:

  1. 生产者在一个单独的线程(任务)上运行,向通道写入数据。
  2. 消费者从通道读取数据并进行处理。
  3. 通道处理所有的通信和线程安全。

通道非常适合以下场景:

  • 你需要使用 async/await 连接生产者和消费者。
  • 你想要解耦生产和消费逻辑。
  • 你想要控制数据流(例如,如果消费者跟不上速度,则减慢生产者)。
  • 你想要避免底层的线程处理、加锁或手动同步。

通道是流式传输事件和处理后台任务的理想选择。它们是在单个应用程序内部消息队列的一种简单内存替代方案。

有界通道 VS 无界通道 C# 通道有两种类型:有界 (bounded) 和无界 (unbounded)。两者都允许你将数据从生产者传递到消费者,但它们在处理流控制和内存方面有所不同。

什么是有界通道? 有界通道具有固定的最大容量。创建它时,你需要设置它能同时容纳的项目数量上限。

如果生产者在通道已满后尝试添加更多项目,它必须等待直到有空位。

何时应使用有界通道?

  • 当你想限制内存使用并防止过载时。
  • 当消费者有时比生产者慢时。
  • 当你需要反压 (backpressure) 以避免系统过载时。
var channel = Channel.CreateBounded<int>(5);

// 生产者
await channel.Writer.WriteAsync(1); // 如果有空间则添加项目,如果已满则等待

// 消费者
var item = await channel.Reader.ReadAsync(); // 取出一个项目

有界通道是后台处理、作业队列以及任何需要控制资源使用的应用程序的良好选择。它们有助于保护你的应用免受突发流量激增和生产者失控的影响。

什么是无界通道? 无界通道没有固定限制。生产者可以随心所欲地快速添加项目。

通道会增长以处理你放入的任意多项目,仅受可用系统内存的限制。

何时应使用无界通道?

  • 当你确信生产者永远不会长时间超过消费者的处理速度时。
  • 在流控制不成问题的简单情况下。
  • 当你预期项目流较小或稳定时。

以下是创建无界通道的方法:

var channel = Channel.CreateUnbounded<int>();

// 生产者
await channel.Writer.WriteAsync(42); // 总是接受新项目

// 消费者
var item = await channel.Reader.ReadAsync(); // 取出一个项目

无界通道使用简单,但在高负载情况下可能有风险。如果生产者写入项目的速度超过消费者读取的速度,你可能会耗尽内存。

你应该使用哪种?

  • 如果你追求系统的安全性和稳定性,请从有界通道开始。它们在消费者落后时保护你的应用。
  • 如果你知道生产者始终可控且数据速率较低,则可以使用无界通道。

在大多数现实世界的 .NET 服务中,有界通道是更安全的默认选择。

使用有界通道的后台处理器 在生产环境中,通道通常用于 ASP.NET Core 的后台服务 (BackgroundService)。

一种常见模式是设置一个后台服务,该服务从通道读取消息或任务并逐个处理它们。这使得你的代码易于扩展,并释放主线程用于其他工作。

让我们探索一个处理来自通道项目的 BackgroundService 示例:

builder.Services.AddSingleton(_ => Channel.CreateBounded<string>(new BoundedChannelOptions(100)
{
    FullMode = BoundedChannelFullMode.Wait
}));

public class MessageProcessor : BackgroundService
{
    private readonly Channel<string> _channel;
    private readonly ILogger<MessageProcessor> _logger;

    public MessageProcessor(Channel<string> channel, ILogger<MessageProcessor> logger)
    {
       _channel = channel;
       _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
       _logger.LogInformation("消息处理器启动");

       await foreach (var message in _channel.Reader.ReadAllAsync(stoppingToken))
       {
          _logger.LogInformation("正在处理消息: {Message}", message);

          await Task.Delay(100, stoppingToken); // 模拟处理耗时

          _logger.LogInformation("消息处理成功: {Message}", message);
       }
    }
}

要将项目发布到通道中,你需要调用 WriteAsync 方法:

private Task AddSampleMessagesAsync(CancellationToken stoppingToken)
{
    _ = Task.Run(async () =>
    {
        // 添加 150 条消息(超过通道容量 100)
        for (var i = 1; i <= 150 && !stoppingToken.IsCancellationRequested; i++)
        {
            var message = $"示例消息 #{i}";

            // 等待直到通道中有空间(当通道满时这会阻塞)
            await _channel.Writer.WriteAsync(message, stoppingToken);

            _logger.LogInformation("已将消息添加到通道: {Message}", message);

            await Task.Delay(50, stoppingToken); // 模拟消息生成间隔
        }

        // 添加消息完成后,标记通道完成
        _channel.Writer.Complete();
    }, stoppingToken);
    return Task.CompletedTask;
}

注意,我们在 AsyncEnumerable 上运行了 foreach 循环:

await foreach (var message in _channel.Reader.ReadAllAsync(stoppingToken))
{
    // 处理消息
}

_channel.Reader.ReadAllAsync 等待新消息出现在通道中。当发布消息完成后,你可以调用 _channel.Writer.Complete(),循环将结束。

该通道的容量上限为 100,因此如果有 100 条消息已经在等待,生产者将暂停,直到有空闲槽位可用。后台服务会尽可能快地读取并处理消息。

如果你尝试过快地添加消息,生产者将减速,这有助于控制内存使用。

创建有界通道时,你可以设置 BoundedChannelFullMode 来控制通道满时发生的情况:

  • Wait:写入器等待直到有可用空间(最常见,最安全)。
  • DropWrite:如果通道已满,则丢弃新项目。
  • DropOldest:移除最旧的项目以腾出空间给新项目。
  • DropNewest:丢弃最新的项目。

对于大多数后台任务,使用 Wait。当丢失少量消息是可以接受的时,DropWriteDropOldest 可以是合适的选择。当最新事件比旧事件更相关时,可以使用 DropOldest

通道的实际应用 在真实的 ASP.NET Core 应用程序中,你可以使用通道来实现写回缓存策略 (Write Back Caching Strategy)

这种缓存策略用于高速、写入密集的场景。

主要思想是数据首先写入缓存。缓存随后在满足特定条件或间隔后,异步地将数据写回数据库。

让我们探索一个真实示例:一个在线商店应用程序。

用户经常向他们的在线购物车添加或移除商品。最终状态仅在结账时才真正关键。写入可以非常快,因为它们首先命中缓存,系统可以定期将更新刷新到数据库。

这使得在购物高峰期能够实现高写入吞吐量,而数据库最终会获得最终的购物车详情。

以下是创建产品购物车的 WebApi 端点:

public record ProductCartRequest(string UserId, List<ProductCartItemRequest> ProductCartItems);

public record ProductCartItemRequest(Guid ProductId, int Quantity);

[HttpPost]
public async Task<ActionResult<ProductCartResponse>> CreateCart(ProductCartRequest request)
{
    var response = await _service.AddAsync(request);
    return CreatedAtAction(nameof(GetCart), new { id = response.Id }, response);
}

当创建一个新的 ProductCart 时,它仅被立即添加到缓存中:

public class WriteBackCacheProductCartService
{
    private readonly HybridCache _cache;
    private readonly IProductCartRepository _repository;
    private readonly Channel<ProductCartDispatchEvent> _channel;

    public WriteBackCacheProductCartService(
        HybridCache cache,
        IProductCartRepository repository,
        Channel<ProductCartDispatchEvent> channel)
    {
        _cache = cache;
        _repository = repository;
        _channel = channel;
    }

    public async Task<ProductCartResponse> AddAsync(ProductCartRequest request)
    {
        var productCart = new ProductCart
        {
            Id = Guid.NewGuid(),
            UserId = request.UserId,
            CartItems = request.ProductCartItems.Select(x => new CartItem
            {
                Id = Guid.NewGuid(),
                Quantity = x.Quantity,
                Price = Random.Shared.Next(100, 1000) // 模拟价格
            }).ToList()
        };

        var cacheKey = $"productCart:{productCart.Id}";

        var productCartResponse = MapToProductCartResponse(productCart); // 假设的映射方法
        await _cache.SetAsync(cacheKey, productCartResponse); // 写入缓存

        // 发布事件到通道,触发后台异步写回数据库
        await _channel.Writer.WriteAsync(new ProductCartDispatchEvent(productCart));

        return productCartResponse;
    }
}

这里我们使用一个有界通道来发布 ProductCartDispatchEvent

public record ProductCartDispatchEvent(ProductCart ProductCart);

builder.Services.AddSingleton(_ => Channel.CreateBounded<ProductCartDispatchEvent>(new BoundedChannelOptions(100)
{
    FullMode = BoundedChannelFullMode.Wait // 通道满时等待
}));

一个后台服务从通道读取数据,并异步地将购物车数据写入数据库:

public class WriteBackCacheBackgroundService(IServiceScopeFactory scopeFactory,
    Channel<ProductCartDispatchEvent> channel) : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await foreach (var command in channel.Reader.ReadAllAsync(stoppingToken))
        {
            // 使用作用域获取作用域内服务
            using var scope = scopeFactory.CreateScope();
            var repository = scope.ServiceProvider.GetRequiredService<IProductCartRepository>();

            var existingCart = await repository.GetByIdAsync(command.ProductCart.Id);
            if (existingCart is null)
            {
                await repository.AddAsync(command.ProductCart); // 新增
                return;
            }

            existingCart.CartItems = command.ProductCart.CartItems; // 更新现有
            await repository.UpdateAsync(existingCart);
        }
    }
}

注意:这种模式可以显著提高写入速度,但它需要健壮的冲突解决和故障处理机制来确保一致性。要确保在缓存故障时购物车状态永不丢失,需要健壮的复制或备份机制。

使用通道的最佳实践和技巧 通道是一个强大的工具,但要充分利用它们(并避免问题),遵循一些关键的最佳实践非常重要。以下是一些在 .NET 应用程序中使用通道的实用技巧。

  1. 优先选择有界通道以保证安全: 大多数时候,你应该从有界通道开始。有界通道设置了等待处理数据量的上限。这使你的应用程序在重负载下更加稳定。 根据你的工作负载将容量设置为合理的数值。如果你预期数据会激增,选择一个能覆盖典型突发但又不至于过大的值。 无界通道仅在你知道数据速率始终很低的情况下才是安全的。

  2. 记得标记通道完成: 当你的生产者完成写入时,在通道上调用 .Writer.Complete()。这会让消费者知道不会有更多数据了,允许它结束。 如果你不标记通道完成,你的消费者的 ReadAllAsync 循环将永远不会结束。

  3. 在写入和读取操作中使用 await 通道是为异步代码设计的。在写入或读取时始终使用 await,以避免阻塞线程并保持应用的响应性。

    await Channel.Writer.WriteAsync(data, cancellationToken);
    await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken))
    {
        // 处理项目
    }
    
  4. 正确处理取消: 在读取或写入通道时始终传递 CancellationToken。这允许你在应用关闭或用户取消操作时停止处理。

    await channel.Writer.WriteAsync(message, cancellationToken);
    
  5. 不要在过多的生产者或消费者之间共享通道: 虽然通道可以处理多个写入器和读取器,但过多会使调试变得困难。在大多数情况下,为了获得最佳性能和最容易推理,坚持使用单一生产者和单一消费者。 如果你需要更多,.NET 通道支持多个读取器和写入器,但你应该精心设计以避免意外情况。

  6. 监控你的通道使用情况: 在生产环境中监视你的通道容量。如果你经常看到通道被填满(生产者等待写入),这可能意味着你的消费者太慢了。你可能需要加快处理速度或增加通道大小。

  7. 选择合适的 FullMode(对于有界通道): 创建有界通道时,你可以设置 BoundedChannelFullMode 来控制通道满时发生的情况(Wait, DropWrite, DropOldest, DropNewest)。根据你的应用需求和对数据丢失的容忍度仔细选择。

相关留言评论
昵称:
邮箱:
阅读排行