PLINQ の Aggregate() で死亡

RGBの色データが入った画像データ配列(正確にはunsafe中のポインタ)を集計したい。



…というシーンが発生。 for でぐるぐる回して集計するのもアリといえばアリですが…当然ながらカッコ悪い。C# なら黙って LINQ だろゴルァと*1。丁度、集計する為の関数 Aggregate() も定義されているし、なにより AsParallel() で手軽に並列化も可能と聞いた! やったねタエちゃん! 暇してるCPUコアに鞭打てるよ!!



とはいえ



集計方針が RGB 個別にニャンヤンする感じなので、seed に R,G,B メンバを備えたクラスを new RGB() して渡すわけですが……参照型渡したら、それ、並列処理できんの…? どう考えてもできないよね…? それとも、PLINQが超賢くなんかやってくれるの…?

正直良くわからんので、とりあえずググったり実験的なコード書いてみた所、最終的にこうなった!

パラレったー

threadIDs の要素数がちゃんとCPUコア数とおなじになったよ! 処理時間も体感レベルで明らかに短かったよ!! すごい!!

class Result
{
    public long R = 0;
    public long G = 0;
};

static public void test()
{
    const int LENGTH = 10001;
    int[] arr = new int[LENGTH];
    for (int i = 0; i < LENGTH; ++i)
    {
        arr[i] = i;
    }

    Result r = ParallelEnumerable.Range(0, LENGTH).Aggregate(
        () => new Result(),
        (aggr, i) =>
        {
            aggr.R += arr[i];
            aggr.G += ComputeG(arr[i]);
            return aggr;
        },
        (aggr1, aggr2) => new Result { R = aggr1.R + aggr2.R, G = aggr1.G + aggr2.G },
        (aggr) => aggr
    );

    arr[0] = 0; // ブレークポイント設定用の何か
}

static public long ComputeG(int i)
{
    threadIDs.Add(System.Threading.Thread.CurrentThread.ManagedThreadId);
    System.Threading.Thread.Sleep(1);
    return i;
}

private static HashSet<int> threadIDs = new HashSet<int>();

失敗 // 最初に書いたコード

なんも考えずに「AsParallel() したらとりあえず並列処理してくれるんでしたっけー?」的に書いたコード。 threadIDs の要素が1つで死ぬ。 シリアルじゃねーじゃねーか!

class Result
{
    public long R = 0;
    public long G = 0;
};

static public void test()
{
    const int LENGTH = 10001;
    int[] arr = new int[LENGTH];
    for (int i = 0; i < LENGTH; ++i)
    {
        arr[i] = i;
    }

    Result r = Enumerable.Range(0, LENGTH).AsParallel().Aggregate(
        new Result(),
        (aggr, i) =>
        {
            aggr.R += arr[i];
            aggr.G += ComputeG(arr[i]);
            return aggr;
        }
    );

    arr[0] = 0; // ブレークポイント作る用の何か
}

static public long ComputeG(int i)
{
    threadIDs.Add(System.Threading.Thread.CurrentThread.ManagedThreadId);
    System.Threading.Thread.Sleep(1);
    return i;
}

private static HashSet<int> threadIDs = new HashSet<int>();

失敗 // ParallelEnumerable に期待する

Enumerable じゃなくて ParallelEnumerable なんてのがあるんスか!こっちじゃないと根本の Range() が並列化しないとかそんな話すかもしや!? と期待して実行。 threadIDs の要素が1つで死ぬ。 クソぁ!!

class Result
{
    public long R = 0;
    public long G = 0;
};

static public void test()
{
    const int LENGTH = 10001;
    int[] arr = new int[LENGTH];
    for (int i = 0; i < LENGTH; ++i)
    {
        arr[i] = i;
    }

    Result r = ParallelEnumerable.Range(0, LENGTH).Aggregate(
        new Result(),
        (aggr, i) =>
        {
            aggr.R += arr[i];
            aggr.S += ComputeS(arr[i]);
            return aggr;
        },
    );

    arr[0] = 0; // ブレークポイント作る用の何か
}

static public long ComputeS(int i)
{
    threadIDs.Add(System.Threading.Thread.CurrentThread.ManagedThreadId);
    System.Threading.Thread.Sleep(1);
    return i;
}

private static HashSet<int> threadIDs = new HashSet<int>();

ってか

やっぱり seed に与えてる new Result() じゃ無理ですよね…とぐぐり始めた所 Parallel Aggregation | Microsoft Docs の "Using PLINQ Aggregation with Range Selection" を発見。 サンプルソースを見れば一発でした。 なるほど。

ちなみに ParallelEnumerable.Aggregate Method (System.Linq) | Microsoft Docs を使ってるわけですが、なんかもうまず見た目でマジうっへりなんですけどこのリファレンス






いやぁ、PLINQ って本当にいいものですね(※流れてくる哀愁ただようトランペットBGM

*1:※偏見です