Async Streams with C# 8

One of the many great features of C# 8 is async streams. Before C# 8, you could use the await keyword only to get a single result – when the asynchronous method returns the result. This changes with C# 8. Using await it’s now possible to get a stream of results. This was made possible by defining asynchronous iterator interfaces, and updates to the foreach loop and the yield statement. This article gives you more information on this.

Sample application

With the sample application a device is simulated that returns two random values after a random time. The data returned from the device is implemented in the class SensorData. This is a simple type with two read-only properties:

public struct SensorData
{
    public SensorData(int value1, int value2) => (Value1, Value2) = (value1, value2);

    public int Value1 { get; }
    public int Value2 { get; }
}

Using yield to implement IAsyncEnumerable

The ADevice class offers a method that returns IAsyncEnumerable and is implemented using the yield statement. yield was extended so that it implements not only synchronous iterators with the IEnumerable and IEnumerator interfaces, but also asynchronous iterators with the IAsyncEnumerable and IAsyncEnumerator interfaces. In the method GetSensorData1, the yield statement is used to return sensor data with random values after a random time in an endless loop:

public class ADevice
{
    public async IAsyncEnumerable<SensorData> GetSensorData1()
    {
        var r = new Random();
        while (true)
        {
            await Task.Delay(r.Next(300));
            yield return new SensorData(r.Next(100), r.Next(100));
        }
    }
}

Async Enumerator Interfaces

For async streams, asynchronous counterparts of the enumerator interfaces IEnumerable and IEnumerator have been created. The interface IAsyncEnumerable defines the method GetAsyncEnumerator, which returns an IAsyncEnumerator. This mirrors the GetEnumerator method of the IEnumerable interface. Because of the asynchronous nature of the interface, the GetAsyncEnumerator method defines an optional parameter of the CancellationToken type to allow for early cancellation.

public interface IAsyncEnumerable<out T>
{
    IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default(CancellationToken));
}

The C# 8 proposal for async streams on GitHub describes IAsyncEnumerable as cancellation agnostic. This changed: in the implementation in System.Runtime, version 4.2.1.0, the method GetAsyncEnumerator accepts an optional CancellationToken.

The interface IAsyncEnumerator defines the Current property and the MoveNextAsync method. There’s a small difference from the IEnumerator interface: IEnumerator also defines the Reset method, which has its history with COM interoperability. IAsyncEnumerator has no such method.

public interface IAsyncEnumerator<out T> : IAsyncDisposable
{
    T Current
    {
        get;
    }

    ValueTask<bool> MoveNextAsync();
}

Alternative interface definitions, such as a MoveNextAsync method returning the value instead of exposing it through the Current property, were considered. However, this would be harder to use, and the interface could not be covariant.

An asynchronous version of the IDisposable interface is needed as well. IAsyncEnumerator derives from the interface IAsyncDisposable which allows for async disposal of resources with the DisposeAsync method:

public interface IAsyncDisposable
{
    ValueTask DisposeAsync();
}

It was considered to not derive IAsyncEnumerator from IAsyncDisposable. However, this design would complicate other parts, e.g. with pattern-based helpers that need to deal with different scenarios if
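
To make the three interfaces more concrete, here is a minimal sketch of a hand-written implementation that does not use yield. The names CountingStream and CountingStreamDemo are hypothetical and only serve as illustration; the sketch assumes a runtime that ships these interfaces, such as .NET Core 3.0 or later.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical type for illustration: produces 1..count, one item per delay.
public class CountingStream : IAsyncEnumerable<int>
{
    private readonly int _count;
    private readonly TimeSpan _delay;

    public CountingStream(int count, TimeSpan delay) => (_count, _delay) = (count, delay);

    public IAsyncEnumerator<int> GetAsyncEnumerator(CancellationToken cancellationToken = default)
        => new Enumerator(_count, _delay, cancellationToken);

    private sealed class Enumerator : IAsyncEnumerator<int>
    {
        private readonly int _count;
        private readonly TimeSpan _delay;
        private readonly CancellationToken _cancellationToken;

        public Enumerator(int count, TimeSpan delay, CancellationToken cancellationToken)
            => (_count, _delay, _cancellationToken) = (count, delay, cancellationToken);

        public int Current { get; private set; }

        public async ValueTask<bool> MoveNextAsync()
        {
            if (Current >= _count) return false;

            // The token handed to GetAsyncEnumerator is directly available here.
            await Task.Delay(_delay, _cancellationToken);
            Current++;
            return true;
        }

        // Nothing to release in this sketch, so return a completed ValueTask.
        public ValueTask DisposeAsync() => default;
    }
}

public static class CountingStreamDemo
{
    public static async Task<List<int>> CollectAsync()
    {
        var items = new List<int>();
        await foreach (int item in new CountingStream(3, TimeSpan.FromMilliseconds(10)))
        {
            items.Add(item);
        }
        return items; // contains 1, 2, 3
    }
}
```

Compared to the yield-based version, the state (here only Current) has to be tracked by hand, which is the work the compiler otherwise does for you.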

ValueTask instead of Task

The interfaces IAsyncDisposable and IAsyncEnumerator return ValueTask from the methods DisposeAsync and MoveNextAsync. The await keyword is not restricted to the Task type; it works with any type that offers a GetAwaiter method, and since C# 7 async methods can also return such task-like types. ValueTask is one of these types. It is implemented as a value type, whereas Task is a reference type. When a result is already available, a ValueTask therefore avoids the overhead of an object on the managed heap. This can be extremely useful when iterating through a list where not every iteration really requires an asynchronous operation.
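
The following sketch illustrates the case where not every call needs an asynchronous operation. CachedSensorReader and ValueTaskDemo are hypothetical names, and Task.Delay merely stands in for real I/O; this is an illustration under these assumptions, not a benchmark.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical cache for illustration only.
public class CachedSensorReader
{
    private readonly Dictionary<int, int> _cache = new Dictionary<int, int>();

    public ValueTask<int> ReadAsync(int sensorId)
    {
        // Fast path: the value is cached, so the ValueTask wraps the result
        // directly and no Task object is needed.
        if (_cache.TryGetValue(sensorId, out int cached))
        {
            return new ValueTask<int>(cached);
        }

        // Slow path: wrap a real Task that completes later.
        return new ValueTask<int>(ReadSlowAsync(sensorId));
    }

    private async Task<int> ReadSlowAsync(int sensorId)
    {
        await Task.Delay(50); // stands in for real I/O
        int value = sensorId * 10;
        _cache[sensorId] = value;
        return value;
    }
}

public static class ValueTaskDemo
{
    public static async Task<(bool firstSync, bool secondSync, int value)> RunAsync()
    {
        var reader = new CachedSensorReader();

        ValueTask<int> first = reader.ReadAsync(4);
        bool firstSync = first.IsCompleted;   // not cached yet, still running
        await first;

        ValueTask<int> second = reader.ReadAsync(4);
        bool secondSync = second.IsCompleted; // served from the cache
        int value = await second;

        return (firstSync, secondSync, value);
    }
}
```

Note that each ValueTask is awaited only once here; awaiting the same ValueTask several times is not supported.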

Using foreach

Next, let’s iterate the asynchronous stream. The foreach statement has been extended with asynchronous functionality – making use of the asynchronous interfaces when the await keyword is used. With await foreach, one item is retrieved after the other – without blocking the calling thread.

private static async Task Demo1Async()
{
    var aDevice = new ADevice();

    await foreach (var x in aDevice.GetSensorData1())
    {
        Console.WriteLine($"{x.Value1} {x.Value2}");
    }
}

Behind the scenes, the compiler translates the await foreach statement to a while loop that invokes the MoveNextAsync method and accesses the Current property, followed by a call to DisposeAsync:

private static async Task Demo2Async()
{
    var aDevice = new ADevice();

    IAsyncEnumerable<SensorData> en = aDevice.GetSensorData1();
    IAsyncEnumerator<SensorData> enumerator = en.GetAsyncEnumerator();
    try
    {
        while (await enumerator.MoveNextAsync())
        {
            var sensorData = enumerator.Current;
            Console.WriteLine($"{sensorData.Value1} {sensorData.Value2}");
        }
    }
    finally
    {
        // Dispose even if the loop body throws, as await foreach does
        await enumerator.DisposeAsync();
    }
}

Cancellation

With a custom implementation of the IAsyncEnumerable and IAsyncEnumerator interfaces, it is possible to directly access the CancellationToken passed to the GetAsyncEnumerator method. At the time of writing, this is not (yet?) possible when using yield return to create the implementation. However, you can define a parameter of type CancellationToken, register with this token, and leave the loop by throwing an OperationCanceledException when the operation should be canceled:

```csharp
public async IAsyncEnumerable<SensorData> GetSensorData2(CancellationToken cancellationToken = default)
{
    bool cancel = false;
    // Flip the flag as soon as cancellation is requested on the token
    using var registration = cancellationToken.Register(() => cancel = true);
    var r = new Random();
    while (!cancel)
    {
        await Task.Delay(r.Next(500));
        yield return new SensorData(r.Next(100), r.Next(100));
    }
    Console.WriteLine("cancel requested");
    throw new OperationCanceledException(cancellationToken);
}
```

When invoking the method `GetSensorData2`, a `CancellationToken` can be passed. The following sample code requests cancellation after 5 seconds:

```csharp
private static async Task Demo3Async()
{
    try
    {
        var cts = new CancellationTokenSource();
        cts.CancelAfter(5000);
        var aDevice = new ADevice();

        await foreach (var x in aDevice.GetSensorData2(cts.Token))
        {
            Console.WriteLine($"{x.Value1} {x.Value2}");
        }
    }
    catch (OperationCanceledException ex)
    {
        Console.WriteLine(ex.Message);
    }
}
```

When an implementation of IAsyncEnumerable makes use of the CancellationToken optionally passed to GetAsyncEnumerator, the extension method WithCancellation can be used to pass this token. Similarly, the extension method ConfigureAwait can be used if the async stream should not continue on the captured context, so that a different thread might be used for the continuation.
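
The released C# 8 compiler together with .NET Core 3.0 offers the EnumeratorCancellation attribute in the namespace System.Runtime.CompilerServices, which lets a yield-based iterator receive the token given to GetAsyncEnumerator. The following is a minimal sketch assuming such a toolchain; TickSource, GetTicks, and WithCancellationDemo are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class TickSource
{
    // Hypothetical iterator: [EnumeratorCancellation] asks the compiler to
    // route the token given to GetAsyncEnumerator into this parameter.
    public static async IAsyncEnumerable<int> GetTicks(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        int tick = 0;
        while (true)
        {
            // Throws an OperationCanceledException once the token is canceled
            await Task.Delay(20, cancellationToken);
            yield return tick++;
        }
    }
}

public static class WithCancellationDemo
{
    public static async Task<(int received, bool canceled)> RunAsync()
    {
        using var cts = new CancellationTokenSource();

        // No token is passed when the stream is created ...
        IAsyncEnumerable<int> ticks = TickSource.GetTicks();
        int received = 0;
        try
        {
            // ... it is attached later by the consumer with WithCancellation.
            await foreach (int tick in ticks.WithCancellation(cts.Token).ConfigureAwait(false))
            {
                received++;
                if (received == 3) cts.Cancel();
            }
        }
        catch (OperationCanceledException)
        {
            return (received, true);
        }
        return (received, false);
    }
}
```

This separation is handy when the code that creates the stream and the code that iterates it live in different places, because the consumer can decide on cancellation without changing how the stream was obtained.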

Comparing to IObservable

Since .NET Framework 4.0, .NET has included the IObserver and IObservable interfaces. Reactive Extensions makes use of these interfaces. How does this compare to the new interfaces defined for async streams?
With the IObservable interface, the Subscribe method is used to register a subscriber that receives events. This explains the major difference between this model and the new async streams. Observables use a push-based model: the sender is in control, and the subscriber receives events when new items are available. Async streams use a pull-based model: the caller is in control, asks for the next item by invoking the MoveNextAsync method, and waits there for the result. Because of the async implementation, the caller continues only when the result is received, but the calling thread is not blocked and can do other work in the meantime.
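
The difference between push and pull can be sketched side by side. The types PushSource, CollectingObserver, and PushPullDemo are hypothetical, deliberately simplified, and not thread-safe; they do not use Reactive Extensions, only the two interfaces from the System namespace.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Push: the source decides when OnNext is called on its subscribers.
public class PushSource : IObservable<int>
{
    private readonly List<IObserver<int>> _observers = new List<IObserver<int>>();

    public IDisposable Subscribe(IObserver<int> observer)
    {
        _observers.Add(observer);
        return new Unsubscriber(() => _observers.Remove(observer));
    }

    public void Publish(int value)
    {
        foreach (var observer in _observers.ToArray()) observer.OnNext(value);
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly Action _remove;
        public Unsubscriber(Action remove) => _remove = remove;
        public void Dispose() => _remove();
    }
}

public class CollectingObserver : IObserver<int>
{
    public List<int> Received { get; } = new List<int>();
    public void OnNext(int value) => Received.Add(value);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class PushPullDemo
{
    // Pull: nothing is produced until the consumer asks for the next item.
    public static async IAsyncEnumerable<int> PullSource()
    {
        for (int i = 1; i <= 3; i++)
        {
            await Task.Delay(10);
            yield return i;
        }
    }

    public static async Task<(List<int> pushed, List<int> pulled)> RunAsync()
    {
        var source = new PushSource();
        var observer = new CollectingObserver();
        using (source.Subscribe(observer))
        {
            source.Publish(1);
            source.Publish(2);
        }
        source.Publish(3); // not delivered, the observer has unsubscribed

        var pulled = new List<int>();
        await foreach (int item in PullSource())
        {
            pulled.Add(item);
            if (item == 2) break; // the consumer simply stops asking
        }
        return (observer.Received, pulled);
    }
}
```

In both halves the consumer ends up with the items 1 and 2, but for different reasons: the observer stops receiving because it unsubscribed, while the await foreach loop stops because it no longer requests items.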

Summary

Async streams fit well into the async programming model that has been available since C# 5. Instead of receiving just one result after the asynchronous operation is completed, a stream of results is returned, and you can use the well-known C# statements foreach and yield return together with the async and await keywords. The synchronous interfaces for enumerations have corresponding asynchronous interfaces that are implemented using yield and consumed with await foreach.

Interesting links related to this article:

Complete code samples

C# 8 Proposal for Async Streams

Proposal: IAsyncEnumerable.WithCancellation Extension Method

C# 8: Indexes and Ranges

C# 8: Pattern matching extended

C# 8: No more NullReferenceExceptions – What about legacy code?

More information on C# is in my book Professional C# 7 and .NET Core 2.0, and in my workshops.

Enjoy learning and programming!

Christian
