构建可靠、可扩展和高性能的 .NET 应用程序,关键在于如何处理并发和数据处理。C# 通道 (Channels) 提供了一种新颖、现代的方法,用于在 .NET 中构建安全、异步和高吞吐量的管道。
通道允许你创建内存中的生产者-消费者队列,这些队列可以自然地跨异步工作流和后台服务进行扩展。然而,一个关键的架构决策是在有界 (bounded) 和无界 (unbounded) 通道之间做出选择。
在今天的文章中,我们将探讨:
让我们开始吧!
C# 通道是什么? 在构建 .NET 应用时,你经常需要将数据从代码的一部分发送到另一部分。
过去,开发人员为此目的使用诸如 Queue<T>
、ConcurrentQueue<T>
或 BlockingCollection<T>
等结构。他们将队列封装到类中,并用它们来管理数据流。
然而,这样的实现有一个显著的缺点:紧密的代码耦合。
C# 通道解决了这个问题。它们实现了生产者-消费者模式。一个类生产数据,另一个类消费数据,彼此无需知晓对方。
C# 通道来自 System.Threading.Channels
命名空间。通道使得在生产者与消费者之间发送数据变得简单,帮助你避免常见的线程问题。
一个通道有两个部分:
读取和写入都可以在不同的线程上发生,通道确保线程安全。它们让你可以在任何地方使用异步代码,因此你的应用可以处理大量数据而不会阻塞线程或加锁。
下面是一个在 C# 中使用通道的基本示例。我们异步地生产数字并从通道中消费它们:
using System;
using System.Threading.Channels;
using System.Threading.Tasks;
var channel = Channel.CreateUnbounded<int>();
// 生产者
_ = Task.Run(async () =>
{
for (var i = 0; i < 10; i++)
{
await channel.Writer.WriteAsync(i);
Console.WriteLine($"Produced: {i}");
await Task.Delay(100); // 模拟工作耗时
}
channel.Writer.Complete();
});
// 消费者
await foreach (var item in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"Consumed: {item}");
await Task.Delay(150); // 模拟处理耗时
}
Console.WriteLine("Processing complete.");
其工作原理如下:
通道非常适合以下场景:
async/await
连接生产者和消费者。通道是流式传输事件和处理后台任务的理想选择。它们是在单个应用程序内部消息队列的一种简单内存替代方案。
有界通道 VS 无界通道 C# 通道有两种类型:有界 (bounded) 和无界 (unbounded)。两者都允许你将数据从生产者传递到消费者,但它们在处理流控制和内存方面有所不同。
什么是有界通道? 有界通道具有固定的最大容量。创建它时,你需要设置它能同时容纳的项目数量上限。
如果生产者在通道已满后尝试添加更多项目,它必须等待直到有空位。
何时应使用有界通道?
var channel = Channel.CreateBounded<int>(5);
// 生产者
await channel.Writer.WriteAsync(1); // 如果有空间则添加项目,如果已满则等待
// 消费者
var item = await channel.Reader.ReadAsync(); // 取出一个项目
有界通道是后台处理、作业队列以及任何需要控制资源使用的应用程序的良好选择。它们有助于保护你的应用免受突发流量激增和生产者失控的影响。
什么是无界通道? 无界通道没有固定限制。生产者可以随心所欲地快速添加项目。
通道会增长以处理你放入的任意多项目,仅受可用系统内存的限制。
何时应使用无界通道?
以下是创建无界通道的方法:
var channel = Channel.CreateUnbounded<int>();
// 生产者
await channel.Writer.WriteAsync(42); // 总是接受新项目
// 消费者
var item = await channel.Reader.ReadAsync(); // 取出一个项目
无界通道使用简单,但在高负载情况下可能有风险。如果生产者写入项目的速度超过消费者读取的速度,你可能会耗尽内存。
你应该使用哪种?
在大多数现实世界的 .NET 服务中,有界通道是更安全的默认选择。
使用有界通道的后台处理器 在生产环境中,通道通常用于 ASP.NET Core 的后台服务 (BackgroundService)。
一种常见模式是设置一个后台服务,该服务从通道读取消息或任务并逐个处理它们。这使得你的代码易于扩展,并释放主线程用于其他工作。
让我们探索一个处理来自通道项目的 BackgroundService
示例:
builder.Services.AddSingleton(_ => Channel.CreateBounded<string>(new BoundedChannelOptions(100)
{
FullMode = BoundedChannelFullMode.Wait
}));
public class MessageProcessor : BackgroundService
{
private readonly Channel<string> _channel;
private readonly ILogger<MessageProcessor> _logger;
public MessageProcessor(Channel<string> channel, ILogger<MessageProcessor> logger)
{
_channel = channel;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("消息处理器启动");
await foreach (var message in _channel.Reader.ReadAllAsync(stoppingToken))
{
_logger.LogInformation("正在处理消息: {Message}", message);
await Task.Delay(100, stoppingToken); // 模拟处理耗时
_logger.LogInformation("消息处理成功: {Message}", message);
}
}
}
要将项目发布到通道中,你需要调用 WriteAsync
方法:
private Task AddSampleMessagesAsync(CancellationToken stoppingToken)
{
_ = Task.Run(async () =>
{
// 添加 150 条消息(超过通道容量 100)
for (var i = 1; i <= 150 && !stoppingToken.IsCancellationRequested; i++)
{
var message = $"示例消息 #{i}";
// 等待直到通道中有空间(当通道满时这会阻塞)
await _channel.Writer.WriteAsync(message, stoppingToken);
_logger.LogInformation("已将消息添加到通道: {Message}", message);
await Task.Delay(50, stoppingToken); // 模拟消息生成间隔
}
// 添加消息完成后,标记通道完成
_channel.Writer.Complete();
}, stoppingToken);
return Task.CompletedTask;
}
注意,我们在 AsyncEnumerable
上运行了 foreach
循环:
await foreach (var message in _channel.Reader.ReadAllAsync(stoppingToken))
{
// 处理消息
}
_channel.Reader.ReadAllAsync
等待新消息出现在通道中。当发布消息完成后,你可以调用 _channel.Writer.Complete()
,循环将结束。
该通道的容量上限为 100,因此如果有 100 条消息已经在等待,生产者将暂停,直到有空闲槽位可用。后台服务会尽可能快地读取并处理消息。
如果你尝试过快地添加消息,生产者将减速,这有助于控制内存使用。
创建有界通道时,你可以设置 BoundedChannelFullMode
来控制通道满时发生的情况:
Wait
:写入器等待直到有可用空间(最常见,最安全)。DropWrite
:如果通道已满,则丢弃新项目。DropOldest
:移除最旧的项目以腾出空间给新项目。DropNewest
:丢弃最新的项目。对于大多数后台任务,使用 Wait
。当丢失少量消息是可以接受的时,DropWrite
或 DropOldest
可以是合适的选择。当最新事件比旧事件更相关时,可以使用 DropOldest
。
通道的实际应用 在真实的 ASP.NET Core 应用程序中,你可以使用通道来实现写回缓存策略 (Write Back Caching Strategy)。
这种缓存策略用于高速、写入密集的场景。
主要思想是数据首先写入缓存。缓存随后在满足特定条件或间隔后,异步地将数据写回数据库。
让我们探索一个真实示例:一个在线商店应用程序。
用户经常向他们的在线购物车添加或移除商品。最终状态仅在结账时才真正关键。写入可以非常快,因为它们首先命中缓存,系统可以定期将更新刷新到数据库。
这使得在购物高峰期能够实现高写入吞吐量,而数据库最终会获得最终的购物车详情。
以下是创建产品购物车的 WebApi 端点:
public record ProductCartRequest(string UserId, List<ProductCartItemRequest> ProductCartItems);
public record ProductCartItemRequest(Guid ProductId, int Quantity);
[HttpPost]
public async Task<ActionResult<ProductCartResponse>> CreateCart(ProductCartRequest request)
{
var response = await _service.AddAsync(request);
return CreatedAtAction(nameof(GetCart), new { id = response.Id }, response);
}
当创建一个新的 ProductCart
时,它仅被立即添加到缓存中:
public class WriteBackCacheProductCartService
{
private readonly HybridCache _cache;
private readonly IProductCartRepository _repository;
private readonly Channel<ProductCartDispatchEvent> _channel;
public WriteBackCacheProductCartService(
HybridCache cache,
IProductCartRepository repository,
Channel<ProductCartDispatchEvent> channel)
{
_cache = cache;
_repository = repository;
_channel = channel;
}
public async Task<ProductCartResponse> AddAsync(ProductCartRequest request)
{
var productCart = new ProductCart
{
Id = Guid.NewGuid(),
UserId = request.UserId,
CartItems = request.ProductCartItems.Select(x => new CartItem
{
Id = Guid.NewGuid(),
Quantity = x.Quantity,
Price = Random.Shared.Next(100, 1000) // 模拟价格
}).ToList()
};
var cacheKey = $"productCart:{productCart.Id}";
var productCartResponse = MapToProductCartResponse(productCart); // 假设的映射方法
await _cache.SetAsync(cacheKey, productCartResponse); // 写入缓存
// 发布事件到通道,触发后台异步写回数据库
await _channel.Writer.WriteAsync(new ProductCartDispatchEvent(productCart));
return productCartResponse;
}
}
这里我们使用一个有界通道来发布 ProductCartDispatchEvent
。
public record ProductCartDispatchEvent(ProductCart ProductCart);
builder.Services.AddSingleton(_ => Channel.CreateBounded<ProductCartDispatchEvent>(new BoundedChannelOptions(100)
{
FullMode = BoundedChannelFullMode.Wait // 通道满时等待
}));
一个后台服务从通道读取数据,并异步地将购物车数据写入数据库:
public class WriteBackCacheBackgroundService(IServiceScopeFactory scopeFactory,
Channel<ProductCartDispatchEvent> channel) : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await foreach (var command in channel.Reader.ReadAllAsync(stoppingToken))
{
// 使用作用域获取作用域内服务
using var scope = scopeFactory.CreateScope();
var repository = scope.ServiceProvider.GetRequiredService<IProductCartRepository>();
var existingCart = await repository.GetByIdAsync(command.ProductCart.Id);
if (existingCart is null)
{
await repository.AddAsync(command.ProductCart); // 新增
return;
}
existingCart.CartItems = command.ProductCart.CartItems; // 更新现有
await repository.UpdateAsync(existingCart);
}
}
}
注意:这种模式可以显著提高写入速度,但它需要健壮的冲突解决和故障处理机制来确保一致性。要确保在缓存故障时购物车状态永不丢失,需要健壮的复制或备份机制。
使用通道的最佳实践和技巧 通道是一个强大的工具,但要充分利用它们(并避免问题),遵循一些关键的最佳实践非常重要。以下是一些在 .NET 应用程序中使用通道的实用技巧。
优先选择有界通道以保证安全: 大多数时候,你应该从有界通道开始。有界通道设置了等待处理数据量的上限。这使你的应用程序在重负载下更加稳定。 根据你的工作负载将容量设置为合理的数值。如果你预期数据会激增,选择一个能覆盖典型突发但又不至于过大的值。 无界通道仅在你知道数据速率始终很低的情况下才是安全的。
记得标记通道完成:
当你的生产者完成写入时,在通道上调用 .Writer.Complete()
。这会让消费者知道不会有更多数据了,允许它结束。
如果你不标记通道完成,你的消费者的 ReadAllAsync
循环将永远不会结束。
在写入和读取操作中使用 await
:
通道是为异步代码设计的。在写入或读取时始终使用 await
,以避免阻塞线程并保持应用的响应性。
await Channel.Writer.WriteAsync(data, cancellationToken);
await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken))
{
// 处理项目
}
正确处理取消:
在读取或写入通道时始终传递 CancellationToken
。这允许你在应用关闭或用户取消操作时停止处理。
await channel.Writer.WriteAsync(message, cancellationToken);
不要在过多的生产者或消费者之间共享通道: 虽然通道可以处理多个写入器和读取器,但过多会使调试变得困难。在大多数情况下,为了获得最佳性能和最容易推理,坚持使用单一生产者和单一消费者。 如果你需要更多,.NET 通道支持多个读取器和写入器,但你应该精心设计以避免意外情况。
监控你的通道使用情况: 在生产环境中监视你的通道容量。如果你经常看到通道被填满(生产者等待写入),这可能意味着你的消费者太慢了。你可能需要加快处理速度或增加通道大小。
选择合适的 FullMode(对于有界通道):
创建有界通道时,你可以设置 BoundedChannelFullMode
来控制通道满时发生的情况(Wait
, DropWrite
, DropOldest
, DropNewest
)。根据你的应用需求和对数据丢失的容忍度仔细选择。