One of the many great features of C# 8 is async streams. Before C# 8, you could use the await keyword only to get a single result – when the asynchronous method returns the result. This changes with C# 8. Using await it’s now possible to get a stream of results. This was made possible by defining asynchronous iterator interfaces, and updates to the foreach loop and the yield statement. This article gives you more information on this.
Sample application
With the sample application a device is simulated that returns two random values after a random time. The data returned from the device is implemented in the class SensorData
. This is a simple type with two read-only properties:
public struct SensorData { public SensorData(int value1, int value2) => (Value1, Value2) = (value1, value2); public int Value1 { get; } public int Value2 { get; } }
Using yield to implement IAsyncEnumerable
The ADevice
class implements the interface IAsyncEnumerable
using the yield statement. yield was changed to not only implement synchronous iterators with the IEnumerable
and IEnumerator
interfaces, but also the IAsyncEnumerable
and IAsyncEnumerator
interfaces. In the method GetSensorData1, the yield statement is used to return sensor data with random values after a random time in an endless loop:
public class ADevice { public async IAsyncEnumerable<SensorData> GetSensorData1() { var r = new Random(); while (true) { await Task.Delay(r.Next(300)); yield return new SensorData(r.Next(100), r.Next(100)); } } }
Async Enumerator Interfaces
For async streams, asynchronous versions of the enumerator interfaces have been created. The interfaces IEnumerable
and IEnumerator
need an asynchronous counterpart. The interface IAsyncEnumerable
defines the method GetAsyncEnumerator
which returns an IAsyncEnumerator
. This is similar to the implementation of the IEnumerable
interface. Because of the asynchronous nature of the interface, the GetAsyncEnumerator
method defines an optional parameter of the CancellationToken
type to allow for early cancellation.
public interface IAsyncEnumerable<out T> { IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default(CancellationToken)); }
The C# 8 proposal for async streams on GitHub mentions to define
IAsyncEnumerable
to be cancellation agnostic. This changed. The implementation in System.Runtime, version 4.2.1.0, the methodGetAsyncEnumerator
accepts an optionalCancellationToken
.
The interface IAsyncEnumerator
defines the Current
property and the MoveNextAsync
method. There’s a small difference to the IEnumerator
interface. This interface also defines the Reset
method. The Reset
method has its history with COM interoperability.
public interface IAsyncEnumerator<out T> : IAsyncDisposable { T Current { get; } ValueTask<bool> MoveNextAsync(); }
Alternative interface definitions such as a
MoveNextAsync
method returning the value instead of theCurrent
property was considered. However, this would be harder to use, and can’t be covariant.
An asynchronous version of the IDisposable
interface is needed as well. IAsyncEnumerator
derives from the interface IAsyncDisposable
which allows for async disposal of resources with the DisposeAsync
method:
public interface IAsyncDisposable { ValueTask DisposeAsync(); }
It was considered to not derive
IAsyncEnumerator
fromIAsyncDisposable
. However, this design would complicate other parts, e.g. with pattern-based helpers that need to deal with different scenarios if
ValueTask instead of Task
The interfaces IAsyncDisposable
and IAsyncEnumerator
return ValueTask
with the methods MoveNextAsync
and DisposeAsync
. C# 7 was changed to allow awaits not only on the Task
type, but instead with any type implementing the GetAwaiter
method. ValueTask
is one of the types that can be used here. ValueTask
is implemented as a value type instead of a reference type with the Task
. With this, ValueTask
doesn’t have the overhead of an object in the managed heap. This can be extremly useful iterating through a list where not every iteration really requires an asynchronous operation.
Using foreach
Next, let’s iterate the asynchronous stream. The foreach statement has been extended with asynchronous functionality – making use of the asynchronous interfaces when the await keyword is used. With await foreach, one item is retrieved after the other – without blocking the calling thread.
private static async Task Demo1Async() { var aDevice = new ADevice(); await foreach (var x in aDevice.GetSensorData1()) { Console.WriteLine($"{x.Value1} {x.Value2}"); } }
Behind the scenes, the compiler translates the async foreach statement to a While
loop, invocation of the MoveNextAsync
method, and accessing the Current
property:
private static async Task Demo2Async() { var aDevice = new ADevice(); IAsyncEnumerable<SensorData> en = aDevice.GetSensorData1(); IAsyncEnumerator<SensorData> enumerator = en.GetAsyncEnumerator(); while (await enumerator.MoveNextAsync()) { var sensorData = enumerator.Current; Console.WriteLine($"{sensorData.Value1} {sensorData.Value2}"); } await enumerator.DisposeAsync(); }
Cancellation
Creating a custom implementation of the IAsyncEnumerator
interface, it would be possible to directly access the CancellationToken
defined with the GetAsyncEnumerator
method. This is not (yet?) possible when using yield return
to create the implementation. However, you can define a parameter of type CancellationToken
, register with this token, and get out (throw the OperationCanceledException
) when the operation should be canceled:
public async IAsyncEnumerable GetSensorData2(CancellationToken cancellationToken = default) { bool cancel = false; using var registration = cancellationToken.Register(() => cancel = true); var r = new Random(); while (!cancel) { await Task.Delay(r.Next(500)); yield return new SensorData(r.Next(100), r.Next(100)); } Console.WriteLine("cancel requested"); throw new OperationCanceledException(cancellationToken); } ´´´ Invoking the method `GetSensorData2`, the `CancellationToken` can be passed. The following sample code sends a cancellation after 5 seconds: ```csharp private static async Task Demo3Async() { try { var cts = new CancellationTokenSource(); cts.CancelAfter(5000); var aDevice = new ADevice(); await foreach (var x in aDevice.GetSensorData2(cts.Token)) { Console.WriteLine($"{x.Value1} {x.Value2}"); } } catch (OperationCanceledException ex) { Console.WriteLine(ex.Message); } }
Using implementations of
IAsyncEnumerable
which make use of optional the passedCancellationToken
, the extension methodWithCancellation
can be used to pass this token. Similarly, the extension methodConfigureAwait
can be used if async streams should not continue on the captured context, if a different thread might be used with the continuation.
Comparing to IObservable
Since .NET Framework 4.0, .NET included the IObserver
and IObservable
interfaces. Reactive Extensions makes use of these interfaces. How does this compare to the new interfaces defined for async streams?
With the IObservable
interface, the Subscribe
method can be used to assign a subscriber and receive events. This explains the major difference between this model and the new async streams. Observables uses a push based model, the sender is in control. The subscriber receives events when new items are available. With async streams, a pull based model is used. The caller is in control when doing the next invocation of the GetNextAsync
method, and waits here to receive the result. Because of the async implementation, the caller continues only when the result is received, but the calling thread is not blocked and can continue other work.
Summary
async streams fits well to the async programming model of C# available since C# 5. Instead of receiving just one result after the asynchronous operation is completed, a stream of results is returned – and you can use the well known C# statements foreach, and yield return together with the async and await keywords. The synchronous interfaces for enumerations have corresponding asynchronous interfaces that are implemented using yield, and used with async foreach.
If you’ve read this far, consider buying me a coffee which helps me staying up longer and writing more articles.
Interesting Links related with this article:
C# 8 Proposal for Async Streams
Proposal: IAsyncEnumerable.WithCancellation Extension Method
C# 8: Pattern matching extended
C# 8: No more NullReferenceExceptions – What about legacy code?
More information on C# is in my book Professional C# 7 and .NET Core 2.0, and in my workshops.
Enjoy learning and programming!
Christian
Nice intro!
For cancellation, here’s the recommended approach to producing async-streams with cancellation support: http://blog.monstuff.com/archives/2019/03/async-enumerables-with-cancellation.html
nit: the alternative interface that was considered has `TryGetNext` instead of `MoveNextAsync` and was designed with variance in mind (ie. it also had `out T`).
LikeLike
Hi dumky2, thanks for the information! I had the information with `MoveNextAsync` from the C# GitHub repos. You’re links include the Roslyn repos where’s more information from the implementation – thanks!
Christian
LikeLike
Reblogged this on WildGenie's.
LikeLike