
    public static class AsyncBatchExample 
        public static object udf_SlowFunction(string code, int value)
            return BatchRunner.Run("udf_SlowFunction", code, value);

        static readonly AsyncBatchUtil BatchRunner = new AsyncBatchUtil(1000, System.TimeSpan.FromMilliseconds(250), RunBatch); 

        static async System.Threading.Tasks.Task<
            RunBatch(System.Collections.Generic.List<AsyncBatchUtil.AsyncCall> calls)
            var batchStart = System.DateTime.Now;

            await System.Threading.Tasks.Task.Delay(System.TimeSpan.FromSeconds(10));

            using (var httpClient = new System.Net.Http.HttpClient())
                var page = await httpClient.GetStringAsync("");

            var results = new System.Collections.Generic.List<object>();
            int i = 0;
            foreach (var call in calls)
                var result = string.Format("{0} - {1} : {2}/{3} @ {4:HH:mm:ss.fff}", call.FunctionName, call.Arguments[0], i++, calls.Count, batchStart);

            return results;


public class AsyncBatchUtil 
        public class AsyncCall
            internal System.Threading.Tasks.TaskCompletionSource<object> TaskCompletionSource;
            public string FunctionName { get; private set; }
            public object[] Arguments { get; private set; }

            public AsyncCall(
                System.Threading.Tasks.TaskCompletionSource<object> taskCompletion,
                string functionName,
                object[] args)
                TaskCompletionSource = taskCompletion;
                FunctionName = functionName;
                Arguments = args;

        readonly int _maxBatchSize; // not a hard limit

        readonly System.Func<System.Collections.Generic.List<AsyncCall>,
                 System.Threading.Tasks.Task<System.Collections.Generic.List<object>>> _batchRunner;

        readonly object _lock = new object();

        readonly System.Timers.Timer _batchTimer; // Timer events will fire from a ThreadPool thread

        System.Collections.Generic.List<AsyncCall> _currentBatch;

        public AsyncBatchUtil(
            int maxBatchSize,
            System.TimeSpan batchTimeout,
                System.Threading.Tasks.Task<System.Collections.Generic.List<object>>> batchRunner)
            if (maxBatchSize < 1)
                throw new System.ArgumentOutOfRangeException("maxBatchSize", "Max batch size must be positive");
            if (batchRunner == null)
                // Check early - otherwise the NullReferenceException would happen in a threadpool callback.
                throw new System.ArgumentNullException("batchRunner");

            _maxBatchSize = maxBatchSize;
            _batchRunner = batchRunner;

            _currentBatch = new System.Collections.Generic.List<AsyncCall>();

            _batchTimer = new System.Timers.Timer(batchTimeout.TotalMilliseconds);
            _batchTimer.AutoReset = false;
            _batchTimer.Elapsed += TimerElapsed;
            // Timer is not Enabled (Started) by default

        // Will only run on the main thread
        public object Run(string functionName, params object[] args)
            return ExcelDna.Integration.ExcelAsyncUtil.Observe(functionName, args, delegate
                var tcs = new System.Threading.Tasks.TaskCompletionSource<object>();

                EnqueueAsyncCall(tcs, functionName, args);

                return new TaskExcelObservable(tcs.Task);

        // Will only run on the main thread
        void EnqueueAsyncCall(
            System.Threading.Tasks.TaskCompletionSource<object> taskCompletion,
            string functionName,
            object[] args)
            lock (_lock)
                _currentBatch.Add(new AsyncCall(taskCompletion, functionName, args));

                // Check if the batch size has been reached, schedule it to be run
                if (_currentBatch.Count >= _maxBatchSize)
                    // This won't run the batch immediately, but will ensure that the current batch (containing this call) will run soon.
                        state => RunBatch((System.Collections.Generic.List<AsyncCall>)state), _currentBatch);

                    _currentBatch = new System.Collections.Generic.List<AsyncCall>();

                    // We don't know if the batch containing the current call will run,
                    // so ensure that a timer is started.
                    if (!_batchTimer.Enabled)

        // Will run on a ThreadPool thread
        void TimerElapsed(object sender, System.Timers.ElapsedEventArgs e)
            System.Collections.Generic.List<AsyncCall> batch;
            lock (_lock)
                batch = _currentBatch;
                _currentBatch = new System.Collections.Generic.List<AsyncCall>();

        // Will always run on a ThreadPool thread
        // Might be re-entered...
        // batch is allowed to be empty
        async void RunBatch(System.Collections.Generic.List<AsyncCall> batch)
            // Maybe due to Timer re-entrancy we got an empty batch...?
            if (batch.Count == 0)
                // No problem - just return

                var resultList = await _batchRunner(batch);
                if (resultList.Count != batch.Count)
                    throw new System.InvalidOperationException(string.Format("Batch result size incorrect. Batch Count: {0}, Result Count: {1}", batch.Count, resultList.Count));

                for (int i = 0; i < resultList.Count; i++)
            catch (System.Exception ex)
                foreach (var call in batch)

        // Helper class to turn a task into an IExcelObservable that either returns the task result and completes, or pushes an Exception
        class TaskExcelObservable : ExcelDna.Integration.IExcelObservable
            readonly System.Threading.Tasks.Task<object> _task;

            public TaskExcelObservable(System.Threading.Tasks.Task<object> task)
                _task = task;

            public System.IDisposable Subscribe(ExcelDna.Integration.IExcelObserver observer)
                switch (_task.Status)
                    case System.Threading.Tasks.TaskStatus.RanToCompletion:

                    case System.Threading.Tasks.TaskStatus.Faulted:

                    case System.Threading.Tasks.TaskStatus.Canceled:
                        observer.OnError(new System.Threading.Tasks.TaskCanceledException(_task));

                        var task = _task;
                        // OK - the Task has not completed synchronously
                        // And handle the Task completion
                        task.ContinueWith(t =>
                            switch (t.Status)
                                case System.Threading.Tasks.TaskStatus.RanToCompletion:

                                case System.Threading.Tasks.TaskStatus.Faulted:

                                case System.Threading.Tasks.TaskStatus.Canceled:
                                    observer.OnError(new System.Threading.Tasks.TaskCanceledException(t));

                return DefaultDisposable.Instance;

            // Helper class to make an empty IDisposable
            sealed class DefaultDisposable : System.IDisposable
                public static readonly DefaultDisposable Instance = new DefaultDisposable();
                // Prevent external instantiation
                DefaultDisposable() { }
                public void Dispose() { }

© 2024 Better Solutions Limited. All Rights Reserved. © 2024 Better Solutions Limited TopPrevNext