by
10
6
2,994
1
Top 1% !
Popular
Famous
Nice
Easy-to-find
Specified
Popularity: 4679th place
This snippet is Public
Languagecsharp

ParaFunc.cs   -    Parallelized Functional stuffs : Sort, Search, Map, Reduce, MapReduce

It hardly blows anything up!   LoL   an experiment in functional programming
Copy Embed Code
<iframe id="embedFrame" style="width:600px; height:300px;"
src="https://www.snip2code.com/Embed/662636/ParaFunc-cs--------Parallelized-Function?startLine=0"></iframe>
Click on the embed code to copy it into your clipboard Width Height
Leave empty to retrieve all the content Start End
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; namespace ParaStuff { public class ParallelSort<T> { public static void ParaQuickSort (T[] data, IComparer<T> comp, int limit = 16, int minSize = 10000) { DoSort(data, 0, data.Length - 1, comp, 0, limit, minSize); } internal static void DoSort (T[] data, int start, int end, IComparer<T> comp, int curIdx, int limit, int minSize) { if (start < end) { if (curIdx > limit || end - start < minSize) { Array.Sort(data, start, end - start + 1, comp); } else { int pivotAt = Sector(data, start, end, comp); Task lTsk = Task.Factory.StartNew(( ) => { DoSort(data, start, pivotAt - 1, comp, curIdx + 1, limit, minSize); }); Task rTsk = Task.Factory.StartNew(( ) => { DoSort(data, pivotAt + 1, end, comp, curIdx + 1, limit, minSize); }); Task.WaitAll(lTsk, rTsk); } } } internal static void Switch (T[] data, int firstIdx, int secondIdx) { T pxy = data[firstIdx]; data[firstIdx] = data[secondIdx]; data[secondIdx] = pxy; } internal static int Sector (T[] data, int start, int end, IComparer<T> comp) { // get the pivot and put it at the end T pivot = data[start]; Switch(data, start, end); int saveAt = start; for (int i = start; i < end; i++) { if (comp.Compare(data[i], pivot) <= 0) { Switch(data, i, saveAt); saveAt++; } } Switch(data, saveAt, end); return saveAt; } } public class Graph<T> { public Graph<T> LeftNd, RightNd; public T Data; } public class GraphSeeker { public static void Seek<T> (Graph<T> grph, Action<T> actn) { if (grph != null) { actn.Invoke(grph.Data); if (grph.LeftNd != null && grph.RightNd != null) { Task lTsk = Task.Factory.StartNew(( ) => Seek(grph.LeftNd, actn)); Task rTsk = Task.Factory.StartNew(( ) => Seek(grph.RightNd, actn)); Task.WaitAll(lTsk, rTsk); } } } } public class GraphSearch { private static CancellationTokenSource tokSrc; public static T Search<T> (Graph<T> grph, Func<T, bool> searchFunc) { tokSrc = new CancellationTokenSource(); GraphWrap<T> rzlt = Search(grph, searchFunc, tokSrc); return rzlt == null ? default(T) : rzlt.Value; } public class GraphWrap<T> { public T Value; } private static GraphWrap<T> Search<T> (Graph<T> grph, Func<T, bool> searchFunc, CancellationTokenSource tokSrc) { GraphWrap<T> rzlt = null; if (grph != null) { if (searchFunc(grph.Data)) { tokSrc.Cancel(); rzlt = new GraphWrap<T>() { Value = grph.Data }; } else { if (grph.LeftNode != null && grph.RightNode != null) { Task<GraphWrap<T>> lTsk = Task<GraphWrap<T>>.Factory.StartNew(( ) => Search(grph.LeftNode, searchFunc, tokSrc), tokSrc.Token); Task<GraphWrap<T>> rTsk = Task<GraphWrap<T>>.Factory.StartNew(( ) => Search(grph.RightNode, searchFunc, tokSrc), tokSrc.Token); try { rzlt = lTsk.Result != null ? lTsk.Result : rTsk.Result != null ? rTsk.Result : null; } catch (Exception) { } } } } return rzlt; } } public class ParallelCache<TKey, TValue> { private ConcurrentDictionary<TKey, Lazy<TValue>> catalog; private Func<TKey, TValue> factory; public ParallelCache (Func<TKey, TValue> fac) { factory = fac; catalog = new ConcurrentDictionary<TKey, Lazy<TValue>>(); } public TValue GetCurrentVal (TKey k) { return catalog.GetOrAdd(k, new Lazy<TValue>(( ) => factory(k))).Value; } } public class ParaMap { public static TOutput[] PMap<TInput, TOutput> (Func<TInput, TOutput> mapFunc, TInput[] data) { return data.AsParallel().AsOrdered().Select(value => mapFunc(value)).ToArray(); } } public class ParaReduce { public static TValue PReduce<TValue> (TValue[] src, TValue seed, Func<TValue, TValue, TValue> reduceFunc) { return src.AsParallel().Aggregate(seed, (lRzlt, val) => reduceFunc(lRzlt, val), (oRzlt, lRzlt) => reduceFunc(oRzlt, lRzlt), oRzlt => oRzlt); } } public class ParaMapReduce { public static IEnumerable<TOutput> PMapReduce<TInput, TIntermediate, TKey, TOutput> (IEnumerable<TInput> src, Func<TInput, IEnumerable<TIntermediate>> mapFunc, Func<TIntermediate, TKey> groupFunc, Func<IGrouping<TKey, TIntermediate>, TOutput> reduceFunc) { return src.AsParallel().SelectMany(mapFunc).GroupBy(groupFunc).Select(reduceFunc); } } public static class SmartSelection { public static void Calculate<TInput, TOutput> (TInput value, Action<long, TOutput> callback, params Func<TInput, TOutput>[] functions) { int cnt= 0; Task.Factory.StartNew(( ) => { Parallel.ForEach(functions, (Func<TInput, TOutput> func, ParallelLoopState lpSt, long idx) => { TOutput rez = func(value); if (Interlocked.Increment(ref cnt) == 1) { lpSt.Stop(); callback(idx, rez); } }); }); } } public class SmartCache<TKey, TValue> { private ConcurrentDictionary<TKey, Lazy<TValue>> catalog; private BlockingCollection<TKey> roster; private Func<TKey, TKey[]> smartFunc; private Func<TKey, TValue> facFunc; public SmartCache (Func<TKey, TValue> factory, Func<TKey, TKey[]> spec) { smartFunc = spec; catalog = new ConcurrentDictionary<TKey, Lazy<TValue>>(); roster = new BlockingCollection<TKey>(); facFunc = (k => { TValue value = factory(k); roster.Add(k); return value; }); Task.Factory.StartNew(( ) => { Parallel.ForEach(roster.GetConsumingEnumerable(), new ParallelOptions { MaxDegreeOfParallelism = 2 }, k => { // ya gotta enumerate to speculate! foreach (TKey spk in smartFunc(k)) { TValue res = catalog.GetOrAdd(spk, new Lazy<TValue>(( ) => factory(spk))).Value; } }); }); } public TValue GetCurrentVal (TKey k) { return catalog.GetOrAdd(k, new Lazy<TValue>(( ) => facFunc(k))).Value; } } } namespace ParaStuff.FeedBackWindow { // use this specialized console for any output etc. that you need public class FeedBackWindow { private static BlockingCollection<Action> bq; private static Task msgThd; static FeedBackWindow ( ) { bq = new BlockingCollection<Action>(); msgThd = Task.Factory.StartNew(( ) => { foreach (Action atn in bq.GetConsumingEnumerable()) { atn.Invoke(); } }, TaskCreationOptions.LongRunning); } public static void WriteLine (object val) { bq.Add(new Action(( ) => Console.WriteLine(val))); } public static void WriteLine (string fmt, params object[] vals) { bq.Add(new Action(( ) => Console.WriteLine(fmt, vals))); } } } namespace ParaStuff.Pipeline { private class Reaction { public TInput Value; public Action<TInput, TOutput> Callback; } public class Pipeline<TInput, TOutput> { private BlockingCollection<Reaction> valQ; Func<TInput, TOutput> pipelineFunc; public Pipeline (Func<TInput, TOutput> func) { pipelineFunc = func; } public Pipeline<TInput, TNewOutput> AddFunction<TNewOutput> (Func<TOutput, TNewOutput> newfunc) { Func<TInput, TNewOutput> frankenFunc = (dta => { return newfunc(pipelineFunc(dta)); }); return new Pipeline<TInput, TNewOutput>(frankenFunc); } public void AddVal (TInput val, Action<TInput, TOutput> callback) { valQ.Add(new Reaction { Value = val, Callback = callback }); } public void Begin ( ) { valQ = new BlockingCollection<Reaction>(); Task.Factory.StartNew(( ) => { Parallel.ForEach(valQ.GetConsumingEnumerable(), rpr => { rpr.Callback(rpr.Value, pipelineFunc(rpr.Value)); }); }); } public void End ( ) { valQ.CompleteAdding(); } } }
If you want to be updated about similar snippets, Sign in and follow our Channels

blog comments powered by Disqus