AsyncBatchUtil


    public static class AsyncBatchExample 
    {
        public static object udf_SlowFunction(string code, int value)
        {
            return BatchRunner.Run("udf_SlowFunction", code, value);
        }

        static readonly AsyncBatchUtil BatchRunner = new AsyncBatchUtil(1000, System.TimeSpan.FromMilliseconds(250), RunBatch); 

        static async System.Threading.Tasks.Task<
            System.Collections.Generic.List<object>>
            RunBatch(System.Collections.Generic.List<AsyncBatchUtil.AsyncCall> calls)
        {
            var batchStart = System.DateTime.Now;

            await System.Threading.Tasks.Task.Delay(System.TimeSpan.FromSeconds(10));

            using (var httpClient = new System.Net.Http.HttpClient())
            {
                var page = await httpClient.GetStringAsync("http://www.google.com");
            }

            var results = new System.Collections.Generic.List<object>();
            int i = 0;
            foreach (var call in calls)
            {
                var result = string.Format("{0} - {1} : {2}/{3} @ {4:HH:mm:ss.fff}", call.FunctionName, call.Arguments[0], i++, calls.Count, batchStart);
                results.Add(result);
            }

            return results;
        }

    }


public class AsyncBatchUtil 
    {
        public class AsyncCall
        {
            internal System.Threading.Tasks.TaskCompletionSource<object> TaskCompletionSource;
            public string FunctionName { get; private set; }
            public object[] Arguments { get; private set; }

            public AsyncCall(
                System.Threading.Tasks.TaskCompletionSource<object> taskCompletion,
                string functionName,
                object[] args)
            {
                TaskCompletionSource = taskCompletion;
                FunctionName = functionName;
                Arguments = args;
            }
        }

        readonly int _maxBatchSize; // not a hard limit

        readonly System.Func<System.Collections.Generic.List<AsyncCall>,
                 System.Threading.Tasks.Task<System.Collections.Generic.List<object>>> _batchRunner;

        readonly object _lock = new object();

        readonly System.Timers.Timer _batchTimer; // Timer events will fire from a ThreadPool thread

        System.Collections.Generic.List<AsyncCall> _currentBatch;

        public AsyncBatchUtil(
            int maxBatchSize,
            System.TimeSpan batchTimeout,
            System.Func<System.Collections.Generic.List<AsyncCall>,
                System.Threading.Tasks.Task<System.Collections.Generic.List<object>>> batchRunner)
        {
            if (maxBatchSize < 1)
            {
                throw new System.ArgumentOutOfRangeException("maxBatchSize", "Max batch size must be positive");
            }
            if (batchRunner == null)
            {
                // Check early - otherwise the NullReferenceException would happen in a threadpool callback.
                throw new System.ArgumentNullException("batchRunner");
            }

            _maxBatchSize = maxBatchSize;
            _batchRunner = batchRunner;

            _currentBatch = new System.Collections.Generic.List<AsyncCall>();

            _batchTimer = new System.Timers.Timer(batchTimeout.TotalMilliseconds);
            _batchTimer.AutoReset = false;
            _batchTimer.Elapsed += TimerElapsed;
            // Timer is not Enabled (Started) by default
        }

        // Will only run on the main thread
        public object Run(string functionName, params object[] args)
        {
            return ExcelDna.Integration.ExcelAsyncUtil.Observe(functionName, args, delegate
            {
                var tcs = new System.Threading.Tasks.TaskCompletionSource<object>();

                EnqueueAsyncCall(tcs, functionName, args);

                return new TaskExcelObservable(tcs.Task);
            });
        }

        // Will only run on the main thread
        void EnqueueAsyncCall(
            System.Threading.Tasks.TaskCompletionSource<object> taskCompletion,
            string functionName,
            object[] args)
        {
            lock (_lock)
            {
                _currentBatch.Add(new AsyncCall(taskCompletion, functionName, args));

                // Check if the batch size has been reached, schedule it to be run
                if (_currentBatch.Count >= _maxBatchSize)
                {
                    // This won't run the batch immediately, but will ensure that the current batch (containing this call) will run soon.
                    System.Threading.ThreadPool.QueueUserWorkItem(
                        state => RunBatch((System.Collections.Generic.List<AsyncCall>)state), _currentBatch);

                    _currentBatch = new System.Collections.Generic.List<AsyncCall>();

                    _batchTimer.Stop();
                }
                else
                {
                    // We don't know if the batch containing the current call will run,
                    // so ensure that a timer is started.
                    if (!_batchTimer.Enabled)
                    {
                        _batchTimer.Start();
                    }
                }
            }
        }

        // Will run on a ThreadPool thread
        void TimerElapsed(object sender, System.Timers.ElapsedEventArgs e)
        {
            System.Collections.Generic.List<AsyncCall> batch;
            lock (_lock)
            {
                batch = _currentBatch;
                _currentBatch = new System.Collections.Generic.List<AsyncCall>();
            }
            RunBatch(batch);
        }


        // Will always run on a ThreadPool thread
        // Might be re-entered...
        // batch is allowed to be empty
        async void RunBatch(System.Collections.Generic.List<AsyncCall> batch)
        {
            // Maybe due to Timer re-entrancy we got an empty batch...?
            if (batch.Count == 0)
            {
                // No problem - just return
                return;
            }

            try
            {
                var resultList = await _batchRunner(batch);
                if (resultList.Count != batch.Count)
                {
                    throw new System.InvalidOperationException(string.Format("Batch result size incorrect. Batch Count: {0}, Result Count: {1}", batch.Count, resultList.Count));
                }

                for (int i = 0; i < resultList.Count; i++)
                {
                    batch[i].TaskCompletionSource.SetResult(resultList[i]);
                }
            }
            catch (System.Exception ex)
            {
                foreach (var call in batch)
                {
                    call.TaskCompletionSource.SetException(ex);
                }
            }
        }

        // Helper class to turn a task into an IExcelObservable that either returns the task result and completes, or pushes an Exception
        class TaskExcelObservable : ExcelDna.Integration.IExcelObservable
        {
            readonly System.Threading.Tasks.Task<object> _task;

            public TaskExcelObservable(System.Threading.Tasks.Task<object> task)
            {
                _task = task;
            }

            public System.IDisposable Subscribe(ExcelDna.Integration.IExcelObserver observer)
            {
                switch (_task.Status)
                {
                    case System.Threading.Tasks.TaskStatus.RanToCompletion:
                        observer.OnNext(_task.Result);
                        observer.OnCompleted();
                        break;

                    case System.Threading.Tasks.TaskStatus.Faulted:
                        observer.OnError(_task.Exception.InnerException);
                        break;

                    case System.Threading.Tasks.TaskStatus.Canceled:
                        observer.OnError(new System.Threading.Tasks.TaskCanceledException(_task));
                        break;

                    default:
                        var task = _task;
                        // OK - the Task has not completed synchronously
                        // And handle the Task completion
                        task.ContinueWith(t =>
                        {
                            switch (t.Status)
                            {
                                case System.Threading.Tasks.TaskStatus.RanToCompletion:
                                    observer.OnNext(t.Result);
                                    observer.OnCompleted();
                                    break;

                                case System.Threading.Tasks.TaskStatus.Faulted:
                                    observer.OnError(t.Exception.InnerException);
                                    break;

                                case System.Threading.Tasks.TaskStatus.Canceled:
                                    observer.OnError(new System.Threading.Tasks.TaskCanceledException(t));
                                    break;
                            }
                        });
                        break;
                }

                return DefaultDisposable.Instance;
            }

            // Helper class to make an empty IDisposable
            sealed class DefaultDisposable : System.IDisposable
            {
                public static readonly DefaultDisposable Instance = new DefaultDisposable();
                // Prevent external instantiation
                DefaultDisposable() { }
                public void Dispose() { }
            }
        }
    }


© 2024 Better Solutions Limited. All Rights Reserved. © 2024 Better Solutions Limited TopPrevNext