|
2 | 2 | using System.Collections.Generic;
|
3 | 3 | using System.Diagnostics;
|
4 | 4 | using System.Linq;
|
| 5 | +using System.Runtime.CompilerServices; |
5 | 6 | using System.Threading;
|
6 | 7 | using System.Threading.Channels;
|
7 | 8 | using System.Threading.Tasks;
|
@@ -347,5 +348,79 @@ async ValueTask Dequeue()
|
347 | 348 | }
|
348 | 349 |
|
349 | 350 |
|
| 351 | + [Fact] |
| 352 | + public static async Task ReadBatchWithTimeoutEnumerableBakedIn() |
| 353 | + { |
| 354 | + var c = Channel.CreateUnbounded<int>(new UnboundedChannelOptions { SingleReader = false, SingleWriter = false }); |
| 355 | + _ = Task.Run(async () => { |
| 356 | + //await Task.Delay(1000); |
| 357 | + c.Writer.TryWrite(1); |
| 358 | + c.Writer.TryWrite(2); |
| 359 | + |
| 360 | + c.Writer.TryWrite(3); |
| 361 | + await Task.Delay(600); |
| 362 | + |
| 363 | + c.Writer.TryWrite(4); |
| 364 | + c.Writer.TryWrite(5); |
| 365 | + Debug.WriteLine("Writing Complete."); |
| 366 | + c.Writer.Complete(); |
| 367 | + }); |
| 368 | + var i = 0; |
| 369 | + await foreach (var batch in c.Reader.ReadBatchEnumerableAsyncBakedIn(2, TimeSpan.FromMilliseconds(500), CancellationToken.None)) |
| 370 | + { |
| 371 | + switch (i) |
| 372 | + { |
| 373 | + case 0: |
| 374 | + Assert.Equal(1, batch[0]); |
| 375 | + Assert.Equal(2, batch[1]); |
| 376 | + Debug.WriteLine("First batch received: " + string.Join(',', batch.Select(item => item))); |
| 377 | + break; |
| 378 | + case 1: |
| 379 | + Assert.Equal(1, batch.Count); |
| 380 | + Assert.Equal(3, batch[0]); |
| 381 | + Debug.WriteLine("Second batch received: " + string.Join(',', batch.Select(item => item))); |
| 382 | + break; |
| 383 | + case 2: |
| 384 | + Assert.Equal(4, batch[0]); |
| 385 | + Assert.Equal(5, batch[1]); |
| 386 | + Debug.WriteLine("Third batch received: " + string.Join(',', batch.Select(item => item))); |
| 387 | + break; |
| 388 | + default: |
| 389 | + throw new Exception("Shouldn't arrive here. Got batch: " + string.Join(',', batch.Select(item => item))); |
| 390 | + } |
| 391 | + i++; |
| 392 | + } |
| 393 | + Assert.Equal(3, i); |
| 394 | + await c.Reader.Completion; // Propagate possible failure |
| 395 | + } |
| 396 | + public static async IAsyncEnumerable<IList<T>> ReadBatchEnumerableAsyncBakedIn<T>( |
| 397 | + this ChannelReader<T> channelReader, |
| 398 | + int batchSize, |
| 399 | + TimeSpan timeout, |
| 400 | + [EnumeratorCancellation] CancellationToken cancellationToken = default) |
| 401 | + { |
| 402 | + var reader = channelReader.Batch(batchSize); |
| 403 | + reader = reader.WithTimeout(timeout); // stack overflow here |
| 404 | + while (true) |
| 405 | + { |
| 406 | + List<T> item; |
| 407 | + try |
| 408 | + { |
| 409 | + item = await reader.ReadAsync(cancellationToken).ConfigureAwait(false); |
| 410 | + } |
| 411 | + catch (OperationCanceledException) |
| 412 | + { |
| 413 | + cancellationToken.ThrowIfCancellationRequested(); |
| 414 | + yield break; |
| 415 | + } |
| 416 | + catch (ChannelClosedException) |
| 417 | + { |
| 418 | + yield break; |
| 419 | + } |
| 420 | + |
| 421 | + if (item?.Count > 0) yield return item; |
| 422 | + } |
| 423 | + } |
| 424 | + |
350 | 425 | }
|
351 | 426 |
|
0 commit comments