Async Streaming with ASP.NET Core SignalR and C# 8

ASP.NET Core SignalR version 2.1 offers streaming from the server to the client. Using ASP.NET Core 3.0, streams can also be sent from the client to the server. C# 8 gives a new syntax for asynchronous streaming. The new syntax is available with the new SignalR version.

This article compares streaming using SignalR with the old and new C# 8 syntax.

See the article Async Streams with C# 8 for more information on the C# 8 syntax.

Streaming data

Stream to the client returning a ChannelReader

Let’s start returning a stream of SomeData objects from the server to the client. The method GetSomeDataWithChannelReader can be invoked with the number of items returned, and the delay that should be used between sending each item along with an option to cancel the stream from the client. The Channel class used here (from the namespace System.Threading.Channels) allows readers and writers for a stream of data. The method CreateBounded returns a channel with a maximum capacity, the method CreateUnbounded that is used here offers an unlimited capacity. The reader of this channel is returned to the client, whereas the writer is passed to the method WriteItemsAsync. The WriteItemsAsync method uses the ChannelWriter to send data to the client:

public class ServerToClientStreamingHub : Hub
{
  public ChannelReader<SomeData> GetSomeDataWithChannelReader(
    int count,
    int delay,
    CancellationToken cancellationToken)
  {
    var channel = Channel.CreateUnbounded<SomeData>();
    _ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);
    return channel.Reader;
  }

  private async Task WriteItemsAsync(
    ChannelWriter<SomeData> writer,
    int count,
    int delay,
    CancellationToken cancellationToken)
  {
    try
    {
      for (var i = 0; i < count; i++)
      {
        cancellationToken.ThrowIfCancellationRequested();
        await writer.WriteAsync(new SomeData() { Value = i });
        await Task.Delay(delay, cancellationToken);
      }
    }
    catch (Exception ex)
    {
      writer.TryComplete(ex);
    }

    writer.TryComplete();
  }
}

SomeData is defined using a simple class:

public class SomeData
{
  public int Value { get; set; }

  public override string ToString() => Value.ToString();
}

Client Application using the ChannelReader

With the client application, a connection to the server is created using the HubConnectionBuilder.

s_hubConnection = new HubConnectionBuilder()
  .WithUrl("https://localhost:5001/hubs/stream")
  .Build();

s_hubConnection.Closed += async (ex) =>
{
  Console.WriteLine(ex.Message);
  Console.WriteLine("restart");
  await Task.Delay(new Random().Next(0, 5) * 1000);
  await s_hubConnection.StartAsync();
};

var cts = new CancellationTokenSource();
await s_hubConnection.StartAsync(cts.Token);

The client application can start reading the stream continuously using the StreamAsChannelAsync method of the connection. The first argument passed to this method defines the name of the method on the server. The other arguments are the parameters of the called method – the number of invocations, the delay, and the cancellation token.

// read from the hub using ChannelReader
var channel = await s_hubConnection.StreamAsChannelAsync<SomeData>("GetSomeDataWithChannelReader", 100, 1000, cts.Token);
while (await channel.WaitToReadAsync())
{
  while (channel.TryRead(out SomeData data))
  {
    Console.WriteLine($"received {data}");
  }
}

Running the application, the data is streamed from the server to the client.

Client Application using Async Streams

The client application can now be changed using C# 8 async streams – instead of using the ChannelReader. All what needs to be changed is calling the method StreamAsync. This method returns IAsyncEnumerable. With this interface, the await foreach statement can be used to iterate this stream:

// read from the hub using async streams
var stream = s_hubConnection.StreamAsync<SomeData>("GetSomeDataWithAsyncStreams", 20, 100, cts.Token);
await foreach (var d in stream)
{
  Console.WriteLine($"received {d}");
}

Server Application using Async Streams

Not only the code on the client, but also the code on the server can be simplified. Instead of returning a ChannelReader, and using a ChannelWriter to send data in the stream, the server method can be defined to return IAsyncEnumerable. With this, the yield return statement can be used that was changed to return this new asynchronous interface.

// v2 - async streams
public async IAsyncEnumerable<SomeData> GetSomeDataWithAsyncStreams(
  int count,
  int delay,
  [EnumeratorCancellation] CancellationToken cancellationToken)
{
  for (int i = 0; i < count; i++)
  {
    cancellationToken.ThrowIfCancellationRequested();
    await Task.Delay(delay);
    yield return new SomeData { Value = i };
  }
}

Receiving Streams on the Server

To receive a stream from the client in the server, a parameter can be defined of type IAsyncEnumerable. With this, async foreach can be used to access this stream:

public async Task StartStream2(string streamName, IAsyncEnumerable<SomeData> stream)
{
  Console.WriteLine($"Receive stream {streamName}");
  await foreach (var item in stream)
  {
    _logger.LogTrace($"received {item}");
  }
}

Sending Streams from the Client

With the client, it’s now the other way around. Invoking the SendAsync method, a method returning IAsyncEnumerable can be invoked. In the sample code, the local function named clientStreamData that is defined using the static modifier is invoked to pass the stream to the server:

private static async Task ClientToServerStreamingAsync()
{
  static async IAsyncEnumerable<SomeData> clientStreamData()
  {
    for (var i = 0; i < 20; i++)
    {
      await Task.Delay(2000);
      var data = new SomeData() { Value = i };
      yield return data;
    }
  }

  s_hubConnection = new HubConnectionBuilder()
    .WithUrl("https://localhost:5001/hubs/uploadstream")
    .Build();

  var cts = new CancellationTokenSource();
  await s_hubConnection.StartAsync(cts.Token);

  await s_hubConnection.SendAsync("StartStream2", "Sample Stream", clientStreamData());

  Console.WriteLine("SendAsync completed");
  Console.ReadLine();
}

Take away

Async streams with C# 8 – using async foreach, as well as the enhance yield can simplify code. One example is the enhancement with streaming across SignalR. The .NET Core 3 version of SignalR allows client-to-server as well as server-to-client streaming. Using the new interfaces for async streams, e.g. IAsyncEnumerable simplifies code compared to using the channel readers and writers.

What do you think?

If you’ve read this far, consider buying me a coffee which helps me staying up longer and writing more articles.

Buy Me A Coffee

You can get the complete console app and UWP app samples.

Enjoy learning and programming!

Christian

Links

Async Streams with C# 8

More information on C# and programming .NET Core applications is in my book Professional C# 7 and .NET Core 2.0, and in my workshops.

Streaming digital data ID 61216525 © Alunablue | Dreamstime.com

5 thoughts on “Async Streaming with ASP.NET Core SignalR and C# 8

  1. is it possible to do both
    i.e server hub function
    public async IAsyncEnumerable ImportDataList(IAsyncEnumerable Data, CancellationToken cancellationToken)
    {
    //logic
    }

    Like

  2. This is all crap: not one single, complete working example: why do you not publish on Git Hub or make a Zip file available for download ?

    Like

Leave a comment

This site uses Akismet to reduce spam. Learn how your comment data is processed.