引言
在 C# 開發(fā)中,合理控制并發(fā)任務(wù)數(shù)量是確保應(yīng)用程序高效、穩(wěn)定運(yùn)行的關(guān)鍵。過多的并發(fā)任務(wù)可能會(huì)耗盡系統(tǒng)資源,導(dǎo)致性能下降和不穩(wěn)定。本文將深入探討幾種有效的方法來限制 C# 中的并發(fā)任務(wù)數(shù)量,并通過具體的應(yīng)用場景和示例代碼展示如何實(shí)現(xiàn)這些方法。
使用 SemaphoreSlim
SemaphoreSlim
是一個(gè)輕量級的同步原語,用于控制訪問某一資源或資源池的線程數(shù)。通過它,我們可以很容易地限制并發(fā)任務(wù)的數(shù)量。當(dāng)你有一個(gè)需要訪問共享資源(如數(shù)據(jù)庫連接池)的任務(wù)列表,但希望同時(shí)執(zhí)行的任務(wù)數(shù)量不超過某個(gè)特定值時(shí),可以使用SemaphoreSlim
。
示例
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static async Task Main(string[] args)
{
int maxConcurrentTasks = 3;
SemaphoreSlim semaphore = new SemaphoreSlim(maxConcurrentTasks);
List<Task> tasks = new List<Task>();
for (int i = 0; i < 100; i++)
{
await semaphore.WaitAsync();
var task = Task.Run(async () =>
{
try
{
// 模擬長時(shí)間運(yùn)行的任務(wù)
Console.WriteLine($"Task {Task.CurrentId} started.");
await Task.Delay(TimeSpan.FromSeconds(2));
Console.WriteLine($"Task {Task.CurrentId} completed.");
}
finally
{
semaphore.Release();
}
});
tasks.Add(task);
}
await Task.WhenAll(tasks);
Console.WriteLine("All tasks completed.");
}
}
在這個(gè)示例中,我們限制了最多只有 3 個(gè)任務(wù)可以同時(shí)運(yùn)行。通過對SemaphoreSlim
的調(diào)用,我們確保了當(dāng)達(dá)到最大并發(fā)任務(wù)數(shù)量時(shí),其他任務(wù)將會(huì)等待直到某個(gè)任務(wù)完成并釋放信號量。
使用 TPL Dataflow
TPL (Task Parallel Library) Dataflow 提供了一個(gè)更高級的方式來處理數(shù)據(jù)流和并發(fā)任務(wù),通過它可以很容易地限制并發(fā)任務(wù)的數(shù)量。當(dāng)你需要處理一系列的數(shù)據(jù)或任務(wù),并且每個(gè)任務(wù)都可能需要一些時(shí)間來完成,同時(shí)你想要限制同時(shí)處理這些任務(wù)的數(shù)量時(shí),可以使用 TPL Dataflow。
示例
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
class Program
{
static async Task Main(string[] args)
{
var options = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 3 // 最大并發(fā)任務(wù)數(shù)量
};
var block = new ActionBlock<int>(async n =>
{
Console.WriteLine($"Processing {n}...");
await Task.Delay(TimeSpan.FromSeconds(1)); // 模擬異步操作
Console.WriteLine($"Processed {n}.");
}, options);
for (int i = 0; i < 100; i++)
{
block.Post(i);
}
block.Complete();
await block.Completion;
Console.WriteLine("All tasks completed.");
}
}
在這個(gè)示例中,ActionBlock
被用來處理一系列的任務(wù),通過設(shè)置ExecutionDataflowBlockOptions
中的MaxDegreeOfParallelism
屬性,我們限制了最大的并發(fā)任務(wù)數(shù)量。
使用 Parallel.ForEach
Parallel.ForEach
是 .NET 中用于并行處理集合元素的方法,它可以指定MaxDegreeOfParallelism
參數(shù)來限制并發(fā)任務(wù)的數(shù)量。
示例
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
class Program
{
static async Task Main(string[] args)
{
var urls = new List<string> { /* 一系列 URL */ };
var options = new ParallelOptions { MaxDegreeOfParallelism = 3 };
await Parallel.ForEachAsync(urls, options, async (url, _) =>
{
var html = await new HttpClient().GetStringAsync(url);
Console.WriteLine($"retrieved {html.Length} characters from {url}");
});
}
}
在這個(gè)示例中,我們使用Parallel.ForEachAsync
方法來并行下載多個(gè)網(wǎng)頁內(nèi)容,并通過設(shè)置MaxDegreeOfParallelism
為 3 來限制同時(shí)進(jìn)行的下載任務(wù)數(shù)量。
使用 Polly Bulkhead
Polly 是一個(gè)強(qiáng)大的 .NET 錯(cuò)誤處理和彈性庫,它的 Bulkhead 隔板策略可以限制并發(fā)任務(wù)的數(shù)量,并可以選擇將超過該數(shù)量的任務(wù)排隊(duì)。
示例
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Threading.Tasks;
using Polly;
class Program
{
static async Task Main(string[] args)
{
var bulkhead = Policy.BulkheadAsync(3, Int32.MaxValue); // 最大并發(fā)任務(wù)數(shù)量為 3
var urls = new List<string> { /* 一系列 URL */ };
var tasks = new List<Task>();
foreach (var url in urls)
{
var t = bulkhead.ExecuteAsync(async () =>
{
var html = await new HttpClient().GetStringAsync(url);
Console.WriteLine($"retrieved {html.Length} characters from {url}");
});
tasks.Add(t);
}
await Task.WhenAll(tasks);
}
}
在這個(gè)示例中,我們使用 Polly 的 Bulkhead 隔板策略來限制并發(fā)下載任務(wù)的數(shù)量,并將超過最大并發(fā)數(shù)量的任務(wù)自動(dòng)排隊(duì)。
使用 Task.WhenAny
Task.WhenAny
方法可以用于限制并發(fā)任務(wù)的數(shù)量,其基本思路是維護(hù)一個(gè)任務(wù)列表,當(dāng)任務(wù)數(shù)量達(dá)到閾值時(shí),等待其中一個(gè)任務(wù)完成,然后繼續(xù)添加新任務(wù)。
示例
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Threading.Tasks;
class Program
{
static async Task Main(string[] args)
{
var urls = new List<string> { /* 一系列 URL */ };
var maxConcurrentTasks = 3;
var tasks = new List<Task>();
foreach (var url in urls)
{
tasks.Add(Task.Run(async () =>
{
var html = await new HttpClient().GetStringAsync(url);
Console.WriteLine($"retrieved {html.Length} characters from {url}");
}));
if (tasks.Count >= maxConcurrentTasks)
{
await Task.WhenAny(tasks);
tasks = tasks.Where(t => t.Status == TaskStatus.Running).ToList();
}
}
await Task.WhenAll(tasks);
}
}
在這個(gè)示例中,我們通過Task.WhenAny
方法來等待任務(wù)列表中的任意一個(gè)任務(wù)完成,然后移除已完成的任務(wù),從而保持并發(fā)任務(wù)的數(shù)量不超過閾值。
總結(jié)
限制并發(fā)任務(wù)的數(shù)量是確保應(yīng)用程序穩(wěn)定和高效運(yùn)行的關(guān)鍵。在 C# 中,我們可以使用SemaphoreSlim
、TPL Dataflow、Parallel.ForEach
、Polly Bulkhead 和Task.WhenAny
等多種方法來輕松實(shí)現(xiàn)這一目標(biāo)。根據(jù)具體的業(yè)務(wù)需求和應(yīng)用場景,選擇最合適的方法來控制并發(fā)任務(wù)的數(shù)量,可以有效避免資源過度消耗,提高系統(tǒng)的響應(yīng)速度和可靠性。
該文章在 2024/12/24 11:48:49 編輯過