Being that I love using the .NET asynchronous paradigm to almost completely forget about threading (though it is there, and precautions need to be taken to avoid livelock and deadlock), I have implemented a few classes which implement the IAsyncResult interface. These implementations are based on the IAsyncResult implementation written by Jeffrey Richter in Concurrent Affairs in March 2007 in his article Implementing the CLR Asynchronous Programming Model
Before getting into the implementation, here is the inspiration behind the implementation. When dealing with socket Send and Receive it is possible for both functions to not process a complete message when called, however both functions return the number of bytes they processed. Usually in my applications, I find it useful to implement a function that looks like the one in Figure 1.
private void ReadBuffer(Socket socket, byte [] buffer, int offset, int length, SocketFlags flags)
{
while(length > 0)
{
int bytesRead = socket.Receive(buffer, offset, length, flags);
if( bytesRead == 0)
{
throw new Exception("Remote server closed the connection unexpectedly");
}
offset += bytesRead;
length -= bytesRead;
}
}
Figure 1: Synchronous operation that usually ends up needing to be asynchronous
Since working with the Asynchronous APIs, I’ve found that the same function is still useful, and needed to convert it to the asynchronous IAsyncResult paradigm. In doing so, I needed to convert the method in Figure 1 to have a signature like:
IAsyncResult BeginReadBuffer(Socket socket, byte [] buffer, int offset, int length, SocketFlags flags, AsyncCallback asyncCallback, Object asyncState);
void EndReadBuffer(IAsyncResult asyncResult);
Figure 2: Asynchronous signatures for the method in Figure 1.
I could implement the asynchronous methods by faking it, i.e. create a delegate the matches the signature of ReadBuffer and call its BeginInvoke and EndInvoke methods, however the purpose of using the asynchronous API in the first place was to reduce the total number of threads, and number of threads waiting. So with that said, I needed to implement IAsyncResult somehow. It turns out that Jeffrey Richter wrote this awesome article on the .NET asynchronous programming model (Implementing the CLR Asynchronous Programming Model), which included an implementation of the IAsyncResult interface which only created a ManualResetEvent when it was needed, i.e. the event was created lazily, which is great for performance, and reduces the number of system objects created.
Getting back to my example, the
IAsyncResult implementation had to be a base class which would handle the majority of the busy work needed to create an
IAsyncResult implementation, be able to return values from the EndMethod, and not corrupt stack traces so the original exception source could be found.
Without further adieu, here is the implementation, that you’re free to use wherever you need it
namespace DigitalMindspring.Threading
{
using System;
using System.Threading;
/// <summary>
/// </summary>
/// <remarks>
/// This is a modified version of the code found in Jeffery Richter's
/// article in the March 2007 issue of Concurrent Affairs titled
/// "Implementing the CLR Asynchronous Programming Model"
///
/// Direct Link : http://msdn.microsoft.com/en-us/magazine/cc163467.aspx
/// Search Link(s): http://www.google.com/search?hl=en&source=hp&q=Concurrent+Affairs+Implementing+the+CLR+Asynchronous+Programming+Model&aq=f&oq=&aqi=
/// : http://www.bing.com/search?q=Concurrent+Affairs+Implementing+the+CLR+Asynchronous+Programming+Model&go=&form=QBLH&qs=n
/// </remarks>
public abstract class AsyncResultBase : IAsyncResult
{
// Fields set at construction which do change after
// operation completes
private const Int32 StatePending = 0;
private const Int32 StateCompletedSynchronously = 1;
private const Int32 StateCompletedAsynchronously = 2;
private readonly AsyncCallback _asyncCallback;
private readonly Object _asyncState;
// Field that may or may not get set depending on usage
private ManualResetEvent _asyncWaitHandle;
private Int32 _completedState = StatePending;
// Fields set when operation completes
private Exception _exception;
protected AsyncResultBase(AsyncCallback asyncCallback, Object state)
{
_asyncCallback = asyncCallback;
_asyncState = state;
}
protected void SetAsCompleted(Exception exception, Boolean completedSynchronously)
{
// Passing null for exception means no error occurred.
// This is the common case.
if( exception != null )
{
if( exception is AsyncOperationException )
{
// Pass on the exception if the stack has already
// been saved
_exception = exception;
}
else
{
// Save the stack trace
_exception = new AsyncOperationException(exception);
}
}
// The _CompletedState field MUST be set prior calling the callback
Int32 newState = completedSynchronously ? StateCompletedSynchronously : StateCompletedAsynchronously;
Int32 prevState = Interlocked.Exchange(ref _completedState, newState);
// Call "Complete" code only if the prevState changes from StatePending
if (prevState == StatePending)
{
// If the event exists, set it
if (_asyncWaitHandle != null)
{
_asyncWaitHandle.Set();
}
// If a callback method was set, call it
if (_asyncCallback != null)
{
_asyncCallback(this);
}
return;
}
// Throw an exception if we're not resetting the completion status
if (prevState != newState)
{
throw new InvalidOperationException("You can set a result only once");
}
}
public Boolean TryWaitForComplete(int milliseconds)
{
// This method assumes that only 1 thread calls EndInvoke
// for this object
if (!IsCompleted)
{
// If the operation isn't done, try waiting for it
if (!AsyncWaitHandle.WaitOne(milliseconds, false))
{
return false;
}
}
// Operation is done: if an exception occured, throw it
if (_exception != null)
{
throw _exception;
}
return true;
}
#region Implementation of IAsyncResult
public Object AsyncState
{
get { return _asyncState; }
}
public Boolean CompletedSynchronously
{
get { return Thread.VolatileRead(ref _completedState) == StateCompletedSynchronously; }
}
public WaitHandle AsyncWaitHandle
{
get
{
if (_asyncWaitHandle == null)
{
Boolean done = IsCompleted;
ManualResetEvent mre = new ManualResetEvent(done);
if (Interlocked.CompareExchange(ref _asyncWaitHandle,
mre, null) != null)
{
// Another thread created this object's event; dispose
// the event we just created
mre.Close();
}
else
{
if (!done && IsCompleted)
{
// If the operation wasn't done when we created
// the event but now it is done, set the event
_asyncWaitHandle.Set();
}
}
}
return _asyncWaitHandle;
}
}
public Boolean IsCompleted
{
get
{
return Thread.VolatileRead(ref _completedState) !=
StatePending;
}
}
#endregion
}
public interface IAsyncResultEx : IAsyncResult
{
void WaitForComplete();
Boolean WaitForComplete(int milliseconds);
}
public interface IAsyncResultEx<TReturnValue> : IAsyncResult
{
TReturnValue WaitForComplete();
Boolean WaitForComplete(out TReturnValue returnValue, int milliseconds);
}
public class AsyncResultEx<TReturnValue> : AsyncResultBase, IAsyncResultEx<TReturnValue>
{
public TReturnValue ReturnValue;
public AsyncResultEx(AsyncCallback asyncCallback, Object state)
: base(asyncCallback, state)
{
}
#region IAsyncResultEx<TReturnValue> Members
public TReturnValue WaitForComplete()
{
TryWaitForComplete(-1);
return ReturnValue;
}
public Boolean WaitForComplete(out TReturnValue returnValue, int milliseconds)
{
if (TryWaitForComplete(milliseconds))
{
returnValue = ReturnValue;
return true;
}
returnValue = default(TReturnValue);
return false;
}
#endregion
public void Complete(TReturnValue returnValue)
{
ReturnValue = returnValue;
SetAsCompleted(null, false);
}
public void Complete(TReturnValue returnValue, Boolean completedSyunchronously)
{
ReturnValue = returnValue;
SetAsCompleted(null, completedSyunchronously);
}
public void Throw(Exception exception)
{
SetAsCompleted(exception, false);
}
}
public class AsyncResultEx : AsyncResultBase, IAsyncResultEx
{
public AsyncResultEx(AsyncCallback asyncCallback, Object state)
: base(asyncCallback, state)
{
}
#region IAsyncResultEx Members
public void WaitForComplete()
{
TryWaitForComplete(-1);
}
public Boolean WaitForComplete(int milliseconds)
{
return TryWaitForComplete(milliseconds);
}
#endregion
public void Complete()
{
SetAsCompleted(null, false);
}
public void Complete(Boolean completedSyunchronously)
{
SetAsCompleted(null, completedSyunchronously);
}
public void Throw(Exception exception)
{
SetAsCompleted(exception, false);
}
}
}
Figure 3: Generic implementation of IAsyncResult