资源描述
.Net 4.0并行库实用性演练(前言)
自VS2010发布近半年了,虽然整天想学习新东西,要更新到自己时,发现原来自己基本也很懒,2008还没用上多久呢,无奈被2010 了。用了几天,IDE模样还是和05、08差不多,加了些小特性,以后慢慢体验吧,第一感觉启动速度慢多了。主要还是.Net 4.0的变化,其实也就是修修补补,语言特性几乎没什么新特性,C#多了个Dynamic,十年前VB就支持的晚绑定。只好把注意力放在了 Framework上,新加的并行支持应该是最大的变化吧。
VS2010发布会我也去过的,并行支持是一大卖点。当时记得台上一个MM对一个Linq查询语句加了个AsParallel(),性能就神奇 地提高了一倍,台下掌声雷动。确实不费吹灰之力提高程序性能,是最能引起大家兴趣的。在看电子期刊时,看到冷冷同学,还有吴秦的文章,给偶这些菜鸟以震撼 的启发,原来偶已经远远落在了在读大学生的后面。
那就开始学吧,就拿Parallel开刀。先抓个垫背的:
static void Set(int length)
{
var array = new int[length, length, length];
for (int i = 0; i < length; i++)
for (int j = 0; j < length; j++)
for (int k = 0; k < length; k++)
array[i, j, k] = System.Threading.Thread.CurrentThread.ManagedThreadId;
}
再请出真神:
static void ParallelSet(int length)
{
var array = new int[length, length, length];
Parallel.For(0, length, i =>
{
for (int j = 0; j < length; j++)
for (int k = 0; k < length; k++)
array[i, j, k] = System.Threading.Thread.CurrentThread.ManagedThreadId;
});
}
PK:
CodeTimer.Time("Single thread", 100, () => Set(100));
CodeTimer.Time("Multiple thread", 100, () => ParallelSet(100));
结果,1136ms:729ms,果然不错。不过MSDN的例子说不定是被和谐过的,所以偶总会变变试验过程。果然发现另有乾坤。
.Net 4.0并行库实用性演练
前面说在练习Parallel时,发现另有乾坤,是这样的代码:
代码
static IEnumerable<Person> testFill()
{
var list = new List<Person>(9);
Enumerable.Range(1, 99999).ToList().ForEach(n =>
{
var name = "Person" + n % 9;
list.Add(new Person { Id = n, Name = name });
});
Console.WriteLine("Person's count is {0}", list.Count);
return list;
}
static IEnumerable<Person> testFillParallel()
{
var list = new List<Person>(9);
Enumerable.Range(1, 99999).AsParallel().ForAll(n =>
{
var name = "Person" + n % 9;
list.Add(new Person { Id = n, Name = name });
});
Console.WriteLine("Person's count is {0}", list.Count);
return list;
}
class Person
{
internal int Id { get; set; }
internal string Name { get; set; }
}
试验结果如下(单位ms):
次数
1
2
3
4
Fill 方法
37
27
26
26
FillParallel 方法
43
20
19
20
这个结果有点奇妙的。第一次多线程居然还不如单线程快,和上文例子比较一下,有点明白了。稍微改了下代码,在Add语句前加了个Thread.Sleep(1),并把 List<Person>集合元素减为999,试了一次,结果如下(单位ms):
次数
1
2
3
4
Fill 方法
1012
998
998
999
FillParallel 方法
547
504
504
504
多个线程协同工作时,分配任务本身有开销,要是分配的开销比任务本身还大,多线程就没有意义了。就比如你交待别人做某件事,要是交待的功夫比自 己做还长,还不如自己做。不过从结果也可以看出一个辩证关系,从长远打算,第一次让别人熟悉业务,付出点培训成本,执行完一次后,以后就轻松多了,速度提 高了一倍。如果这里Sleep一下,模拟长一点的单次处理过程,一开始多线程的优势就会非常明显。
FillParallel方法,大家 觉得有没有其它问题呢?想必一般人都能看出,这里有最初级的线程安全问题。没看出的应该是刚学.Net各种集合的初学者,线程安全对他们还只是个太虚幻 境。不过借助这个Parallel,就可以轻松神游幻境。把FillParallel方法循环一百次执行,会发现返回结果本来应该有999个元素,输出的 却显示却结果经常少十几二十个。如果创建List时赋的容量不够,在List扩容时,还可能引发异常。一般是像下图这样(不过一百次都是999也不是不可 能,要看你的RP了):
应提醒一点的是,试验要在Release编译模式下运行,不然看不到线程安全问题,并行执行的效率提升得也很有限。我用的电脑都是双核,不知道在单核电脑的运行情况如何,可能有一定区别。
接着我改下逻辑,增加了一个是否Person存在重名的判断,变成:
代码
static IEnumerable<Person> testFillParallel()
{
var list = new List<Person>(9);
Enumerable.Range(1, 999).AsParallel().ForAll(n =>
{
var name = "Person" + n % 9;
if (list.Count(p => p.Name == name) < 1) list.Add(new Person { Id = n, Name = name });
});
Console.WriteLine("Person's count is {0}", list.Count);
return list;
}
RP不管用了,执行几次,必抛异常:System.InvalidOperationException: Collection was modified; enumeration operation may no execute.
一个线程在枚举集合元素,这时必须保证集合不被其它线程修改,怎么办呢?以前,就知道用锁,现在据说有了线程安全的集合类,在 System.Collections.Concurrent命名空间下,有ConcurrentDictionary, ConcurrentQueue, ConcurrentStack,就是没有ConcurrentList。费了半天,才发现与List对应的应该是 BlockingCollection。
把集合定义换成: var list = new BlockingCollection<Person>(9); 只见刷刷刷,哪怕执行几万次都可以一路跑完了。
不过这样做,还是会发现问题,不知大家看出了吗?
.NET(C#) Internals: 以一个数组填充的例子初步了解.NET 4.0中的并行(一)
2010-05-11 13:11 by 吴秦, 1410 visits, 网摘, 收藏, 编辑
引言
随着CPU多核的普及,编程时充分利用这个特性越显重要。本文首先用传统的嵌套循环进行数组填充,然后用.NET 4.0中的System.Threading.Tasks提供的Parallel Class来并行地进行填充(当然这里也用到嵌套循环),通过对比发现其中差异。主要内容如下:
· 通常的数组填充
· 并行的组数填充
· 性能比较
· System.Threading.Tasks分析,这个将在续篇.NET(C#) Internals: 以一个数组填充的例子初步了解.NET 4.0中的并行(二)中介绍
1、通常的数组填充
首先看如下代码:
通常的数组填充
using System;
namespace ParallelForSample
{
public class SingleCore
{
public static void Calculate(int calcVal)
{
Utility util = new Utility();
util.Start();
int[,] G = new int[calcVal, calcVal];
for (int k = 0; k < calcVal; k++)
for (int i = 0; i < calcVal; i++)
for (int j = 0; j < calcVal; j++)
G[i, j] = Math.Min(G[i, j], G[i, k] + G[k, j]);
util.Stop();
}
}
}
上面的粗体红色显示的几行代码就是实现数组填充,这个很好理解不用多费口舌。补充说明的是:上面的Utility是为了统计性能而编写的一个类,它主要就是用到了Stopwatch对象——它提供一组方法和属性,可用于准确地测量运行时间。Utility的代码如下:
Utility类
public class Utility
{
private Stopwatch _stopwatch;
public void Start()
{
_stopwatch = new Stopwatch();
_stopwatch.Start();
}
public void Stop()
{
_stopwatch.Stop();
TimeSpan ts = _stopwatch.Elapsed;
string elapsedTime = String.Format("{0:00}:{1:00}:{2:00}.{3:00}",
ts.Hours, ts.Minutes, ts.Seconds,
ts.Milliseconds / 10);
Console.WriteLine("Time taken : {0}", elapsedTime);
}
}
利用它我们就可以对数组填充所耗费的时间进行计算了。
2、并行的组数填充
为了充分利用CPU的多核,我们编写如下代码:
并行的数组填充
using System;
using System.Threading.Tasks;
namespace ParallelForSample
{
public class MultiCore
{
public static void Calculate(int calcVal)
{
Utility util = new Utility();
util.Start();
int[,] G = new int[calcVal, calcVal];
Parallel.For(0, calcVal,
delegate(int k)
{
Parallel.For(0, calcVal, delegate(int i)
{
for (int j = 0; j < calcVal; j++)
G[i, j] = Math.Min(G[i, j], G[i, k] + G[k, j]);
});
}
);
util.Stop();
}
}
}
留意上面的红色粗体显示的几行代码,它利用了Parallel.For Method (Int32, Int32, Action<Int32>)方法,Parallel类位于命名空间System.Threading.Tasks中,它支持并行循环。此Parallel.For方法使得它里面的迭代可能并行地运行,注意到上述代码中它的第三个参数是一个委托。在(0,calcVal)之间,这个委托将被调用。
3、性能比较
现在我们来测试一下,上面两种方法的执行性能差异如何,下载源码。其实,核心代码已经在上面贴出来了,现在注意是编写实例来测试,代码主要如下:
性能比较测试
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace ParallelForSample
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Single core");
SingleCore.Calculate(1000);
Console.WriteLine("Multi core");
MultiCore.Calculate(1000);
Console.WriteLine("Finished");
Console.ReadKey();
}
}
}
运行之后得到如下结果:(不同电脑配置不同,得出结果不同)
图1、性能比较
从结果可以看出,并行的数组填充比通常的数组填充性能更高。
· System.Threading.Tasks分析,这个将在续篇.NET(C#) Internals: 以一个数组填充的例子初步了解.NET 4.0中的并行(二)中介绍……
作者:吴秦
出处:
本文基于署名 2.5 中国大陆许可协议发布,欢迎转载,演绎或用于商业目的,但是必须保留本文的署名吴秦(包含链接).
.NET(C#) Internals: 以一个数组填充的例子初步了解.NET 4.0中的并行(二)
2010-05-12 21:54 by 吴秦, 1190 visits, 网摘, 收藏, 编辑
引言
随着CPU多核的普及,编程时充分利用这个特性越显重要。上篇首先用传统的嵌套循环进行数组填充,然后用.NET 4.0中的System.Threading.Tasks提供的Parallel Class来并行地进行填充,最后对比他们的性能。本文将深入分析Parallel Class并借机回答上篇9楼提出的问题,而System.Threading.Tasks分析,这个将推迟到.NET(C#) Internals: 以一个数组填充的例子初步了解.NET 4.0中的并行(三)中介绍。内容如下:
· 1、Parallel Class
o 1.1、For方法
o 1.2、ForEach方法
o 1.3、Invoke方法
· 2、并发控制疑问?
o 2.1、使用Lock锁
o 2.2、使用PLINQ——用AsParallel
o 2.3、使用PLINQ——用ParallelEnumerable
o 2.4、使用Interlocked操作
o 2.5、使用Parallel.For的有Thread-Local变量重载函数
· 性能比较
1、Parallel Class
Parallel——这个类提供对通常操作(诸如for、foreach、执行语句块)基于库的数据并行替换。它只是System.Threading.Tasks命名空间的一个类,该命名空间中还包括很多其他的类。下面举个例子来说明如何使用Parallel.For(来自MSDN):
view source
print?
01
using System.Threading.Tasks;
02
class Test
03
{
04
static int N = 1000;
05
06
static void TestMethod()
07
{
08
// Using a named method.
09
Parallel.For(0, N, Method2);
10
11
// Using an anonymous method.
12
Parallel.For(0, N, delegate(int i)
13
{
14
// Do Work.
15
});
16
17
// Using a lambda expression.
18
Parallel.For(0, N, i =>
19
{
20
// Do Work.
21
});
22
}
23
24
static void Method2(int i)
25
{
26
// Do work.
27
}
28
}
上面这个例子简单易懂,上篇我们就是用的Parallel.For,这里就不解释了。其实Parallel类的方法主要分为下面三类:
· For方法
· ForEach方法
· Invoke方法
1.1、For方法
在里面执行的for循环可能并行地运行,它有12个重载。这12个重载中Int32参数和Int64参数的方法各为6个,下面以Int32为例列出:
· For(Int32 fromInclusive, Int32 toExclusive, Action<Int32> body),该方法对区间(fromInclusive,toExclusive)之间的迭代调用body表示的委托。body委托有一个迭代数次的int32参数,如果fromInclusive>=toExclusive,则不会执行任何迭代。
· For(Int32 fromInclusive, Int32 toExclusive, Action<Int32, ParallelLoopState>),该方法对区间(fromInclusive, toExclusive)之间的迭代调用body表示的委托。body委托有两个参数——表示迭代数次的int32参数、一个可用于过早地跳出循环的ParallelLoopState实例。如果fromInclusive>=toExclusive,则不会执行任何迭代。
调用Break通知For操作当前迭代之后的迭代不需要执行。然而,在此之前的迭代如果没有完成仍然需要执行。因此,调用Break类似于调用break跳出传统的for循环,不是break的原因是它不保证当前迭代之后的迭代绝对不会执行。
如果在当前迭代之前的迭代不必要执行,应该调用Stop而不是Break。调用Stop通知For循环放弃剩下的迭代,不管它是否在当前迭代之前或之后,因为所以要求的工作已经完成。然而,Break并不能保证这个。
· For(Int32 fromInclusive, Int32 toExclusive, ParallelOptions parallelOptions, Action<Int32> body),跟第一个方法类似,但它的区间是[fromInclusive, toExclusive)。
· For(Int32 fromInclusive, Int32 toExclusive, ParallelOptions parallelOptions, Action<Int32, ParallelLoopState> body),跟第二个方法类似,单的区间是[fromInclusive, toExclusive)。
· For<TLocal>(Int32 fromInclusive, Int32 toExclusive, Func<TLocal> localInit, Func<Int32, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally),它的迭代区间是[fromInclusive, toExclusive)。另外body有两个local状态变量用于同一线程的迭代之间共享。localInit委托将在每个线程参与循环执行时调用, 并返回这些线程初始的local状态。这些初始状态被传递给body,当它在每个线程上第一次调用时。然后,接下来body调用返回一个可能的修改状态值 且传递给下一次body调用。最终,最后一次在每个线程上的body调用返回的一个状态值传递给localFinally委托。每个线程执行在自己的 loacl 状态上执行最后一个动作时,localFinally委托将被调用。这个委托可能在多个线程上并发执行,因此,你必须同步访问任何共享变量。
· For<TLocal>(Int32, Int32, ParallelOptions, Func<TLocal>, Func<Int32, ParallelLoopState, TLocal, TLocal>, Action<TLocal>),跟上面的方法类似。
下面代码演示了For(Int32 fromInclusive, Int32 toExclusive, ParallelOptions parallelOptions, Action<Int32> body)方法(来自MSDN):
show sourceview source
print?
01
using System;
02
using System.Collections.Generic;
03
using System.Linq;
04
using System.Text;
05
using System.Threading;
06
using System.Threading.Tasks;
07
08
namespace ConsoleApplication2
09
{
10
class Program
11
{
12
// Demonstrated features:
13
// CancellationTokenSource
14
// Parallel.For()
15
// ParallelOptions
16
// ParallelLoopResult
17
// Expected results:
18
// An iteration for each argument value (0, 1, 2, 3, 4, 5, 6, 7, 8, 9) is executed.
19
// The order of execution of the iterations is undefined.
20
// The iteration when i=2 cancels the loop.
21
// Some iterations may bail out or not start at all; because they are temporally executed in unpredictable order,
22
// it is impossible to say which will start/complete and which won't.
23
// At the end, an OperationCancelledException is surfaced.
24
// Documentation:
25
//
26
27
static void Main(string[] args)
28
{
29
CancellationTokenSource cancellationSource = new CancellationTokenSource();
30
ParallelOptions options = new ParallelOptions();
31
options.CancellationToken = cancellationSource.Token;
32
try
33
{
34
ParallelLoopResult loopResult = Parallel.For(
35
0,
36
10,
37
options,
38
(i, loopState) =>
39
{
40
Console.WriteLine("Start Thread={0}, i={1}", Thread.CurrentThread.ManagedThreadId, i);
41
42
// Simulate a cancellation of the loop when i=2
43
if (i == 2)
44
{
45
cancellationSource.Cancel();
46
}
47
48
// Simulates a long execution
49
for (int j = 0; j < 10; j++)
50
{
51
Thread.Sleep(1 * 200);
52
53
// check to see whether or not to continue
54
if (loopState.ShouldExitCurrentIteration) return;
55
}
56
57
Console.WriteLine("Finish Thread={0}, i={1}", Thread.CurrentThread.ManagedThreadId, i);
58
}
59
);
60
if (loopResult.IsCompleted)
61
{
62
Console.WriteLine("All iterations completed successfully. THIS WAS NOT EXPECTED.");
63
}
64
}
65
// No exception is expected in this example, but if one is still thrown from a task,
66
// it will be wrapped in AggregateException and propagated to the main thread.
67
catch (AggregateException e)
68
{
69
Console.WriteLine("Parallel.For has thrown an AggregateException. THIS WAS NOT EXPECTED.\n{0}", e);
70
}
71
// Catching the cancellation exception
72
catch (OperationCanceledException e)
73
{
74
Console.WriteLine("An iteration has triggered a cancellation. THIS WAS EXPECTED.\n{0}", e.ToString());
75
}
76
}
77
}
78
}
1.2、ForEach方法
在迭代中执行的foreach操作可能并行地执行,它有20个重载。这个方法太多,但用法大概跟For方法差不多,请自行参考MSDN。
1.3、Invoke方法
提供的每个动作可能并行地执行,它有2个重载。
· Invoke(params Action[] actions):actions是一个要执行的动作数组,这些动作可能并行地执行,但并不保证执行的顺序及一定并行执行。这个方法直到提供的所有操作完成时才返回,不管是否正常地完成或异常终止。
· Invoke(ParallelOptions parallelOptions, params Action[] actions):跟上面的方法类似,只是增加了一个parallelOptions参数,可以用户调用者取消整个操作。
例如下面代码执行了三个操作(来自MSDN):
view source
print?
01
using System;
02
using System.Collections.Generic;
03
using System.Linq;
04
using System.Text;
05
using System.Threading;
06
using System.Threading.Tasks;
07
08
namespace ConsoleApplication2
09
{
10
class Program
11
{
12
static void Main()
13
{
14
展开阅读全文