最近在多个不同的项目中都用到了生产者-消费者模式，所以把它封装成了一个类，方便快速复用。
直接上代码。
/// <summary>
/// A small producer-consumer pipeline: producer workers call <c>producer</c> to obtain items
/// and push them into a bounded queue, consumer workers take items from the queue and hand
/// them to <c>customer</c>. Configure it with the fluent <c>With…</c> methods, then call
/// <see cref="Run"/>, which blocks until all work is done.
/// </summary>
/// <typeparam name="T">
/// Type of the produced items. The producer signals "no more items" by returning <c>null</c>,
/// so <typeparamref name="T"/> must be a reference type or a <see cref="Nullable{T}"/>;
/// with a plain value type the returned value can never be <c>null</c> and producers never stop.
/// </typeparam>
/// <param name="producer">Produces the next item; returning <c>null</c> ends that producer worker.</param>
/// <param name="customer">
/// Consumes one item and returns a status code: <c>0</c> = success, <c>1</c> = skipped,
/// <c>-1</c> = failed. Any other value is not counted.
/// </param>
/// <param name="maxQueueCount">Upper bound on the number of queued items.</param>
public class TaskPipe<T>(Func<T?> producer, Func<T, int> customer, int maxQueueCount = 1024)
{
    /// <summary>Callback invoked once by <see cref="Run"/> after all workers completed successfully.</summary>
    public delegate void FinishedDelegate(int totalCount, int successCount, int skippedCount, int failedCount);

    /// <summary>
    /// Periodic progress callback. <c>speed</c> is the number of successful items per second
    /// since the previous report, rounded to two decimals.
    /// </summary>
    public delegate void ReportProgressDelegate(int totalCount, int successCount, int skippedCount, int failedCount, double speed);

    private readonly Func<T?> _producer = producer ?? throw new ArgumentNullException(nameof(producer));
    private readonly Func<T, int> _customer = customer ?? throw new ArgumentNullException(nameof(customer));
    private readonly BlockingCollection<T> _queue = new BlockingCollection<T>(maxQueueCount);

    private ReportProgressDelegate? _reportProgress;
    private FinishedDelegate? _onFinished;

    // Milliseconds a producer waits for free queue space before re-checking consumer liveness.
    private int _producerInterval = 3000;
    // Milliseconds between two progress reports.
    private int _progressInterval = 1000;
    private int _producerCount = 1;
    private int _customerCount = 12;

    // Counters are updated with Interlocked by the workers and read by the reporter thread.
    private int _totalCount;
    private int _succeededCount;
    private int _skippedCount;
    private int _failedCount;

    // Number of consumer workers that have not exited yet; lets producers detect
    // that nobody will ever drain the queue again.
    private int _activeConsumers;

    // Written by Run(), polled by the reporter thread; volatile so the write becomes visible.
    private volatile bool _finished;

    /// <summary>Sets how long (ms) a producer waits for queue space per attempt. Default 3000.</summary>
    public TaskPipe<T> WithProducerInterval(int producerInterval)
    {
        _producerInterval = producerInterval;
        return this;
    }

    /// <summary>Sets the delay (ms) between progress reports. Default 1000.</summary>
    public TaskPipe<T> WithProgressInterval(int progressInterval)
    {
        _progressInterval = progressInterval;
        return this;
    }

    /// <summary>Registers the progress callback. Without one, no progress worker is started.</summary>
    public TaskPipe<T> WithReportProgress(ReportProgressDelegate reportProgress)
    {
        _reportProgress = reportProgress;
        return this;
    }

    /// <summary>
    /// Misspelled original name of <see cref="WithReportProgress"/>; kept so existing callers keep compiling.
    /// </summary>
    public TaskPipe<T> WithRreportProgress(ReportProgressDelegate reportProgress)
    {
        return WithReportProgress(reportProgress);
    }

    /// <summary>Sets the number of producer workers. Default 1.</summary>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="producerCount"/> is negative.</exception>
    public TaskPipe<T> WithProducerCount(int producerCount)
    {
        if (producerCount < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(producerCount), producerCount, "Worker count must not be negative.");
        }

        _producerCount = producerCount;
        return this;
    }

    /// <summary>Sets the number of consumer workers. Default 12.</summary>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="customerCount"/> is negative.</exception>
    public TaskPipe<T> WithCustomerCount(int customerCount)
    {
        if (customerCount < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(customerCount), customerCount, "Worker count must not be negative.");
        }

        _customerCount = customerCount;
        return this;
    }

    /// <summary>Registers the callback invoked when the whole pipeline has finished.</summary>
    public TaskPipe<T> WithFinished(FinishedDelegate onFinished)
    {
        _onFinished = onFinished;
        return this;
    }

    /// <summary>
    /// Starts all workers and blocks until every producer has stopped and the queue is drained.
    /// The queue is closed by this call, so an instance is meant to be run once.
    /// </summary>
    /// <exception cref="AggregateException">
    /// One or more producer or consumer workers threw; the inner exceptions of both groups are included.
    /// </exception>
    public void Run()
    {
        List<Exception> errors = new List<Exception>();
        try
        {
            // Must be set before any worker starts so producers never observe a stale zero.
            _activeConsumers = _customerCount;

            Task[] producerTasks = StartWorkers(_producerCount, Produce);
            Task[] consumerTasks = StartWorkers(_customerCount, Consume);
            if (_reportProgress is not null)
            {
                StartWorker(PrintProgress);
            }

            try
            {
                Task.WaitAll(producerTasks);
            }
            catch (AggregateException ex)
            {
                errors.AddRange(ex.InnerExceptions);
            }
            finally
            {
                // Always close the queue, even when a producer failed; otherwise the
                // consumers would block on it forever.
                _queue.CompleteAdding();
            }

            try
            {
                Task.WaitAll(consumerTasks);
            }
            catch (AggregateException ex)
            {
                errors.AddRange(ex.InnerExceptions);
            }
        }
        finally
        {
            // Stops the progress reporter on success and on failure alike.
            _finished = true;
        }

        if (errors.Count > 0)
        {
            throw new AggregateException(errors);
        }

        _onFinished?.Invoke(_totalCount, _succeededCount, _skippedCount, _failedCount);
    }

    // Starts `count` workers that all run `body`.
    private static Task[] StartWorkers(int count, Action body)
    {
        Task[] tasks = new Task[count];
        for (int i = 0; i < count; i++)
        {
            tasks[i] = StartWorker(body);
        }

        return tasks;
    }

    // Workers block (queue waits, Thread.Sleep), so request LongRunning instead of tying up
    // ordinary thread-pool threads, and pin the default scheduler instead of the ambient one.
    private static Task StartWorker(Action body)
    {
        return Task.Factory.StartNew(body, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
    }

    // Reporter loop: publishes the counters every _progressInterval ms until Run() finishes.
    private void PrintProgress()
    {
        int lastCount = 0;
        while (!_finished)
        {
            ReportProgressDelegate? report = _reportProgress;
            if (report is null)
            {
                return;
            }

            int succeeded = Volatile.Read(ref _succeededCount);
            // Speed is based on the nominal interval, not on measured elapsed time.
            double speed = (succeeded - lastCount) * 1000.0 / _progressInterval;
            lastCount = succeeded;
            report(
                Volatile.Read(ref _totalCount),
                succeeded,
                Volatile.Read(ref _skippedCount),
                Volatile.Read(ref _failedCount),
                Math.Round(speed, 2));

            Thread.Sleep(_progressInterval);
        }
    }

    // Producer loop: fetches items until the producer delegate returns null.
    private void Produce()
    {
        TimeSpan timeout = TimeSpan.FromMilliseconds(_producerInterval);
        while (true)
        {
            T? item = _producer.Invoke();
            if (item is null)
            {
                return;
            }

            // Retry until there is room in the queue. If every consumer has already exited,
            // the queue can never drain again, so fail instead of spinning forever.
            while (!_queue.TryAdd(item, timeout))
            {
                if (Volatile.Read(ref _activeConsumers) <= 0)
                {
                    throw new InvalidOperationException("The queue is full and no consumer is running to drain it.");
                }
            }

            Interlocked.Increment(ref _totalCount);
        }
    }

    // Consumer loop: processes items until the queue is completed and empty.
    private void Consume()
    {
        try
        {
            foreach (T item in _queue.GetConsumingEnumerable())
            {
                switch (_customer.Invoke(item))
                {
                    case 0:
                        Interlocked.Increment(ref _succeededCount);
                        break;
                    case 1:
                        Interlocked.Increment(ref _skippedCount);
                        break;
                    case -1:
                        Interlocked.Increment(ref _failedCount);
                        break;
                    // Any other status code is intentionally not counted.
                }
            }
        }
        finally
        {
            // Runs on normal exit and when the consumer delegate throws.
            Interlocked.Decrement(ref _activeConsumers);
        }
    }
}
下面是使用示例:
/// <summary>
/// Usage example: five producers generate random numbers, the default consumer pool
/// prints them, and the pipeline stops once every producer has returned <c>null</c>.
/// </summary>
internal class Program
{
    static void Main(string[] args)
    {
        Console.WriteLine("Hello World!");
        new TaskPipe<int?>(Prod, Cust, 200)
            .WithFinished(Finished)
            .WithProducerCount(5) // run five producer workers
            .WithRreportProgress(Report)
            .Run();
    }

    /// <summary>Receives the periodic progress reports.</summary>
    private static void Report(int totalCount, int successCount, int skippedCount, int failedCount, double speed)
    {
        // Progress report events arrive here.
    }

    /// <summary>Called once after all production and consumption has finished.</summary>
    private static void Finished(int totalCount, int successCount, int skippedCount, int failedCount)
    {
        // The "everything finished" event arrives here.
    }

    /// <summary>Consumer: prints the item and reports success (status code 0).</summary>
    private static int Cust(int? arg)
    {
        // Simulate the time a real consumer would take.
        Thread.Sleep(1);
        Console.WriteLine($"已处理{arg}");
        return 0;
    }

    /// <summary>
    /// Producer: returns a random value in [0, 10000), or <c>null</c> to stop this producer worker.
    /// </summary>
    private static int? Prod()
    {
        // Simulate the time a real producer would take.
        Thread.Sleep(5);
        // Random.Shared is thread-safe (this method runs on several producer workers at once)
        // and avoids allocating a new Random on every call.
        int value = Random.Shared.Next(0, 10000);
        if (value == 5000)
        {
            // Simulate the end of production: returning null stops this producer.
            return null;
        }

        return value;
    }
}
实现效果:
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END
暂无评论内容