asyncが言わざるを得ないこと:SynchronizationContext
23494 ワード
書き込み待ち
ASP.NET環境:
// ASP.NET's SynchronizationContext implementation. Each member forwards to a shared State object,
// whose SynchronizationHelper performs the actual scheduling and operation counting.
internal sealed class AspNetSynchronizationContext : AspNetSynchronizationContextBase {
    // Everything mutable lives behind this one reference so that CreateCopy() can return a distinct
    // context object that shares the same state (shallow copy semantics).
    private readonly State _state;

    // Builds a context with a fresh State wrapping a new SynchronizationHelper for syncContext.
    internal AspNetSynchronizationContext(ISyncContext syncContext)
        : this(new State(new SynchronizationHelper(syncContext))) {
    }

    // Wraps an existing State; used by the public-facing constructor and by CreateCopy().
    private AspNetSynchronizationContext(State state) {
        _state = state;
    }

    // Gets or sets the flag stored in State.AllowAsyncDuringSyncStages.
    internal override bool AllowAsyncDuringSyncStages {
        get { return _state.AllowAsyncDuringSyncStages; }
        set { _state.AllowAsyncDuringSyncStages = value; }
    }

    // The context can never be truly switched off: user code and the runtime can start asynchronous
    // work regardless. This flag only lets Page and other types signal that asynchronous operations
    // are not valid right now, so ASP.NET can refrain from starting them and surface a suitable
    // exception to the developer instead.
    internal override bool Enabled {
        get {
            return _state.Enabled;
        }
    }

    // The error currently recorded on the helper (null when none has been recorded).
    internal override ExceptionDispatchInfo ExceptionDispatchInfo {
        get {
            return _state.Helper.Error;
        }
    }

    // The helper's current pending-operation count.
    internal override int PendingOperationsCount {
        get {
            return _state.Helper.PendingCount;
        }
    }

    // Marks async void operations as permitted (see OperationStarted).
    internal override void AllowVoidAsyncOperations() {
        _state.AllowVoidAsyncOperations = true;
    }

    // Takes synchronous control of the helper and remembers the disposable that releases it, so that
    // DisassociateFromCurrentThread() can release it later.
    [SuppressMessage("Microsoft.Globalization", "CA1305:SpecifyIFormatProvider", Justification = "Used only during debug.")]
    internal override void AssociateWithCurrentThread() {
        IDisposable releaseHandle = _state.Helper.EnterSynchronousControl();
#if DBG
        // Debug builds wrap the release handle so that releasing on a different thread trips an assert.
        IDisposable innerReleaseHandle = releaseHandle;
        Thread owningThread = Thread.CurrentThread;
        releaseHandle = new DisposableAction(() => {
            Debug.Assert(owningThread == Thread.CurrentThread, String.Format("AssociateWithCurrentThread was called on thread ID '{0}', but DisassociateFromCurrentThread was called on thread ID '{1}'.", owningThread.ManagedThreadId, Thread.CurrentThread.ManagedThreadId));
            innerReleaseHandle.Dispose();
        });
#endif
        // No synchronization is needed around the stack: only one thread at a time can get through
        // EnterSynchronousControl().
        _state.SyncControlDisassociationActions.Push(releaseHandle);
    }

    // Discards any error recorded on the helper.
    internal override void ClearError() {
        _state.Helper.Error = null;
    }

    // Invoked by the BCL when it wants a SynchronizationContext equivalent to this one but which is
    // not the same object reference. The copy shares this instance's State.
    public override SynchronizationContext CreateCopy() {
        AspNetSynchronizationContext copy = new AspNetSynchronizationContext(_state);
        return copy;
    }

    // Clears the Enabled flag.
    internal override void Disable() {
        _state.Enabled = false;
    }

    // Releases the most recent association made by AssociateWithCurrentThread().
    internal override void DisassociateFromCurrentThread() {
        // The stack is accessed without synchronization on the assumption that callers are
        // well-behaved: they only call this on the thread that called AssociateWithCurrentThread(),
        // and that method itself serializes access.
        Stack<IDisposable> pendingReleases = _state.SyncControlDisassociationActions;
        Debug.Assert(pendingReleases.Count > 0, "DisassociateFromCurrentThread() was called on a thread which hadn't previously called AssociateWithCurrentThread().");
        pendingReleases.Pop().Dispose();
    }

    // Sets the Enabled flag.
    internal override void Enable() {
        _state.Enabled = true;
    }

    // Records the end of an operation previously announced through OperationStarted().
    public override void OperationCompleted() {
        // The void-async counter must be decremented first: ChangeOperationCount may invoke a
        // callback which depends on that value.
        Interlocked.Decrement(ref _state.VoidAsyncOutstandingOperationCount);
        _state.Helper.ChangeOperationCount(-1);
    }

    // Records the start of an operation. Throws InvalidOperationException when neither
    // AllowAsyncDuringSyncStages nor AllowVoidAsyncOperations is set, i.e. when an asynchronous
    // operation is started while no async module, handler, or Page is being processed.
    public override void OperationStarted() {
        bool operationPermitted = AllowAsyncDuringSyncStages || _state.AllowVoidAsyncOperations;
        if (!operationPermitted) {
            throw new InvalidOperationException(SR.GetString(SR.Async_operation_cannot_be_started));
        }

        _state.Helper.ChangeOperationCount(+1);
        Interlocked.Increment(ref _state.VoidAsyncOutstandingOperationCount);
    }

    // Dev11 Bug 70908: Race condition involving SynchronizationContext allows ASP.NET requests to be abandoned in the pipeline
    //
    // When the last completion occurs, the _pendingCount is decremented and then the _lastCompletionCallbackLock is acquired to get
    // the _lastCompletionCallback. If the _lastCompletionCallback is non-null, then the last completion will invoke the callback;
    // otherwise, the caller of PendingCompletion will handle the completion.
    internal override bool PendingCompletion(WaitCallback callback) {
        Action continuation = () => callback(null);
        return _state.Helper.TrySetCompletionContinuation(continuation);
    }

    // Queues callback(state) on the helper for asynchronous execution.
    public override void Post(SendOrPostCallback callback, Object state) {
        Action work = () => callback(state);
        _state.Helper.QueueAsynchronous(work);
    }

    // Queues a Task-returning callback on the helper.
    internal void PostAsync(Func<object, Task> callback, Object state) {
        _state.Helper.QueueAsynchronousAsync(callback, state);
    }

    // Marks async void operations as no longer permitted and, if some are still outstanding,
    // records an error on the helper.
    internal override void ProhibitVoidAsyncOperations() {
        _state.AllowVoidAsyncOperations = false;

        // Prohibiting async operations while some are still outstanding is an error condition.
        // Throwing here is not an option because (a) callers generally aren't prepared for it and
        // (b) the outstanding operations have to be waited for anyway; the helper is marked as
        // faulted instead.
        //
        // This is technically racy: the caller is not guaranteed to observe the error if the
        // operation counter reaches zero at just the right moment. That is tolerable because the
        // error is really only meant for diagnostics.
        if (AllowAsyncDuringSyncStages) {
            return;
        }

        if (Volatile.Read(ref _state.VoidAsyncOutstandingOperationCount) <= 0) {
            return;
        }

        InvalidOperationException pendingError = new InvalidOperationException(SR.GetString(SR.Async_operation_cannot_be_pending));
        _state.Helper.Error = ExceptionDispatchInfo.Capture(pendingError);
    }

    // Intentionally empty: this type does not special-case asynchronous work started from a
    // synchronous handler.
    internal override void ResetSyncCaller() {
    }

    // Intentionally empty: this type does not special-case asynchronous work started from a
    // synchronous handler.
    internal override void SetSyncCaller() {
    }

    // Runs callback(state) through the helper's synchronous queueing path.
    public override void Send(SendOrPostCallback callback, Object state) {
        Action work = () => callback(state);
        _state.Helper.QueueSynchronous(work);
    }

    // Holder for the state shared between a context and the copies made by CreateCopy().
    private sealed class State {
        internal bool AllowAsyncDuringSyncStages;
        internal volatile bool AllowVoidAsyncOperations; // starts out false
        internal bool Enabled;
        internal readonly SynchronizationHelper Helper; // handles scheduling of the asynchronous tasks
        internal Stack<IDisposable> SyncControlDisassociationActions;
        internal int VoidAsyncOutstandingOperationCount; // starts out at zero

        internal State(SynchronizationHelper helper) {
            AllowAsyncDuringSyncStages = AppSettings.AllowAsyncDuringSyncStages;
            Enabled = true;
            SyncControlDisassociationActions = new Stack<IDisposable>(capacity: 1);
            Helper = helper;
        }
    }
}
// Serializes the callbacks queued against it by chaining each one onto the previously queued Task,
// and keeps a count of in-flight operations so that a completion continuation can be started once
// that count reaches zero.
internal sealed class SynchronizationHelper {
    private Task _completionTask; // the Task that will run when all in-flight operations have completed
    private Thread _currentThread; // the Thread that's running the current Task; all threads must see the same value for this field
    private Task _lastScheduledTask = CreateInitialTask(); // the last Task that was queued to this helper, used to hook future Tasks (not volatile since always accessed under lock)
    private Task _lastScheduledTaskAsync = CreateInitialTask(); // the last async Task that was queued to this helper
    private readonly object _lockObj = new object(); // synchronizes access to _lastScheduledTask and _lastScheduledTaskAsync
    private int _operationsInFlight; // operation counter
    private readonly ISyncContext _syncContext; // a context that wraps an operation with pre- and post-execution phases
    private readonly Action<bool> _appVerifierCallback; // for making sure that developers don't try calling us after the request has completed

    // syncContext may be null; SafeWrapCallback skips the Enter()/Leave() bracket in that case.
    public SynchronizationHelper(ISyncContext syncContext) {
        _syncContext = syncContext;
        _appVerifierCallback = AppVerifier.GetSyncContextCheckDelegate(syncContext);
    }

    // If an operation results in an exception, this property will provide access to it.
    // Only the most recently recorded error is retained; each assignment overwrites the previous one.
    public ExceptionDispatchInfo Error { get; set; }

    // Helper to access the _currentThread field in a thread-safe fashion.
    // It is not enough to mark the _currentThread field volatile, since that only guarantees
    // read / write ordering and doesn't ensure that each thread sees the same value.
    private Thread CurrentThread {
        get { return Interlocked.CompareExchange(ref _currentThread, null, null); }
        set { Interlocked.Exchange(ref _currentThread, value); }
    }

    // Returns the number of pending operations.
    // Implemented as ChangeOperationCount(0), so reading this while the count is zero will also
    // start a queued completion task, if there is one.
    public int PendingCount { get { return ChangeOperationCount(0); } }

    // Atomically adds addend to the in-flight operation counter and returns the new value.
    // When the new value is zero, the queued completion task (if any) is atomically removed
    // from the field and started.
    public int ChangeOperationCount(int addend) {
        int newOperationCount = Interlocked.Add(ref _operationsInFlight, addend);
        if (newOperationCount == 0) {
            // if an asynchronous completion operation is queued, run it
            Task completionTask = Interlocked.Exchange(ref _completionTask, null);
            if (completionTask != null) {
                completionTask.Start();
            }
        }

        return newOperationCount;
    }

    // Invokes the callback obtained from AppVerifier in the constructor, when one was supplied.
    private void CheckForRequestStateIfRequired(bool checkForReEntry) {
        if (_appVerifierCallback != null) {
            _appVerifierCallback(checkForReEntry);
        }
    }

    // Creates the initial hook that future operations can ride off of
    private static Task CreateInitialTask() {
        return Task.FromResult<object>(null);
    }

    // Takes control of this SynchronizationHelper instance synchronously. Asynchronous operations
    // will be queued but will not be dispatched until control is released (by disposing of the
    // returned object). This operation might block if a different thread is currently in
    // control of the context.
    public IDisposable EnterSynchronousControl() {
        if (CurrentThread == Thread.CurrentThread) {
            // If the current thread already has control of this context, there's nothing extra to do.
            return DisposableAction.Empty;
        }

        // used to mark the end of the synchronous task
        TaskCompletionSource<object> tcs = new TaskCompletionSource<object>();
        Task lastTask;
        lock (_lockObj) {
            lastTask = _lastScheduledTask;
            _lastScheduledTask = tcs.Task; // future work can be scheduled off this Task
        }

        // The original task may end up Faulted, which would make its Wait() method throw an exception.
        // To avoid this, we instead wait on a continuation which is always guaranteed to complete successfully.
        if (!lastTask.IsCompleted) { lastTask.ContinueWith(_ => { }, TaskContinuationOptions.ExecuteSynchronously).Wait(); }
        CurrentThread = Thread.CurrentThread;

        // synchronous control is released by marking the Task as complete
        return new DisposableAction(() => {
            CurrentThread = null;
            tcs.TrySetResult(null);
        });
    }

    // Queues action to run after everything previously queued on _lastScheduledTask. The operation
    // count is incremented here and decremented by SafeWrapCallback once action has run.
    public void QueueAsynchronous(Action action) {
        CheckForRequestStateIfRequired(checkForReEntry: true);
        ChangeOperationCount(+1);

        // This method only schedules work; it doesn't itself do any work. The lock is held for a very
        // short period of time.
        lock (_lockObj) {
            Task newTask = _lastScheduledTask.ContinueWith(_ => SafeWrapCallback(action));
            _lastScheduledTask = newTask; // the newly-created task is now the last one
        }
    }

    // QueueAsynchronousAsync and SafeWrapCallbackAsync guarantee:
    // 1. Funcs posted here complete in the order in which they were posted (first come, first completed).
    // 2. There is no overlapping execution.
    public void QueueAsynchronousAsync(Func<object, Task> func, object state) {
        CheckForRequestStateIfRequired(checkForReEntry: true);
        ChangeOperationCount(+1);

        // This method only schedules work; it doesn't itself do any work. The lock is held for a very
        // short period of time.
        lock (_lockObj) {
            // 1. Note that we are chaining newTask with _lastScheduledTaskAsync, not _lastScheduledTask.
            //    Chaining newTask with _lastScheduledTask would cause deadlock.
            // 2. Unwrap() is necessary to be called here. When chaining multiple tasks using the ContinueWith
            //    method, your return type will be Task<T> whereas T is the return type of the delegate/method
            //    passed to ContinueWith. As the return type of an async delegate is a Task, you will end up with
            //    a Task<Task> and end up waiting for the async delegate to return you the Task which is done after
            //    the first await.
            Task newTask = _lastScheduledTaskAsync.ContinueWith(
                async _ => { await SafeWrapCallbackAsync(func, state); }).Unwrap();
            _lastScheduledTaskAsync = newTask; // the newly-created task is now the last one
        }
    }

    // Runs action on the calling thread. When the calling thread already owns the context the action
    // runs inline: the operation count is not touched and exceptions propagate to the caller.
    // Otherwise the call takes synchronous control (which may block) and runs action through
    // SafeWrapCallback, which captures exceptions into Error.
    public void QueueSynchronous(Action action) {
        CheckForRequestStateIfRequired(checkForReEntry: false);
        if (CurrentThread == Thread.CurrentThread) {
            // current thread already owns the context, so just execute inline to prevent deadlocks
            action();
            return;
        }

        ChangeOperationCount(+1); // balanced by the decrement in SafeWrapCallback's finally block
        using (EnterSynchronousControl()) {
            SafeWrapCallback(action);
        }
    }

    // Runs action as the current owner of the context: records the executing thread, brackets the
    // call with _syncContext.Enter() / Leave() when a sync context was supplied, stores any exception
    // in Error, and finally clears the owner and decrements the operation count.
    private void SafeWrapCallback(Action action) {
        // This method will try to catch exceptions so that they don't bubble up to our
        // callers. However, ThreadAbortExceptions will continue to bubble up.
        try {
            CurrentThread = Thread.CurrentThread;
            ISyncContextLock syncContextLock = null;
            try {
                syncContextLock = (_syncContext != null) ? _syncContext.Enter() : null;
                try {
                    action();
                }
                catch (Exception ex) {
                    Error = ExceptionDispatchInfo.Capture(ex);
                }
            }
            finally {
                if (syncContextLock != null) {
                    syncContextLock.Leave();
                }
            }
        }
        finally {
            CurrentThread = null;
            ChangeOperationCount(-1);
        }
    }

    // This method does not run the func by itself. It simply queues the func into the existing
    // syncContext queue, then waits for the Task returned by func to finish. Any failure is stored
    // in Error, and the operation count taken by QueueAsynchronousAsync is released on exit.
    private async Task SafeWrapCallbackAsync(Func<object, Task> func, object state) {
        try {
            TaskCompletionSource<Task> tcs = new TaskCompletionSource<Task>();
            QueueAsynchronous(() => {
                try {
                    var t = func(state);
                    // Mirror the outcome of the returned Task onto tcs.
                    t.ContinueWith((_) => {
                        if (t.IsFaulted) {
                            tcs.TrySetException(t.Exception.InnerExceptions);
                        }
                        else if (t.IsCanceled) {
                            tcs.TrySetCanceled();
                        }
                        else {
                            tcs.TrySetResult(t);
                        }
                    }, TaskContinuationOptions.ExecuteSynchronously);
                }
                catch (Exception callbackError) {
                    // If func throws synchronously or returns null, no continuation gets attached and
                    // nothing else would ever complete tcs: the await below would never resume, the
                    // operation count would never be released, and every func queued afterwards would
                    // be stuck behind this one. Fault tcs so the await observes the failure, then
                    // rethrow so that SafeWrapCallback still records the error exactly as before.
                    tcs.TrySetException(callbackError);
                    throw;
                }
            });
            await tcs.Task;
        }
        catch (Exception ex) {
            Error = ExceptionDispatchInfo.Capture(ex);
        }
        finally {
            ChangeOperationCount(-1);
        }
    }

    // Sets the continuation that will asynchronously execute when the pending operation counter
    // hits zero. Returns true if asynchronous execution is expected, false if the operation
    // counter is already at zero and the caller should run the continuation inline.
    public bool TrySetCompletionContinuation(Action continuation) {
        int newOperationCount = ChangeOperationCount(+1); // prevent the operation counter from hitting zero while we're setting the field
        bool scheduledAsynchronously = (newOperationCount > 1);
        if (scheduledAsynchronously) {
            // Replaces any continuation that was set earlier and has not yet been started.
            Interlocked.Exchange(ref _completionTask, new Task(continuation));
        }

        // If this decrement brings the counter to zero, ChangeOperationCount starts the task just stored.
        ChangeOperationCount(-1);
        return scheduledAsynchronously;
    }
}
// Base type for ASP.NET synchronization contexts. Declares the operations a concrete context must
// supply and layers a few convenience helpers (Task-based waiting, using-block wrappers) on top.
internal abstract class AspNetSynchronizationContextBase : SynchronizationContext {
    // Created on first use by AllowVoidAsyncOperationsBlock() and reused for later calls.
    private AllowAsyncOperationsBlockDisposable _allowAsyncOperationsBlockDisposable;

    internal abstract bool AllowAsyncDuringSyncStages { get; set; }

    internal abstract bool Enabled { get; }

    // The exception held by ExceptionDispatchInfo, or null when no dispatch info is available.
    internal Exception Error {
        get {
            ExceptionDispatchInfo captured = ExceptionDispatchInfo;
            if (captured == null) {
                return null;
            }

            return captured.SourceException;
        }
    }

    internal abstract ExceptionDispatchInfo ExceptionDispatchInfo { get; }

    internal abstract int PendingOperationsCount { get; }

    internal abstract void ClearError();

    internal abstract void Disable();

    internal abstract void Enable();

    internal abstract bool PendingCompletion(WaitCallback callback);

    // Task-based wrapper around PendingCompletion. The returned Task completes when the pending
    // operations are done, and is faulted with the context's recorded error if there is one.
    // NOTE: Callers must ensure that calls to PendingCompletion and WaitForPendingOperationsAsync
    // never overlap, because every call replaces the continuation that will be invoked.
    internal Task WaitForPendingOperationsAsync() {
        TaskCompletionSource<object> completion = new TaskCompletionSource<object>();

        WaitCallback onAllOperationsDone = _ => {
            Exception recordedError = Error;
            if (recordedError == null) {
                completion.TrySetResult(null);
                return;
            }

            // The exception is about to be observed through the returned Task, so it must not
            // linger in the SynchronizationContext where it would fault future Tasks.
            ClearError();
            completion.TrySetException(recordedError);
        };

        bool callbackWillBeInvoked = PendingCompletion(onAllOperationsDone);
        if (!callbackWillBeInvoked) {
            // A false return means nothing is pending and the callback will not be invoked for us,
            // so signal the TaskCompletionSource right away.
            onAllOperationsDone(null);
        }

        return completion.Task;
    }

    // These methods are used in the synchronous handler execution step so that a synchronous IHttpHandler
    // can call asynchronous methods without locking on the HttpApplication instance (possibly causing
    // deadlocks).
    internal abstract void SetSyncCaller();

    internal abstract void ResetSyncCaller();

    // These methods are used for synchronization, e.g. to create a lock that is tied to the current
    // thread. The legacy implementation locks on the HttpApplication instance, for example.
    internal abstract void AssociateWithCurrentThread();

    internal abstract void DisassociateFromCurrentThread();

    // These methods are used for telling the synchronization context when it is legal for an application
    // to kick off async void methods. They are used by the "AllowAsyncDuringSyncStages" setting to
    // determine whether kicking off an operation should throw.
    internal virtual void AllowVoidAsyncOperations() {
        // no-op by default
    }

    internal virtual void ProhibitVoidAsyncOperations() {
        // no-op by default
    }

    // Lets callers wrap AllowVoidAsyncOperations / ProhibitVoidAsyncOperations in a using block:
    // async void operations are allowed immediately, and disposing the returned object calls
    // ProhibitVoidAsyncOperations().
    internal IDisposable AllowVoidAsyncOperationsBlock() {
        bool needsDisposable = (_allowAsyncOperationsBlockDisposable == null);
        if (needsDisposable) {
            _allowAsyncOperationsBlockDisposable = new AllowAsyncOperationsBlockDisposable(this);
        }

        AllowVoidAsyncOperations();
        return _allowAsyncOperationsBlockDisposable;
    }

    // Lets callers wrap the Associate / Disassociate pair in a using() statement: associates now,
    // and disposing the returned object calls DisassociateFromCurrentThread().
    internal IDisposable AcquireThreadLock() {
        AssociateWithCurrentThread();
        IDisposable threadLock = new DisposableAction(DisassociateFromCurrentThread);
        return threadLock;
    }

    // Disposable whose Dispose() calls ProhibitVoidAsyncOperations() on the owning context.
    private sealed class AllowAsyncOperationsBlockDisposable : IDisposable {
        private readonly AspNetSynchronizationContextBase _owner;

        public AllowAsyncOperationsBlockDisposable(AspNetSynchronizationContextBase syncContext) {
            _owner = syncContext;
        }

        public void Dispose() {
            _owner.ProhibitVoidAsyncOperations();
        }
    }
}