AsyncBatchUtil
/// <summary>
/// Example worksheet function that routes every call through a shared
/// <see cref="AsyncBatchUtil"/>, so that many calls are processed together by
/// a single invocation of <c>RunBatch</c>.
/// </summary>
public static class AsyncBatchExample
{
    // Shared batcher: max batch size 1000, batch timeout 250 ms, batches handled by RunBatch.
    static readonly AsyncBatchUtil BatchRunner = new AsyncBatchUtil(1000, System.TimeSpan.FromMilliseconds(250), RunBatch);

    // A single HttpClient reused by every batch. Creating and disposing a client per
    // batch leaves sockets lingering and can exhaust them under load; HttpClient is
    // designed to be instantiated once and shared.
    static readonly System.Net.Http.HttpClient Http = new System.Net.Http.HttpClient();

    /// <summary>
    /// Entry point for the function: hands the call to the shared batcher and returns
    /// whatever <see cref="AsyncBatchUtil.Run"/> returns for it.
    /// </summary>
    /// <param name="code">First argument, echoed back in the result string.</param>
    /// <param name="value">Second argument, passed along with the call.</param>
    public static object udf_SlowFunction(string code, int value)
    {
        return BatchRunner.Run("udf_SlowFunction", code, value);
    }

    /// <summary>
    /// Processes one batch of calls and returns one result per call, in the same order.
    /// Each result is a string of the form
    /// "FunctionName - firstArgument : index/batchCount @ batchStartTime".
    /// </summary>
    /// <param name="calls">The calls collected into this batch.</param>
    /// <returns>A list with exactly <c>calls.Count</c> entries.</returns>
    static async System.Threading.Tasks.Task<
        System.Collections.Generic.List<object>>
        RunBatch(System.Collections.Generic.List<AsyncBatchUtil.AsyncCall> calls)
    {
        var batchStart = System.DateTime.Now;

        // Fixed 10 second pause before the batch produces results
        // (presumably a stand-in for slow work - confirm with the example's author).
        await System.Threading.Tasks.Task.Delay(System.TimeSpan.FromSeconds(10));

        // Issue a real asynchronous request; the response body is not used.
        await Http.GetStringAsync("http://www.google.com");

        var results = new System.Collections.Generic.List<object>(calls.Count);
        for (int index = 0; index < calls.Count; index++)
        {
            var call = calls[index];
            results.Add(string.Format(
                "{0} - {1} : {2}/{3} @ {4:HH:mm:ss.fff}",
                call.FunctionName, call.Arguments[0], index, calls.Count, batchStart));
        }
        return results;
    }
}
/// <summary>
/// Collects individual function calls into batches and hands each batch to a
/// caller-supplied asynchronous runner. A batch is dispatched when it reaches the
/// configured size, or when the batch timer elapses, whichever happens first.
/// Each call's result (or the batch's failure) is delivered through an
/// <c>IExcelObservable</c> registered with <c>ExcelAsyncUtil.Observe</c>.
/// </summary>
public class AsyncBatchUtil
{
    /// <summary>
    /// One queued call: the function name, its arguments, and the completion
    /// source through which the call's result or error is published.
    /// </summary>
    public class AsyncCall
    {
        internal System.Threading.Tasks.TaskCompletionSource<object> TaskCompletionSource;
        public string FunctionName { get; private set; }
        public object[] Arguments { get; private set; }

        public AsyncCall(
            System.Threading.Tasks.TaskCompletionSource<object> taskCompletion,
            string functionName,
            object[] args)
        {
            TaskCompletionSource = taskCompletion;
            FunctionName = functionName;
            Arguments = args;
        }
    }

    readonly int _maxBatchSize; // not a hard limit
    readonly System.Func<System.Collections.Generic.List<AsyncCall>,
        System.Threading.Tasks.Task<System.Collections.Generic.List<object>>> _batchRunner;
    readonly object _lock = new object(); // guards _currentBatch and the timer start/stop decisions
    readonly System.Timers.Timer _batchTimer; // Timer events will fire from a ThreadPool thread
    System.Collections.Generic.List<AsyncCall> _currentBatch;

    /// <summary>
    /// Creates a batcher.
    /// </summary>
    /// <param name="maxBatchSize">Number of queued calls at which a batch is scheduled immediately; must be at least 1.</param>
    /// <param name="batchTimeout">Interval of the one-shot timer that flushes a partially filled batch.</param>
    /// <param name="batchRunner">Processes a batch and returns one result per call, in order.</param>
    /// <exception cref="System.ArgumentOutOfRangeException"><paramref name="maxBatchSize"/> is less than 1.</exception>
    /// <exception cref="System.ArgumentNullException"><paramref name="batchRunner"/> is null.</exception>
    public AsyncBatchUtil(
        int maxBatchSize,
        System.TimeSpan batchTimeout,
        System.Func<System.Collections.Generic.List<AsyncCall>,
            System.Threading.Tasks.Task<System.Collections.Generic.List<object>>> batchRunner)
    {
        if (maxBatchSize < 1)
        {
            throw new System.ArgumentOutOfRangeException("maxBatchSize", "Max batch size must be positive");
        }
        if (batchRunner == null)
        {
            // Check early - otherwise the NullReferenceException would happen in a threadpool callback.
            throw new System.ArgumentNullException("batchRunner");
        }

        _maxBatchSize = maxBatchSize;
        _batchRunner = batchRunner;
        _currentBatch = new System.Collections.Generic.List<AsyncCall>();

        // One-shot timer: it is started on demand by EnqueueAsyncCall and is not enabled here.
        _batchTimer = new System.Timers.Timer(batchTimeout.TotalMilliseconds);
        _batchTimer.AutoReset = false;
        _batchTimer.Elapsed += TimerElapsed;
    }

    /// <summary>
    /// Registers a call with <c>ExcelAsyncUtil.Observe</c>. The supplied delegate queues the
    /// call into the current batch and returns an observable wrapping the call's task.
    /// Per the original design note, this is expected to run only on the main thread.
    /// </summary>
    /// <param name="functionName">Name identifying the calling function.</param>
    /// <param name="args">Arguments of the call.</param>
    /// <returns>The value returned by <c>ExcelAsyncUtil.Observe</c>.</returns>
    public object Run(string functionName, params object[] args)
    {
        return ExcelDna.Integration.ExcelAsyncUtil.Observe(functionName, args, delegate
        {
            var tcs = new System.Threading.Tasks.TaskCompletionSource<object>();
            EnqueueAsyncCall(tcs, functionName, args);
            return new TaskExcelObservable(tcs.Task);
        });
    }

    // Adds the call to the current batch. If that fills the batch, the batch is handed to
    // the thread pool and a fresh batch is started; otherwise the timer is started (if it
    // is not already running) so the partially filled batch is eventually flushed.
    // Per the original design note, this is expected to run only on the main thread.
    void EnqueueAsyncCall(
        System.Threading.Tasks.TaskCompletionSource<object> taskCompletion,
        string functionName,
        object[] args)
    {
        lock (_lock)
        {
            _currentBatch.Add(new AsyncCall(taskCompletion, functionName, args));

            if (_currentBatch.Count >= _maxBatchSize)
            {
                // Detach the full batch before queuing it, so later calls go into a new list.
                var fullBatch = _currentBatch;
                _currentBatch = new System.Collections.Generic.List<AsyncCall>();
                _batchTimer.Stop();

                // This won't run the batch immediately, but ensures it will run soon.
                System.Threading.ThreadPool.QueueUserWorkItem(
                    state => RunBatch((System.Collections.Generic.List<AsyncCall>)state), fullBatch);
            }
            else if (!_batchTimer.Enabled)
            {
                // We don't know whether the batch containing this call will fill up,
                // so make sure a timer is running to flush it.
                _batchTimer.Start();
            }
        }
    }

    // Timer callback (fires from a ThreadPool thread): detaches whatever has accumulated
    // and runs it. The detached batch may be empty; RunBatch handles that.
    void TimerElapsed(object sender, System.Timers.ElapsedEventArgs e)
    {
        System.Collections.Generic.List<AsyncCall> batch;
        lock (_lock)
        {
            batch = _currentBatch;
            _currentBatch = new System.Collections.Generic.List<AsyncCall>();
        }
        RunBatch(batch);
    }

    // Runs one detached batch through the batch runner and publishes each call's result.
    // Any failure (runner exception, null task, null or wrongly sized result list) is
    // published as an error to every call in the batch.
    //
    // This method is 'async void' and is invoked fire-and-forget from the thread pool and
    // the timer, so no exception may escape it. The Try* completion methods are used
    // because SetResult/SetException throw when a completion source is already completed.
    // May be re-entered; batch is allowed to be empty.
    async void RunBatch(System.Collections.Generic.List<AsyncCall> batch)
    {
        if (batch.Count == 0)
        {
            // Nothing accumulated (e.g. the timer fired after the batch was already dispatched).
            return;
        }

        try
        {
            var pending = _batchRunner(batch);
            if (pending == null)
            {
                throw new System.InvalidOperationException("Batch runner returned a null task.");
            }

            var resultList = await pending;
            if (resultList == null)
            {
                throw new System.InvalidOperationException("Batch runner returned a null result list.");
            }
            if (resultList.Count != batch.Count)
            {
                throw new System.InvalidOperationException(string.Format("Batch result size incorrect. Batch Count: {0}, Result Count: {1}", batch.Count, resultList.Count));
            }

            for (int i = 0; i < resultList.Count; i++)
            {
                batch[i].TaskCompletionSource.TrySetResult(resultList[i]);
            }
        }
        catch (System.Exception ex)
        {
            // Calls that were already completed are left untouched by TrySetException.
            foreach (var call in batch)
            {
                call.TaskCompletionSource.TrySetException(ex);
            }
        }
    }

    // Helper class to turn a task into an IExcelObservable that either returns the task
    // result and completes, or pushes an Exception.
    class TaskExcelObservable : ExcelDna.Integration.IExcelObservable
    {
        readonly System.Threading.Tasks.Task<object> _task;

        public TaskExcelObservable(System.Threading.Tasks.Task<object> task)
        {
            _task = task;
        }

        // Publishes the task's outcome to the observer: immediately if the task has already
        // finished, otherwise from a continuation once it does. The returned disposable
        // does nothing when disposed.
        public System.IDisposable Subscribe(ExcelDna.Integration.IExcelObserver observer)
        {
            if (_task.IsCompleted)
            {
                Publish(_task, observer);
            }
            else
            {
                // A continuation attached to a task that completes in the meantime still runs,
                // so the outcome is never lost between the check above and this call.
                _task.ContinueWith(t => Publish(t, observer));
            }
            return DefaultDisposable.Instance;
        }

        // Translates a finished task into observer notifications:
        // success -> OnNext + OnCompleted, fault -> OnError(inner exception),
        // cancellation -> OnError(TaskCanceledException).
        static void Publish(System.Threading.Tasks.Task<object> task, ExcelDna.Integration.IExcelObserver observer)
        {
            switch (task.Status)
            {
                case System.Threading.Tasks.TaskStatus.RanToCompletion:
                    observer.OnNext(task.Result);
                    observer.OnCompleted();
                    break;
                case System.Threading.Tasks.TaskStatus.Faulted:
                    observer.OnError(task.Exception.InnerException);
                    break;
                case System.Threading.Tasks.TaskStatus.Canceled:
                    observer.OnError(new System.Threading.Tasks.TaskCanceledException(task));
                    break;
            }
        }

        // Helper class to make an empty IDisposable
        sealed class DefaultDisposable : System.IDisposable
        {
            public static readonly DefaultDisposable Instance = new DefaultDisposable();

            // Prevent external instantiation
            DefaultDisposable() { }

            public void Dispose() { }
        }
    }
}
© 2024 Better Solutions Limited. All Rights Reserved.