Skip to content

Commit f6ea5bb

Browse files
Optimizations and ensured live changing the timeout during operation.
1 parent 7965ef9 commit f6ea5bb

File tree

6 files changed

+89
-135
lines changed

6 files changed

+89
-135
lines changed
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Text;
5+
using System.Threading;
6+
using System.Threading.Channels;
7+
using System.Threading.Tasks;
8+
using Xunit;
9+
10+
namespace Open.ChannelExtensions.Tests;
11+
12+
public static class AssumptionTests
13+
{
14+
[Fact]
15+
public static async Task WaitCancellation()
16+
{
17+
var channel = Channel.CreateUnbounded<int>();
18+
using (var tokenSource = new CancellationTokenSource())
19+
{
20+
CancellationToken token = tokenSource.Token;
21+
22+
var t = channel.Reader.WaitToReadAsync(token).ConfigureAwait(false);
23+
tokenSource.Cancel();
24+
25+
// NOTE: a cancelled WaitToReadAsync will throw.
26+
await Assert.ThrowsAsync<OperationCanceledException>(async () => await t);
27+
}
28+
29+
using (var tokenSource = new CancellationTokenSource())
30+
{
31+
CancellationToken token = tokenSource.Token;
32+
33+
var t1 = channel.Reader.WaitToReadAsync(token);
34+
var t2 = channel.Reader.WaitToReadAsync();
35+
tokenSource.Cancel();
36+
37+
// NOTE: a cancelled WhenAny will not throw!
38+
var result = await Task.WhenAny(t1.AsTask(), t2.AsTask()).ConfigureAwait(false);
39+
Assert.True(result.IsCanceled);
40+
}
41+
42+
}
43+
}

Open.ChannelExtensions.Tests/BatchTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,7 @@ public static async Task BatchReadBehavior()
340340

341341
async ValueTask Dequeue()
342342
{
343-
Assert.True(c.Writer.TryWrite(e));
343+
if (!c.Writer.TryWrite(e)) return;
344344
while (queue.TryDequeue(out e) && c.Writer.TryWrite(e))
345345
await Task.Yield();
346346
}

Open.ChannelExtensions/BatchingChannelReader.cs

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.Collections.Generic;
33
using System.Diagnostics;
44
using System.Diagnostics.Contracts;
5+
using System.Runtime.CompilerServices;
56
using System.Threading;
67
using System.Threading.Channels;
78
using System.Threading.Tasks;
@@ -68,9 +69,27 @@ public BatchingChannelReader<T> WithTimeout(long millisecondsTimeout)
6869
LazyInitializer.EnsureInitialized(ref _timer,
6970
() => new Timer(obj => ForceBatch()));
7071

72+
if (_batch is null) return this;
73+
74+
// Might be in the middle of a batch so we need to update the timeout.
75+
lock(Buffer)
76+
{
77+
if (_batch is not null) RefreshTimeout();
78+
}
79+
7180
return this;
7281
}
7382

83+
/// <summary>
84+
/// If one exists, updates the timer's timeout value.
85+
/// </summary>
86+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
87+
protected void RefreshTimeout()
88+
{
89+
var ok = _timer?.Change(_timeout, 0);
90+
Debug.Assert(ok ?? true);
91+
}
92+
7493
/// <param name="timeout">
7594
/// The timeout value where after a batch is forced.<br/>
7695
/// A value of zero or less cancels/clears any timeout.<br/>
@@ -139,13 +158,9 @@ protected override bool TryPipeItems(bool flush)
139158
Emit(ref c);
140159

141160
finalizeTimer:
142-
161+
143162
// Are we adding to the existing batch (active timeout) or did we create a new one?
144-
if (newBatch && _batch is not null)
145-
{
146-
var ok = _timer?.Change(_timeout, 0);
147-
Debug.Assert(ok ?? true);
148-
}
163+
if (newBatch && _batch is not null) RefreshTimeout();
149164

150165
return batched;
151166

@@ -167,16 +182,16 @@ protected override async ValueTask<bool> WaitToReadAsyncCore(
167182
CancellationToken cancellationToken)
168183
{
169184
ChannelReader<T>? source = Source;
170-
if (source is null) return await bufferWait.ConfigureAwait(false);
171185

172-
Task<bool>? b = bufferWait.AsTask();
186+
if (source is null || bufferWait.IsCompleted)
187+
return await bufferWait.ConfigureAwait(false);
188+
189+
var b = bufferWait.AsTask();
173190
using var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
174191
CancellationToken token = tokenSource.Token;
175192

176193
start:
177194

178-
if (b.IsCompleted) return await b.ConfigureAwait(false);
179-
180195
ValueTask<bool> s = source.WaitToReadAsync(token);
181196
if (s.IsCompleted && !b.IsCompleted) TryPipeItems(false);
182197

@@ -186,14 +201,19 @@ protected override async ValueTask<bool> WaitToReadAsyncCore(
186201
return await b.ConfigureAwait(false);
187202
}
188203

204+
// WhenAny will not throw when a task is cancelled.
189205
await Task.WhenAny(s.AsTask(), b).ConfigureAwait(false);
190-
if (b.IsCompleted)
206+
if (b.IsCompleted) // Assuming it was bufferWait that completed.
191207
{
192208
tokenSource.Cancel();
193209
return await b.ConfigureAwait(false);
194210
}
195211

196212
TryPipeItems(false);
213+
214+
if (b.IsCompleted || token.IsCancellationRequested)
215+
return await b.ConfigureAwait(false);
216+
197217
goto start;
198218
}
199219
}

Open.ChannelExtensions/BufferingChannelReader.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ protected BufferingChannelReader(ChannelReader<TIn> source, bool singleReader, b
4848
// Need to be sure writing is done before we continue...
4949
lock (Buffer)
5050
{
51+
/* When the source is complete,
52+
* we dump all remaining into the buffer
53+
* in order to propagate the completion and any exception. */
5154
TryPipeItems(true);
5255
Buffer.Writer.Complete(t.Exception);
5356
}
@@ -114,15 +117,14 @@ public override ValueTask<bool> WaitToReadAsync(CancellationToken cancellationTo
114117
protected virtual async ValueTask<bool> WaitToReadAsyncCore(ValueTask<bool> bufferWait, CancellationToken cancellationToken)
115118
{
116119
ChannelReader<TIn>? source = Source;
117-
if (source is null) return await bufferWait.ConfigureAwait(false);
120+
if (source is null || bufferWait.IsCompleted)
121+
return await bufferWait.ConfigureAwait(false);
118122

119123
using var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
120124
CancellationToken token = tokenSource.Token;
121125

122126
start:
123127

124-
if (bufferWait.IsCompleted) return await bufferWait.ConfigureAwait(false);
125-
126128
ValueTask<bool> s = source.WaitToReadAsync(token);
127129
if (s.IsCompleted && !bufferWait.IsCompleted) TryPipeItems(false);
128130

@@ -131,13 +133,11 @@ protected virtual async ValueTask<bool> WaitToReadAsyncCore(ValueTask<bool> buff
131133
tokenSource.Cancel();
132134
return await bufferWait.ConfigureAwait(false);
133135
}
136+
134137
await s.ConfigureAwait(false);
135-
if (bufferWait.IsCompleted)
136-
{
137-
tokenSource.Cancel();
138-
return await bufferWait.ConfigureAwait(false);
139-
}
138+
if (bufferWait.IsCompleted) return await bufferWait.ConfigureAwait(false);
140139
TryPipeItems(false);
140+
if (bufferWait.IsCompleted) return await bufferWait.ConfigureAwait(false);
141141

142142
goto start;
143143
}

Open.ChannelExtensions/Documentation.xml

Lines changed: 5 additions & 114 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Open.ChannelExtensions/Open.ChannelExtensions.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
<RepositoryUrl>https://github.com/Open-NET-Libraries/Open.ChannelExtensions</RepositoryUrl>
2121
<RepositoryType>git</RepositoryType>
2222
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
23-
<Version>5.1.1</Version>
23+
<Version>5.1.2</Version>
2424
<PackageReleaseNotes></PackageReleaseNotes>
2525
<PackageLicenseExpression>MIT</PackageLicenseExpression>
2626
<PublishRepositoryUrl>true</PublishRepositoryUrl>

0 commit comments

Comments
 (0)