score:1

Accepted answer

you can't make your own parallelquery<t> based classes unfortunately due to the fact that while parallelquery<t> is public it does not have any public constructors.

what you can do is use the existing plinq infrastructure to do what you want. all you really are wanting to do is do a where with a contains being the predicate... so do that.

public static parallelquery<tsource> wherecontains<tsource, tkey>(
    this parallelquery<tsource> source,
    ienumerable<tkey> values,
    func<tsource, tkey> keyselector)
{
    hashset<tkey> elements = new hashset<tkey>(values);

    return source.where(item => elements.contains(keyselector(item)));
}

this performs the where clause in parallel, and (while not documented) contains is thread safe as long as you are not performing any write operations, and because you are making a local hashset to perform the lookup you don't need to worry about writes happening.


here is a example project that prints out to the console what thread and item it is processing, you can see that it is using multiple threads.

class program
{
    static void main(string[] args)
    {
        list<int> items = new list<int>(enumerable.range(0,100));

        int[] values = {5, 12, 25, 17, 0};

        console.writeline("thread: {0}", environment.currentmanagedthreadid);

        var result = items.asparallel().wherecontains(values, x=>x).tolist();

        console.writeline("done");
        console.readline();
    }
}

static class extensions
{
    public static parallelquery<tsource> wherecontains<tsource, tkey>(
        this parallelquery<tsource> source,
        ienumerable<tkey> values,
        func<tsource, tkey> keyselector)
    {
        hashset<tkey> elements = new hashset<tkey>(values);

        return source.where(item =>
        {
            console.writeline("item:{0} thread: {1}", item, environment.currentmanagedthreadid);
            return elements.contains(keyselector(item));
        });
    }
}

score:1

could you not just do this?

public static parallelquery<tsource> where<tsource>(
    this parallelquery<tsource> source, 
    func<tsource, bool> predicate)
{
    return
        source
            .selectmany(x =>
                predicate(x)
                ? new tsource[] { x } 
                : enumerable.empty<tsource>());
}

Related Query