C# 实现生产者-消费者模式,开箱即用。

最近在多个不同的项目中都用到了生产者-消费者模式,所以将它封装成一个通用类,方便在新项目中快速复用。

直接上代码。

/// <summary>
/// A ready-to-use producer-consumer pipeline: producer workers create items, a bounded
/// queue buffers them, and consumer workers process them while per-outcome counters are kept.
/// </summary>
/// <remarks>
/// <para>
/// Defaults: 1 producer, 12 consumers, a 3000 ms enqueue retry interval and a 1000 ms
/// progress interval. Configure with the fluent <c>With...</c> methods before calling
/// <see cref="Run"/>.
/// </para>
/// <para>
/// An instance is single-use: <see cref="Run"/> completes and disposes the internal queue,
/// so a second call throws <see cref="InvalidOperationException"/>.
/// </para>
/// </remarks>
/// <typeparam name="T">
/// Type of the produced items. A producer signals "no more items" by returning <c>null</c>,
/// so for value types use a nullable type argument such as <c>int?</c>; a plain value type
/// can never be <c>null</c> and its producers would never stop.
/// </typeparam>
/// <param name="producer">Creates the next item; returning <c>null</c> ends that producer worker.</param>
/// <param name="customer">
/// Processes one item and returns its outcome: <c>0</c> = success, <c>1</c> = skipped,
/// <c>-1</c> = failed. Any other value is not counted in any of the three buckets.
/// </param>
/// <param name="maxQueueCount">
/// Capacity of the bounded queue between producers and consumers. Values below 1 are
/// rejected by <see cref="BlockingCollection{T}"/> with <see cref="ArgumentOutOfRangeException"/>.
/// </param>
public class TaskPipe<T>(Func<T?> producer, Func<T, int> customer, int maxQueueCount = 1024)
{
    /// <summary>Callback invoked once after all producers and consumers finished successfully.</summary>
    public delegate void FinishedDelegate(int totalCount, int successCount, int skippedCount, int failedCount);

    /// <summary>Callback invoked periodically while the pipeline runs; <c>speed</c> is successes per second.</summary>
    public delegate void ReportProgressDelegate(int totalCount, int successCount, int skippedCount, int failedCount, double speed);

    private readonly Func<T?> _producer = producer ?? throw new ArgumentNullException(nameof(producer));
    private readonly Func<T, int> _customer = customer ?? throw new ArgumentNullException(nameof(customer));
    private readonly BlockingCollection<T> _queue = new BlockingCollection<T>(maxQueueCount);

    // Exceptions thrown by any worker; collected here so Run can rethrow them together.
    private readonly ConcurrentQueue<Exception> _failures = new ConcurrentQueue<Exception>();

    private ReportProgressDelegate? _reportProgress = null;
    private FinishedDelegate? _onFinished = null;
    private int _producerInterval = 3000;
    private int _progressInterval = 1000;
    private int _producerCount = 1;
    private int _customerCount = 12;

    private int _totalCount = 0;
    private int _successedCount = 0;
    private int _skipedCount = 0;
    private int _failedCount = 0;

    // 0 until Run is entered for the first time, 1 afterwards.
    private int _started = 0;

    /// <summary>
    /// Sets how long (in milliseconds) a producer waits for free queue space before retrying.
    /// </summary>
    /// <param name="producerInterval">A positive number of milliseconds, or <c>-1</c> to wait indefinitely.</param>
    /// <returns>This instance, for chaining.</returns>
    /// <exception cref="ArgumentOutOfRangeException">The value is zero or less than <c>-1</c>.</exception>
    public TaskPipe<T> WithProducerInterval(int producerInterval)
    {
        if (producerInterval <= 0 && producerInterval != Timeout.Infinite)
        {
            throw new ArgumentOutOfRangeException(nameof(producerInterval), producerInterval, "The interval must be positive or -1 (infinite).");
        }

        _producerInterval = producerInterval;
        return this;
    }

    /// <summary>Sets the pause (in milliseconds) between two progress reports.</summary>
    /// <param name="progressInterval">A positive number of milliseconds.</param>
    /// <returns>This instance, for chaining.</returns>
    /// <exception cref="ArgumentOutOfRangeException">The value is not positive.</exception>
    public TaskPipe<T> WithProgressInterval(int progressInterval)
    {
        if (progressInterval <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(progressInterval), progressInterval, "The interval must be positive.");
        }

        _progressInterval = progressInterval;
        return this;
    }

    /// <summary>Registers the progress callback. If none is registered, no progress is reported.</summary>
    /// <param name="reportProgress">Callback receiving the current counters and speed.</param>
    /// <returns>This instance, for chaining.</returns>
    public TaskPipe<T> WithReportProgress(ReportProgressDelegate reportProgress)
    {
        _reportProgress = reportProgress;
        return this;
    }

    /// <summary>
    /// Same as <see cref="WithReportProgress"/>; the misspelled name is kept so existing callers keep compiling.
    /// </summary>
    /// <param name="reportProgress">Callback receiving the current counters and speed.</param>
    /// <returns>This instance, for chaining.</returns>
    public TaskPipe<T> WithRreportProgress(ReportProgressDelegate reportProgress)
    {
        return WithReportProgress(reportProgress);
    }

    /// <summary>Sets the number of producer workers.</summary>
    /// <param name="producerCount">At least 1.</param>
    /// <returns>This instance, for chaining.</returns>
    /// <exception cref="ArgumentOutOfRangeException">The value is less than 1.</exception>
    public TaskPipe<T> WithProducerCount(int producerCount)
    {
        if (producerCount < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(producerCount), producerCount, "At least one producer is required.");
        }

        _producerCount = producerCount;
        return this;
    }

    /// <summary>Sets the number of consumer workers.</summary>
    /// <param name="customerCount">At least 1; without a consumer the producers would block on a full queue.</param>
    /// <returns>This instance, for chaining.</returns>
    /// <exception cref="ArgumentOutOfRangeException">The value is less than 1.</exception>
    public TaskPipe<T> WithCustomerCount(int customerCount)
    {
        if (customerCount < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(customerCount), customerCount, "At least one consumer is required.");
        }

        _customerCount = customerCount;
        return this;
    }

    /// <summary>Registers the callback invoked once when the whole pipeline finished successfully.</summary>
    /// <param name="onFinished">Callback receiving the final counters.</param>
    /// <returns>This instance, for chaining.</returns>
    public TaskPipe<T> WithFinished(FinishedDelegate onFinished)
    {
        _onFinished = onFinished;
        return this;
    }

    /// <summary>
    /// Runs the pipeline and blocks until every producer returned <c>null</c> and the queue
    /// has been drained by the consumers, then invokes the finished callback.
    /// </summary>
    /// <exception cref="InvalidOperationException">The instance has already been run.</exception>
    /// <exception cref="AggregateException">
    /// A producer, consumer or progress callback threw. The first failure stops all other
    /// workers, so the run ends promptly instead of blocking on a queue nobody serves;
    /// the finished callback is not invoked in that case.
    /// </exception>
    public void Run()
    {
        if (Interlocked.Exchange(ref _started, 1) != 0)
        {
            throw new InvalidOperationException("A TaskPipe instance can only be run once.");
        }

        // Signalled when the run is over or when any worker failed.
        using CancellationTokenSource stop = new CancellationTokenSource();
        CancellationToken token = stop.Token;

        Task[] reporterTasks = StartWorkers(1, () => ReportLoop(token), stop);
        Task[] producerTasks = StartWorkers(_producerCount, () => ProduceLoop(token), stop);
        Task[] consumerTasks = StartWorkers(_customerCount, () => ConsumeLoop(token), stop);

        // Worker tasks never fault (failures are recorded in _failures), so these waits do
        // not throw and CompleteAdding is always reached, which releases the consumers.
        Task.WaitAll(producerTasks);
        _queue.CompleteAdding();
        Task.WaitAll(consumerTasks);

        // Wake the reporter and wait for it so no progress report can arrive after the
        // finished callback.
        stop.Cancel();
        Task.WaitAll(reporterTasks);
        _queue.Dispose();

        if (!_failures.IsEmpty)
        {
            throw new AggregateException(_failures);
        }

        _onFinished?.Invoke(_totalCount, _successedCount, _skipedCount, _failedCount);
    }

    /// <summary>Starts <paramref name="count"/> dedicated worker tasks that all execute <paramref name="body"/>.</summary>
    private Task[] StartWorkers(int count, Action body, CancellationTokenSource stop)
    {
        Task[] tasks = new Task[count];
        for (int i = 0; i < count; i++)
        {
            // LongRunning: the loops block for long periods and should not occupy pool threads.
            tasks[i] = Task.Factory.StartNew(
                () => RunGuarded(body, stop),
                CancellationToken.None,
                TaskCreationOptions.LongRunning,
                TaskScheduler.Default);
        }

        return tasks;
    }

    /// <summary>Executes a worker body, recording any failure and stopping the other workers.</summary>
    private void RunGuarded(Action body, CancellationTokenSource stop)
    {
        try
        {
            body();
        }
        catch (OperationCanceledException) when (stop.IsCancellationRequested)
        {
            // Stopped on request (run finished or another worker failed); not an error.
        }
        catch (Exception ex)
        {
            _failures.Enqueue(ex);
            stop.Cancel();
        }
    }

    /// <summary>Reports the counters and the success rate once per progress interval until stopped.</summary>
    private void ReportLoop(CancellationToken stop)
    {
        ReportProgressDelegate? report = _reportProgress;
        if (report is null)
        {
            return;
        }

        int interval = _progressInterval;
        int lastSuccess = 0;

        while (!stop.IsCancellationRequested)
        {
            int success = Volatile.Read(ref _successedCount);

            // Successes since the previous report, scaled to "per second".
            double speed = (success - lastSuccess) * 1000.0 / interval;
            lastSuccess = success;

            report(
                Volatile.Read(ref _totalCount),
                success,
                Volatile.Read(ref _skipedCount),
                Volatile.Read(ref _failedCount),
                Math.Round(speed, 2));

            // Sleeps for one interval but returns immediately once the run is over.
            if (stop.WaitHandle.WaitOne(interval))
            {
                return;
            }
        }
    }

    /// <summary>Produces items until the producer delegate returns <c>null</c> or the run is stopped.</summary>
    private void ProduceLoop(CancellationToken stop)
    {
        int retryInterval = _producerInterval;

        while (!stop.IsCancellationRequested)
        {
            T? item = _producer();
            if (item is null)
            {
                return;
            }

            // Keep retrying while the queue is full; a stop request interrupts the wait
            // with an OperationCanceledException.
            while (!_queue.TryAdd(item, retryInterval, stop))
            {
            }

            Interlocked.Increment(ref _totalCount);
        }
    }

    /// <summary>Consumes queued items until the queue is completed and empty, or the run is stopped.</summary>
    private void ConsumeLoop(CancellationToken stop)
    {
        foreach (T item in _queue.GetConsumingEnumerable(stop))
        {
            switch (_customer(item))
            {
                case 0:
                    Interlocked.Increment(ref _successedCount);
                    break;
                case 1:
                    Interlocked.Increment(ref _skipedCount);
                    break;
                case -1:
                    Interlocked.Increment(ref _failedCount);
                    break;
            }
        }
    }
}

下面是使用示例:

/// <summary>
/// Console demo: five producers emit random numbers into a <c>TaskPipe</c> with a queue
/// capacity of 200, and the default consumers print every number they receive.
/// </summary>
internal class Program
{
    // Drawing this value makes a producer return null, which ends that producer.
    private const int StopValue = 5000;

    /// <summary>Builds the demo pipeline and blocks until it has finished.</summary>
    static void Main(string[] args)
    {
        Console.WriteLine("Hello World!");

        new TaskPipe<int?>(Prod, Cust, 200)
            .WithFinished(Finished)
            .WithProducerCount(5) // run five producers
            .WithRreportProgress(Report)
            .Run();
    }

    /// <summary>Receives the periodic progress reports of the pipeline.</summary>
    private static void Report(int totalCount, int successCount, int skippedCount, int failedCount, double speed)
    {
        // Progress report events arrive here.
    }

    /// <summary>Receives the final counters once production and consumption are complete.</summary>
    private static void Finished(int totalCount, int successCount, int skippedCount, int failedCount)
    {
        // The "everything finished" event arrives here.
    }

    /// <summary>Consumer: prints the item and reports success (0).</summary>
    private static int Cust(int? arg)
    {
        // Simulate the time a consumer needs.
        Thread.Sleep(1);
        Console.WriteLine($"已处理{arg}");
        return 0;
    }

    /// <summary>
    /// Producer: returns a random number in [0, 10000), or null to end this producer
    /// when the stop value is drawn.
    /// </summary>
    private static int? Prod()
    {
        // Simulate the time a producer needs.
        Thread.Sleep(5);

        // Random.Shared is thread-safe and avoids allocating a new generator on every call
        // from the concurrently running producers.
        int value = Random.Shared.Next(0, 10000);

        if (value == StopValue)
        {
            // Simulate the end of production: returning null stops this producer.
            return null;
        }

        return value;
    }
}

实现效果:

image

 

© 版权声明
THE END
喜欢就支持一下吧
点赞19 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

取消
昵称表情

    暂无评论内容