ASP.NET Core SignalR version 2.1 offers streaming from the server to the client. Using ASP.NET Core 3.0, streams can also be sent from the client to the server. C# 8 gives a new syntax for asynchronous streaming. The new syntax is available with the new SignalR version.
This article compares streaming using SignalR with the old and new C# 8 syntax.
See the article Async Streams with C# 8 for more information on the C# 8 syntax.
Stream to the client returning a ChannelReader
Let’s start returning a stream of SomeData objects from the server to the client. The method GetSomeDataWithChannelReader
can be invoked with the number of items returned, and the delay that should be used between sending each item along with an option to cancel the stream from the client. The Channel
class used here (from the namespace System.Threading.Channels
) allows readers and writers for a stream of data. The method CreateBounded
returns a channel with a maximum capacity, the method CreateUnbounded
that is used here offers an unlimited capacity. The reader of this channel is returned to the client, whereas the writer is passed to the method WriteItemsAsync
. The WriteItemsAsync
method uses the ChannelWriter
to send data to the client:
public class ServerToClientStreamingHub : Hub { public ChannelReader<SomeData> GetSomeDataWithChannelReader( int count, int delay, CancellationToken cancellationToken) { var channel = Channel.CreateUnbounded<SomeData>(); _ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken); return channel.Reader; } private async Task WriteItemsAsync( ChannelWriter<SomeData> writer, int count, int delay, CancellationToken cancellationToken) { try { for (var i = 0; i < count; i++) { cancellationToken.ThrowIfCancellationRequested(); await writer.WriteAsync(new SomeData() { Value = i }); await Task.Delay(delay, cancellationToken); } } catch (Exception ex) { writer.TryComplete(ex); } writer.TryComplete(); } }
SomeData
is defined using a simple class:
public class SomeData { public int Value { get; set; } public override string ToString() => Value.ToString(); }
Client Application using the ChannelReader
With the client application, a connection to the server is created using the HubConnectionBuilder
.
s_hubConnection = new HubConnectionBuilder() .WithUrl("https://localhost:5001/hubs/stream") .Build(); s_hubConnection.Closed += async (ex) => { Console.WriteLine(ex.Message); Console.WriteLine("restart"); await Task.Delay(new Random().Next(0, 5) * 1000); await s_hubConnection.StartAsync(); }; var cts = new CancellationTokenSource(); await s_hubConnection.StartAsync(cts.Token);
The client application can start reading the stream continuously using the StreamAsChannelAsync
method of the connection. The first argument passed to this method defines the name of the method on the server. The other arguments are the parameters of the called method – the number of invocations, the delay, and the cancellation token.
// read from the hub using ChannelReader var channel = await s_hubConnection.StreamAsChannelAsync<SomeData>("GetSomeDataWithChannelReader", 100, 1000, cts.Token); while (await channel.WaitToReadAsync()) { while (channel.TryRead(out SomeData data)) { Console.WriteLine($"received {data}"); } }
Running the application, the data is streamed from the server to the client.
Client Application using Async Streams
The client application can now be changed using C# 8 async streams – instead of using the ChannelReader
. All what needs to be changed is calling the method StreamAsync
. This method returns IAsyncEnumerable
. With this interface, the await foreach statement can be used to iterate this stream:
// read from the hub using async streams var stream = s_hubConnection.StreamAsync<SomeData>("GetSomeDataWithAsyncStreams", 20, 100, cts.Token); await foreach (var d in stream) { Console.WriteLine($"received {d}"); }
Server Application using Async Streams
Not only the code on the client, but also the code on the server can be simplified. Instead of returning a ChannelReader
, and using a ChannelWriter
to send data in the stream, the server method can be defined to return IAsyncEnumerable
. With this, the yield return statement can be used that was changed to return this new asynchronous interface.
// v2 - async streams public async IAsyncEnumerable<SomeData> GetSomeDataWithAsyncStreams( int count, int delay, [EnumeratorCancellation] CancellationToken cancellationToken) { for (int i = 0; i < count; i++) { cancellationToken.ThrowIfCancellationRequested(); await Task.Delay(delay); yield return new SomeData { Value = i }; } }
Receiving Streams on the Server
To receive a stream from the client in the server, a parameter can be defined of type IAsyncEnumerable
. With this, async foreach can be used to access this stream:
public async Task StartStream2(string streamName, IAsyncEnumerable<SomeData> stream) { Console.WriteLine($"Receive stream {streamName}"); await foreach (var item in stream) { _logger.LogTrace($"received {item}"); } }
Sending Streams from the Client
With the client, it’s now the other way around. Invoking the SendAsync
method, a method returning IAsyncEnumerable
can be invoked. In the sample code, the local function named clientStreamData that is defined using the static
modifier is invoked to pass the stream to the server:
private static async Task ClientToServerStreamingAsync() { static async IAsyncEnumerable<SomeData> clientStreamData() { for (var i = 0; i < 20; i++) { await Task.Delay(2000); var data = new SomeData() { Value = i }; yield return data; } } s_hubConnection = new HubConnectionBuilder() .WithUrl("https://localhost:5001/hubs/uploadstream") .Build(); var cts = new CancellationTokenSource(); await s_hubConnection.StartAsync(cts.Token); await s_hubConnection.SendAsync("StartStream2", "Sample Stream", clientStreamData()); Console.WriteLine("SendAsync completed"); Console.ReadLine(); }
Take away
Async streams with C# 8 – using async foreach, as well as the enhance yield can simplify code. One example is the enhancement with streaming across SignalR. The .NET Core 3 version of SignalR allows client-to-server as well as server-to-client streaming. Using the new interfaces for async streams, e.g. IAsyncEnumerable simplifies code compared to using the channel readers and writers.
What do you think?
If you’ve read this far, consider buying me a coffee which helps me staying up longer and writing more articles.
You can get the complete console app and UWP app samples.
Enjoy learning and programming!
Christian
Links
More information on C# and programming .NET Core applications is in my book Professional C# 7 and .NET Core 2.0, and in my workshops.
Streaming digital data ID 61216525 © Alunablue | Dreamstime.com
is it possible to do both
i.e server hub function
public async IAsyncEnumerable ImportDataList(IAsyncEnumerable Data, CancellationToken cancellationToken)
{
//logic
}
LikeLike
Yes, this is possible. See https://github.com/ProfessionalCSharp/MoreSamples/tree/signalrstream/ASPNETCore/SignalRStreamDual
LikeLike