Skip to content

Commit 18d02e4

Browse files
afscromeasosMikeGore
authored andcommitted
Added CancellationToken support to Event Store (#45)
Added support for passing Cancellation Tokens down to the Cosmos SDK to allow for better control over timeouts. Due to the addition of optional parameters, this is a runtime breaking change, but not a compile time one.
1 parent c5eb62f commit 18d02e4

3 files changed

Lines changed: 23 additions & 10 deletions

File tree

src/SimpleEventStore/SimpleEventStore.AzureDocumentDb/AzureDocumentDbStorageEngine.cs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Collections.Generic;
33
using System.Linq;
4+
using System.Threading;
45
using System.Threading.Tasks;
56
using Microsoft.Azure.Documents;
67
using Microsoft.Azure.Documents.Client;
@@ -39,19 +40,28 @@ internal AzureDocumentDbStorageEngine(DocumentClient client, string databaseName
3940
this.databaseUri = UriFactory.CreateDatabaseUri(databaseName);
4041
}
4142

42-
public async Task<IStorageEngine> Initialise()
43+
public async Task<IStorageEngine> Initialise(CancellationToken cancellationToken = default)
4344
{
45+
cancellationToken.ThrowIfCancellationRequested();
4446
await CreateDatabaseIfItDoesNotExist();
47+
48+
cancellationToken.ThrowIfCancellationRequested();
4549
await CreateCollectionIfItDoesNotExist();
50+
51+
cancellationToken.ThrowIfCancellationRequested();
4652
await CreateAppendStoredProcedureIfItDoesNotExist();
4753

54+
55+
cancellationToken.ThrowIfCancellationRequested();
4856
await SetDatabaseOfferThroughput();
57+
58+
cancellationToken.ThrowIfCancellationRequested();
4959
await SetCollectionOfferThroughput();
5060

5161
return this;
5262
}
5363

54-
public async Task AppendToStream(string streamId, IEnumerable<StorageEvent> events)
64+
public async Task AppendToStream(string streamId, IEnumerable<StorageEvent> events, CancellationToken cancellationToken = default)
5565
{
5666
var docs = events.Select(d => DocumentDbStorageEvent.FromStorageEvent(d, this.typeMap, this.jsonSerializer)).ToList();
5767

@@ -60,6 +70,7 @@ public async Task AppendToStream(string streamId, IEnumerable<StorageEvent> even
6070
var result = await this.client.ExecuteStoredProcedureAsync<dynamic>(
6171
storedProcLink,
6272
new RequestOptions { PartitionKey = new PartitionKey(streamId), ConsistencyLevel = this.collectionOptions.ConsistencyLevel },
73+
cancellationToken,
6374
docs);
6475

6576
loggingOptions.OnSuccess(ResponseInformation.FromWriteResponse(nameof(AppendToStream), result));
@@ -75,7 +86,7 @@ public async Task AppendToStream(string streamId, IEnumerable<StorageEvent> even
7586
}
7687
}
7788

78-
public async Task<IReadOnlyCollection<StorageEvent>> ReadStreamForwards(string streamId, int startPosition, int numberOfEventsToRead)
89+
public async Task<IReadOnlyCollection<StorageEvent>> ReadStreamForwards(string streamId, int startPosition, int numberOfEventsToRead, CancellationToken cancellationToken = default)
7990
{
8091
int endPosition = numberOfEventsToRead == int.MaxValue ? int.MaxValue : startPosition + numberOfEventsToRead;
8192

@@ -88,7 +99,7 @@ public async Task<IReadOnlyCollection<StorageEvent>> ReadStreamForwards(string s
8899

89100
while (eventsQuery.HasMoreResults)
90101
{
91-
var response = await eventsQuery.ExecuteNextAsync<DocumentDbStorageEvent>();
102+
var response = await eventsQuery.ExecuteNextAsync<DocumentDbStorageEvent>(cancellationToken);
92103
loggingOptions.OnSuccess(ResponseInformation.FromReadResponse(nameof(ReadStreamForwards), response));
93104

94105
foreach (var e in response)
Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Threading;
34
using System.Threading.Tasks;
45

56
namespace SimpleEventStore
67
{
78
public interface IStorageEngine
89
{
9-
Task AppendToStream(string streamId, IEnumerable<StorageEvent> events);
10+
Task AppendToStream(string streamId, IEnumerable<StorageEvent> events, CancellationToken cancellationToken = default);
1011

11-
Task<IReadOnlyCollection<StorageEvent>> ReadStreamForwards(string streamId, int startPosition, int numberOfEventsToRead);
12+
Task<IReadOnlyCollection<StorageEvent>> ReadStreamForwards(string streamId, int startPosition, int numberOfEventsToRead, CancellationToken cancellationToken = default);
1213

13-
Task<IStorageEngine> Initialise();
14+
Task<IStorageEngine> Initialise(CancellationToken cancellationToken = default);
1415
}
1516
}

src/SimpleEventStore/SimpleEventStore/InMemory/InMemoryStorageEngine.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System.Collections.Concurrent;
22
using System.Collections.Generic;
33
using System.Linq;
4+
using System.Threading;
45
using System.Threading.Tasks;
56

67
namespace SimpleEventStore.InMemory
@@ -12,7 +13,7 @@ public class InMemoryStorageEngine : IStorageEngine
1213
private readonly ConcurrentDictionary<string, List<StorageEvent>> streams = new ConcurrentDictionary<string, List<StorageEvent>>();
1314
private readonly List<StorageEvent> allEvents = new List<StorageEvent>();
1415

15-
public Task AppendToStream(string streamId, IEnumerable<StorageEvent> events)
16+
public Task AppendToStream(string streamId, IEnumerable<StorageEvent> events, CancellationToken cancellationToken = default)
1617
{
1718
return Task.Run(() =>
1819
{
@@ -41,7 +42,7 @@ private void AddEventsToAllStream(IEnumerable<StorageEvent> events)
4142
}
4243
}
4344

44-
public Task<IReadOnlyCollection<StorageEvent>> ReadStreamForwards(string streamId, int startPosition, int numberOfEventsToRead)
45+
public Task<IReadOnlyCollection<StorageEvent>> ReadStreamForwards(string streamId, int startPosition, int numberOfEventsToRead, CancellationToken cancellationToken = default)
4546
{
4647
if (!streams.ContainsKey(streamId))
4748
{
@@ -52,7 +53,7 @@ public Task<IReadOnlyCollection<StorageEvent>> ReadStreamForwards(string streamI
5253
return Task.FromResult(stream);
5354
}
5455

55-
public Task<IStorageEngine> Initialise()
56+
public Task<IStorageEngine> Initialise(CancellationToken cancellationToken = default)
5657
{
5758
return Task.FromResult<IStorageEngine>(this);
5859
}

0 commit comments

Comments
 (0)