问答文章1 问答文章501 问答文章1001 问答文章1501 问答文章2001 问答文章2501 问答文章3001 问答文章3501 问答文章4001 问答文章4501 问答文章5001 问答文章5501 问答文章6001 问答文章6501 问答文章7001 问答文章7501 问答文章8001 问答文章8501 问答文章9001 问答文章9501

parallel.foreach 有什么 风险

发布网友 发布时间:2022-04-25 08:51

我来回答

1个回答

热心网友 时间:2023-11-10 15:51

当需要为多核机器进行优化的时候,最好先检查下你的程序是否有处理能够分割开来进行并行处理。(例如,有一个巨大的数据集合,其中的元素需要一个一个进行彼此独立的耗时计算)。.net framework 4 中提供了 Parallel.ForEach 和 PLINQ 来帮助我们进行并行处理,本文探讨这两者的差别及适用的场景。Parallel.ForEachParallel.ForEach 是 foreach 的多线程实现,他们都能对 IEnumerable<T> 类型对象进行遍历,Parallel.ForEach 的特殊之处在于它使用多线程来执行循环体内的代码段。
Parallel.ForEach 最常用的形式如下:public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source,Action<TSource> body)
PLINQ
PLINQ 也是一种对数据进行并行处理的编程模型,它通过 LINQ 的语法来实现类似 Parallel.ForEach 的多线程并行处理。
场景一:简单数据 之 独立操作的并行处理(使用 Parallel.ForEach)示例代码:public static void IndependentAction(IEnumerable<T> source, Action<T> action){Parallel.ForEach(source, element => action(element));}
理由:
虽然 PLINQ 也提供了一个类似的 ForAll 接口,但它对于简单的独立操作太重量化了。

2. 使用 Parallel.ForEach 你还能够设定
ParallelOptions.MaxDegreeOfParalelism 参数(指定最多需要多少个线程),这样当 ThreadPool
资源匮乏(甚至当可用线程数<MaxDegreeOfParalelism)的时候, Parallel.ForEach
依然能够顺利运行,并且当后续有更多可用线程出现时,Parallel.ForEach 也能及时地利用这些线程。PLINQ
只能通过WithDegreeOfParallelism 方法来要求固定的线程数,即:要求了几个就是几个,不会多也不会少。</ol>场景二:顺序数据 之 并行处理(使用 PLINQ 来维持数据顺序)
当输出的数据序列需要保持原始的顺序时采用 PLINQ 的 AsOrdered 方法非常简单高效。示例代码:public static void GrayscaleTransformation(IEnumerable<Frame> Movie){var ProcessedMovie =Movie.AsParallel().AsOrdered().Select(frame => ConvertToGrayscale(frame));
foreach (var grayscaleFrame in ProcessedMovie){// Movie frames will be evaluated lazily}}
理由:
Parallel.ForEach 实现起来需要绕一些弯路,首先你需要使用以下的重载在方法:
public static ParallelLoopResult ForEach<TSource >(
IEnumerable<TSource> source,
Action<TSource, ParallelLoopState, Int64> body)</ol>这个重载的 Action 多包含了 index 参数,这样你在输出的时候就能利用这个值来维持原先的序列顺序。请看下面的例子: public static double [] PairwiseMultiply(double[] v1, double[] v2)
{var length = Math.Min(v1.Length, v2.Lenth);double[] result = new double[length];Parallel.ForEach(v1, (element, loopstate, elementIndex) =>result[elementIndex] = element * v2[elementIndex]);return result;}
你可能已经意识到这里有个明显的问题:我们使用了固定长度的数组。如果传入的是 IEnumerable 那么你有4个解决方案: (1) 调用 IEnumerable.Count() 来获取数据长度,然后用这个值实例化一个固定长度的数组,然后使用上例的代码。(2) The second option would be to materialize the original collection before using it; in the event that your input data set is prohibitively large, neither of the first two options will be feasible.(没看懂贴原文)(3) 第三种方式是采用返回一个哈希集合的方式,这种方式下通常需要至少2倍于传入数据的内存,所以处理大数据时请慎用。(4) 自己实现排序算法(保证传入数据与传出数据经过排序后次序一致)2. 相比之下 PLINQ 的 AsOrdered 方法如此简单,而且该方法能处理流式的数据,从而允许传入数据是延迟实现的(lazy materialized)场景三:流数据 之 并行处理(使用 PLINQ)
PLINQ 能输出流数据,这个特性在一下场合非常有用:

结果集不需要是一个完整的处理完毕的数组,即:任何时间点下内存中仅保持数组中的部分信息

2. 你能够在一个单线程上遍历输出结果(就好像他们已经存在/处理完了)

示例:
public static void AnalyzeStocks(IEnumerable<Stock> Stocks)
{
var StockRiskPortfolio =
Stocks
.AsParallel()
.AsOrdered()
.Select(stock => new { Stock = stock, Risk = ComputeRisk(stock)})
.Where(stockRisk => ExpensiveRiskAnalysis(stockRisk.Risk));</ol>foreach (var stockRisk in StockRiskPortfolio)
{SomeStockComputation(stockRisk.Risk);// StockRiskPortfolio will be a stream of results}}
这里使用一个单线程的 foreach 来对 PLINQ 的输出进行后续处理,通常情况下 foreach 不需要等待 PLINQ 处理完所有数据就能开始运作。PLINQ 也允许指定输出缓存的方式,具体可参照 PLINQ 的 WithMergeOptions 方法,及 ParallelMergeOptions 枚举场景四:处理两个集合(使用 PLINQ)PLINQ 的 Zip 方法提供了同时遍历两个集合并进行结合元算的方法,并且它可以与其他查询处理操作结合,实现非常复杂的机能。示例:public static IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b){returna.AsParallel().AsOrdered().Select(element => ExpensiveComputation(element)).Zip(b.AsParallel().AsOrdered().Select(element => DifferentExpensiveComputation(element)),(a_element, b_element) => Combine(a_element,b_element));}
示例中的两个数据源能够并行处理,当双方都有一个可用元素时提供给 Zip 进行后续处理(Combine)。 Parallel.ForEach 也能实现类似的 Zip 处理:public static IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b){var numElements = Math.Min(a.Count(), b.Count());var result = new T[numElements];Parallel.ForEach(a,(element, loopstate, index) =>{var a_element = ExpensiveComputation(element);var b_element = DifferentExpensiveComputation(b.ElementAt(index));result[index] = Combine(a_element, b_element);});return result;}
当然使用 Parallel.ForEach 后你就得自己确认是否要维持原始序列,并且要注意数组越界访问的问题。场景五:线程局部变量
Parallel.ForEach 提供了一个线程局部变量的重载,定义如下:public static ParallelLoopResult ForEach<TSource, TLocal>(IEnumerable<TSource> source,Func<TLocal> localInit,Func<TSource, ParallelLoopState, TLocal,TLocal> body,Action<TLocal> localFinally)
使用的示例: public static List<R> Filtering<T,R>(IEnumerable<T> source){var results = new List<R>();using (SemaphoreSlim sem = new SemaphoreSlim(1)){Parallel.ForEach(source,() => new List<R>(),(element, loopstate, localStorage) =>{bool filter = filterFunction(element);if (filter)localStorage.Add(element);return localStorage;},(finalStorage) =>{lock(myLock){results.AddRange(finalStorage)};});}return results;}
线程局部变量有什么优势呢?请看下面的例子(一个网页抓取程序): public static void UnsafeDownloadUrls (){WebClient webclient = new WebClient();Parallel.ForEach(urls,(url,loopstate,index) =>{webclient.DownloadFile(url, filenames[index] + ".dat");Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);});}
通常第一版代码是这么写的,但是运行时会报错“System.NotSupportedException -> WebClient does not support concurrent I/O operations.”。这是因为多个线程无法同时访问同一个 WebClient对象。所以我们会把 WebClient 对象定义到线程中来: public static void BAD_DownloadUrls (){Parallel.ForEach(urls,(url,loopstate,index) =>{WebClient webclient = new WebClient();webclient.DownloadFile(url, filenames[index] + ".dat");Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);});}
修改之后依然有问题,因为你的机器不是服务器,大量实例化的 WebClient 迅速达到你机器允许的虚拟连接上限数。线程局部变量可以解决这个问题: public static void downloadUrlsSafe(){Parallel.ForEach(urls,() => new WebClient(),(url, loopstate, index, webclient) =>{webclient.DownloadFile(url, filenames[index]+".dat");Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);return webclient;},(webclient) => { });}
这样的写法保证了我们能获得足够的 WebClient 实例,同时这些 WebClient 实例彼此隔离仅仅属于各自关联的线程。 虽然 PLINQ 提供了 ThreadLocal<T> 对象来实现类似的功能:public static void downloadUrl(){var webclient = new ThreadLocal<WebClient>(()=> new WebClient ());var res =urls.AsParallel().ForAll(url =>{webclient.Value.DownloadFile(url, host[url] +".dat"));Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);});}
但是请注意:ThreadLocal<T> 相对而言开销更大! 场景五:退出操作 (使用 Parallel.ForEach)
Parallel.ForEach 有个重载声明如下,其中包含一个 ParallelLoopState 对象:public static ParallelLoopResult ForEach<TSource >(IEnumerable<TSource> source,Action<TSource, ParallelLoopState> body)
ParallelLoopState.Stop() 提供了退出循环的方法,这种方式要比其他两种方法更快。这个方法通知循环不要再启动执行新的迭代,并尽可能快的推出循环。
ParallelLoopState.IsStopped 属性可用来判定其他迭代是否调用了 Stop 方法。
示例:public static boolean FindAny<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T>{var matchFound = false;Parallel.ForEach(TSpace,(curValue, loopstate) =>{if (curValue.Equals(match) ){matchFound = true;loopstate.Stop();}});return matchFound;}
ParallelLoopState.Break() 通知循环继续执行本元素前的迭代,但不执行本元素之后的迭代。最前调用 Break 的起作用,并被记录到 ParallelLoopState.LowestBreakIteration 属性中。这种处理方式通常被应用在一个有序的查找处理中,比如你有一个排序过的数组,你想在其中查找匹配元素的最小 index,那么可以使用以下的代码:public static int FindLowestIndex<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T>{var loopResult = Parallel.ForEach(source,(curValue, loopState, curIndex) =>{if (curValue.Equals(match)){loopState.Break();}});var matchedIndex = loopResult.LowestBreakIteration;return matchedIndex.HasValue ? matchedIndex : -1;}

热心网友 时间:2023-11-10 15:51

当需要为多核机器进行优化的时候,最好先检查下你的程序是否有处理能够分割开来进行并行处理。(例如,有一个巨大的数据集合,其中的元素需要一个一个进行彼此独立的耗时计算)。.net framework 4 中提供了 Parallel.ForEach 和 PLINQ 来帮助我们进行并行处理,本文探讨这两者的差别及适用的场景。Parallel.ForEachParallel.ForEach 是 foreach 的多线程实现,他们都能对 IEnumerable<T> 类型对象进行遍历,Parallel.ForEach 的特殊之处在于它使用多线程来执行循环体内的代码段。
Parallel.ForEach 最常用的形式如下:public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source,Action<TSource> body)
PLINQ
PLINQ 也是一种对数据进行并行处理的编程模型,它通过 LINQ 的语法来实现类似 Parallel.ForEach 的多线程并行处理。
场景一:简单数据 之 独立操作的并行处理(使用 Parallel.ForEach)示例代码:public static void IndependentAction(IEnumerable<T> source, Action<T> action){Parallel.ForEach(source, element => action(element));}
理由:
虽然 PLINQ 也提供了一个类似的 ForAll 接口,但它对于简单的独立操作太重量化了。

2. 使用 Parallel.ForEach 你还能够设定
ParallelOptions.MaxDegreeOfParalelism 参数(指定最多需要多少个线程),这样当 ThreadPool
资源匮乏(甚至当可用线程数<MaxDegreeOfParalelism)的时候, Parallel.ForEach
依然能够顺利运行,并且当后续有更多可用线程出现时,Parallel.ForEach 也能及时地利用这些线程。PLINQ
只能通过WithDegreeOfParallelism 方法来要求固定的线程数,即:要求了几个就是几个,不会多也不会少。</ol>场景二:顺序数据 之 并行处理(使用 PLINQ 来维持数据顺序)
当输出的数据序列需要保持原始的顺序时采用 PLINQ 的 AsOrdered 方法非常简单高效。示例代码:public static void GrayscaleTransformation(IEnumerable<Frame> Movie){var ProcessedMovie =Movie.AsParallel().AsOrdered().Select(frame => ConvertToGrayscale(frame));
foreach (var grayscaleFrame in ProcessedMovie){// Movie frames will be evaluated lazily}}
理由:
Parallel.ForEach 实现起来需要绕一些弯路,首先你需要使用以下的重载在方法:
public static ParallelLoopResult ForEach<TSource >(
IEnumerable<TSource> source,
Action<TSource, ParallelLoopState, Int64> body)</ol>这个重载的 Action 多包含了 index 参数,这样你在输出的时候就能利用这个值来维持原先的序列顺序。请看下面的例子: public static double [] PairwiseMultiply(double[] v1, double[] v2)
{var length = Math.Min(v1.Length, v2.Lenth);double[] result = new double[length];Parallel.ForEach(v1, (element, loopstate, elementIndex) =>result[elementIndex] = element * v2[elementIndex]);return result;}
你可能已经意识到这里有个明显的问题:我们使用了固定长度的数组。如果传入的是 IEnumerable 那么你有4个解决方案: (1) 调用 IEnumerable.Count() 来获取数据长度,然后用这个值实例化一个固定长度的数组,然后使用上例的代码。(2) The second option would be to materialize the original collection before using it; in the event that your input data set is prohibitively large, neither of the first two options will be feasible.(没看懂贴原文)(3) 第三种方式是采用返回一个哈希集合的方式,这种方式下通常需要至少2倍于传入数据的内存,所以处理大数据时请慎用。(4) 自己实现排序算法(保证传入数据与传出数据经过排序后次序一致)2. 相比之下 PLINQ 的 AsOrdered 方法如此简单,而且该方法能处理流式的数据,从而允许传入数据是延迟实现的(lazy materialized)场景三:流数据 之 并行处理(使用 PLINQ)
PLINQ 能输出流数据,这个特性在一下场合非常有用:

结果集不需要是一个完整的处理完毕的数组,即:任何时间点下内存中仅保持数组中的部分信息

2. 你能够在一个单线程上遍历输出结果(就好像他们已经存在/处理完了)

示例:
public static void AnalyzeStocks(IEnumerable<Stock> Stocks)
{
var StockRiskPortfolio =
Stocks
.AsParallel()
.AsOrdered()
.Select(stock => new { Stock = stock, Risk = ComputeRisk(stock)})
.Where(stockRisk => ExpensiveRiskAnalysis(stockRisk.Risk));</ol>foreach (var stockRisk in StockRiskPortfolio)
{SomeStockComputation(stockRisk.Risk);// StockRiskPortfolio will be a stream of results}}
这里使用一个单线程的 foreach 来对 PLINQ 的输出进行后续处理,通常情况下 foreach 不需要等待 PLINQ 处理完所有数据就能开始运作。PLINQ 也允许指定输出缓存的方式,具体可参照 PLINQ 的 WithMergeOptions 方法,及 ParallelMergeOptions 枚举场景四:处理两个集合(使用 PLINQ)PLINQ 的 Zip 方法提供了同时遍历两个集合并进行结合元算的方法,并且它可以与其他查询处理操作结合,实现非常复杂的机能。示例:public static IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b){returna.AsParallel().AsOrdered().Select(element => ExpensiveComputation(element)).Zip(b.AsParallel().AsOrdered().Select(element => DifferentExpensiveComputation(element)),(a_element, b_element) => Combine(a_element,b_element));}
示例中的两个数据源能够并行处理,当双方都有一个可用元素时提供给 Zip 进行后续处理(Combine)。 Parallel.ForEach 也能实现类似的 Zip 处理:public static IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b){var numElements = Math.Min(a.Count(), b.Count());var result = new T[numElements];Parallel.ForEach(a,(element, loopstate, index) =>{var a_element = ExpensiveComputation(element);var b_element = DifferentExpensiveComputation(b.ElementAt(index));result[index] = Combine(a_element, b_element);});return result;}
当然使用 Parallel.ForEach 后你就得自己确认是否要维持原始序列,并且要注意数组越界访问的问题。场景五:线程局部变量
Parallel.ForEach 提供了一个线程局部变量的重载,定义如下:public static ParallelLoopResult ForEach<TSource, TLocal>(IEnumerable<TSource> source,Func<TLocal> localInit,Func<TSource, ParallelLoopState, TLocal,TLocal> body,Action<TLocal> localFinally)
使用的示例: public static List<R> Filtering<T,R>(IEnumerable<T> source){var results = new List<R>();using (SemaphoreSlim sem = new SemaphoreSlim(1)){Parallel.ForEach(source,() => new List<R>(),(element, loopstate, localStorage) =>{bool filter = filterFunction(element);if (filter)localStorage.Add(element);return localStorage;},(finalStorage) =>{lock(myLock){results.AddRange(finalStorage)};});}return results;}
线程局部变量有什么优势呢?请看下面的例子(一个网页抓取程序): public static void UnsafeDownloadUrls (){WebClient webclient = new WebClient();Parallel.ForEach(urls,(url,loopstate,index) =>{webclient.DownloadFile(url, filenames[index] + ".dat");Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);});}
通常第一版代码是这么写的,但是运行时会报错“System.NotSupportedException -> WebClient does not support concurrent I/O operations.”。这是因为多个线程无法同时访问同一个 WebClient对象。所以我们会把 WebClient 对象定义到线程中来: public static void BAD_DownloadUrls (){Parallel.ForEach(urls,(url,loopstate,index) =>{WebClient webclient = new WebClient();webclient.DownloadFile(url, filenames[index] + ".dat");Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);});}
修改之后依然有问题,因为你的机器不是服务器,大量实例化的 WebClient 迅速达到你机器允许的虚拟连接上限数。线程局部变量可以解决这个问题: public static void downloadUrlsSafe(){Parallel.ForEach(urls,() => new WebClient(),(url, loopstate, index, webclient) =>{webclient.DownloadFile(url, filenames[index]+".dat");Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);return webclient;},(webclient) => { });}
这样的写法保证了我们能获得足够的 WebClient 实例,同时这些 WebClient 实例彼此隔离仅仅属于各自关联的线程。 虽然 PLINQ 提供了 ThreadLocal<T> 对象来实现类似的功能:public static void downloadUrl(){var webclient = new ThreadLocal<WebClient>(()=> new WebClient ());var res =urls.AsParallel().ForAll(url =>{webclient.Value.DownloadFile(url, host[url] +".dat"));Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);});}
但是请注意:ThreadLocal<T> 相对而言开销更大! 场景五:退出操作 (使用 Parallel.ForEach)
Parallel.ForEach 有个重载声明如下,其中包含一个 ParallelLoopState 对象:public static ParallelLoopResult ForEach<TSource >(IEnumerable<TSource> source,Action<TSource, ParallelLoopState> body)
ParallelLoopState.Stop() 提供了退出循环的方法,这种方式要比其他两种方法更快。这个方法通知循环不要再启动执行新的迭代,并尽可能快的推出循环。
ParallelLoopState.IsStopped 属性可用来判定其他迭代是否调用了 Stop 方法。
示例:public static boolean FindAny<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T>{var matchFound = false;Parallel.ForEach(TSpace,(curValue, loopstate) =>{if (curValue.Equals(match) ){matchFound = true;loopstate.Stop();}});return matchFound;}
ParallelLoopState.Break() 通知循环继续执行本元素前的迭代,但不执行本元素之后的迭代。最前调用 Break 的起作用,并被记录到 ParallelLoopState.LowestBreakIteration 属性中。这种处理方式通常被应用在一个有序的查找处理中,比如你有一个排序过的数组,你想在其中查找匹配元素的最小 index,那么可以使用以下的代码:public static int FindLowestIndex<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T>{var loopResult = Parallel.ForEach(source,(curValue, loopState, curIndex) =>{if (curValue.Equals(match)){loopState.Break();}});var matchedIndex = loopResult.LowestBreakIteration;return matchedIndex.HasValue ? matchedIndex : -1;}
声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com
八月中国最凉快的地方 八月份哪里最凉快,去哪旅游好?美丽的地方 乱字同韵字是什么意思 华硕笔记本电脑触摸板怎么开笔记本电脑触摸板怎么开启和关闭_百度知 ... 陕西职务侵占案立案准则 结婚后我的恋情维系了十年,怎么做到的? 玉米仁子饭产自哪里 中国期货交易所的交易品种有哪些? 历史要怎么读,有啥诀窍 高中历史诀窍 asp.net中C# &quot;不支持给定路径的格式&quot;高手指导 报错:System.NotSupportedException: SQL Server 未处理 NText、Text、Xml 或 Image 数据类型的比较。 stream.length 引发了system.notsupportedexception类型的异常 安装raidrive时候显示system.NotsupporterException:不支持请求的安全协议 GetRequestStream()总提示System.NotSupportedException 类型的异常 “System.NotSupportedException”类型的未经处理的异常出现在 System... “(WarningIcon.Source).Metadata”引发了“System.NotSupportedException”类型的异常 求戏曲视频下载网站,或者百度网盘地址。 在WORD里如何将两页显示调成一页显示 39个学生A3是第几名? 联想39寸显示器a3怎么样 牛奶是晚上喝好还是早上喝好1,什么牛奶 牛奶不能和什么一起吃? 每天一瓶纯牛奶,有什么好处 一天喝一升牛奶有什么害处么 每天喝牛奶一瓶~~对身体有什么好处 鸡蛋能和牛奶1起吃吗?为什么? 牛奶是一种寒性食物吗? 牛奶1ml等于多少克 过敏原702食物组牛奶1级是什么意思? 命令: NETLOAD 无法加载程序集。错误详细信息: System.IO.FileLoadException? C#Deflatestream不能返回可读的stream 在 System.IO.FileNotFoundException 中第一次偶然出现的“mscorlib.dll”类型的异常 高手指教ASP.NET中filename的一些问题 ASP.NET中 登录时 提示异常详细信息: System.ArgumentException: 不支持关键字: “id”。急~~求解答~~ 异常详细信息: System.ArgumentException: 不支持关键字: “erver”。怎么解决哦? Filed类报错怎么处理? C# 获取SATA信息 C# 如何接收到蓝牙发送的数据字符 请问在C#语言中,System名称空间除了包含Console类之外还包含哪些类... C#中的using.system 男孩叫辰娜好听吗? 带娜字的微信昵称有哪些? 麻烦帮忙取个男生名字带“宇”字女生名字带“娜”字得情侣网名,要好听个性的可爱得!急急急!谢了 我的名字有个娜字我男朋友用微信名称是往后娜余生是啥意思? 我男朋友名字里有个海字.我有个娜字.取个什么网名? 父母在名字中给我们取名为“娜”是什么意思? 男的名字有个扬字 女的名字有个娜字。 想要起情侣网名该怎么起呢? 帮忙!~急~!给刚出生的小男孩起名字!跪求高手出招! 一个字的男孩或女孩名字 要求后面加个字 温冯什么的