Skip to content

Commit bba69ce

Browse files
authored
Merge pull request #714 from Particular/send-only-config
Support send-only endpoint configuration
2 parents c0b40fe + fc77d19 commit bba69ce

14 files changed

Lines changed: 466 additions & 236 deletions

src/NServiceBus.AzureFunctions.InProcess.ServiceBus/FunctionExecutionContext.cs

Lines changed: 0 additions & 30 deletions
This file was deleted.

src/NServiceBus.AzureFunctions.InProcess.ServiceBus/InProcessFunctionEndpoint.cs

Lines changed: 52 additions & 150 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,11 @@
33
using System;
44
using System.Threading;
55
using System.Threading.Tasks;
6-
using System.Transactions;
76
using Azure.Messaging.ServiceBus;
87
using AzureFunctions.InProcess.ServiceBus;
98
using AzureFunctions.InProcess.ServiceBus.Serverless;
10-
using Extensibility;
119
using Microsoft.Azure.WebJobs.ServiceBus;
1210
using Microsoft.Extensions.Logging;
13-
using Transport;
14-
using Transport.AzureServiceBus;
1511
using ExecutionContext = Microsoft.Azure.WebJobs.ExecutionContext;
1612

1713
class InProcessFunctionEndpoint : IFunctionEndpoint
@@ -22,14 +18,52 @@ public InProcessFunctionEndpoint(
2218
IServiceProvider serviceProvider)
2319
{
2420
this.serverlessInterceptor = serverlessInterceptor;
25-
endpointFactory = _ => externallyManagedContainerEndpoint.Start(serviceProvider);
21+
endpointFactory = () => externallyManagedContainerEndpoint.Start(serviceProvider);
22+
}
23+
24+
public async Task ProcessAtomic(
25+
ServiceBusReceivedMessage message,
26+
ExecutionContext executionContext,
27+
ServiceBusClient serviceBusClient,
28+
ServiceBusMessageActions messageActions,
29+
ILogger functionsLogger = null,
30+
CancellationToken cancellationToken = default)
31+
{
32+
FunctionsLoggerFactory.Instance.SetCurrentLogger(functionsLogger);
33+
34+
try
35+
{
36+
await InitializeEndpointIfNecessary(cancellationToken).ConfigureAwait(false);
37+
}
38+
catch (Exception)
39+
{
40+
await messageActions.AbandonMessageAsync(message, cancellationToken: cancellationToken).ConfigureAwait(false);
41+
throw;
42+
}
43+
44+
await messageProcessor.ProcessAtomic(message, serviceBusClient, messageActions, cancellationToken)
45+
.ConfigureAwait(false);
46+
}
47+
48+
public async Task ProcessNonAtomic(
49+
ServiceBusReceivedMessage message,
50+
ExecutionContext executionContext,
51+
ILogger functionsLogger = null,
52+
CancellationToken cancellationToken = default)
53+
{
54+
FunctionsLoggerFactory.Instance.SetCurrentLogger(functionsLogger);
55+
56+
await InitializeEndpointIfNecessary(cancellationToken).ConfigureAwait(false);
57+
58+
await messageProcessor.ProcessNonAtomic(message, cancellationToken)
59+
.ConfigureAwait(false);
2660
}
2761

2862
public async Task Send(object message, SendOptions options, ExecutionContext executionContext, ILogger functionsLogger = null, CancellationToken cancellationToken = default)
2963
{
3064
FunctionsLoggerFactory.Instance.SetCurrentLogger(functionsLogger);
3165

32-
await InitializeEndpointIfNecessary(executionContext, functionsLogger, cancellationToken).ConfigureAwait(false);
66+
await InitializeEndpointIfNecessary(cancellationToken).ConfigureAwait(false);
3367
await endpoint.Send(message, options, cancellationToken).ConfigureAwait(false);
3468
}
3569

@@ -42,7 +76,7 @@ public async Task Send<T>(Action<T> messageConstructor, SendOptions options, Exe
4276
{
4377
FunctionsLoggerFactory.Instance.SetCurrentLogger(functionsLogger);
4478

45-
await InitializeEndpointIfNecessary(executionContext, functionsLogger, cancellationToken).ConfigureAwait(false);
79+
await InitializeEndpointIfNecessary(cancellationToken).ConfigureAwait(false);
4680
await endpoint.Send(messageConstructor, options, cancellationToken).ConfigureAwait(false);
4781
}
4882

@@ -55,7 +89,7 @@ public async Task Publish(object message, PublishOptions options, ExecutionConte
5589
{
5690
FunctionsLoggerFactory.Instance.SetCurrentLogger(functionsLogger);
5791

58-
await InitializeEndpointIfNecessary(executionContext, functionsLogger, cancellationToken).ConfigureAwait(false);
92+
await InitializeEndpointIfNecessary(cancellationToken).ConfigureAwait(false);
5993
await endpoint.Publish(message, options, cancellationToken).ConfigureAwait(false);
6094
}
6195

@@ -68,7 +102,7 @@ public async Task Publish<T>(Action<T> messageConstructor, PublishOptions option
68102
{
69103
FunctionsLoggerFactory.Instance.SetCurrentLogger(functionsLogger);
70104

71-
await InitializeEndpointIfNecessary(executionContext, functionsLogger, cancellationToken).ConfigureAwait(false);
105+
await InitializeEndpointIfNecessary(cancellationToken).ConfigureAwait(false);
72106
await endpoint.Publish(messageConstructor, options, cancellationToken).ConfigureAwait(false);
73107
}
74108

@@ -81,7 +115,7 @@ public async Task Subscribe(Type eventType, SubscribeOptions options, ExecutionC
81115
{
82116
FunctionsLoggerFactory.Instance.SetCurrentLogger(functionsLogger);
83117

84-
await InitializeEndpointIfNecessary(executionContext, functionsLogger, cancellationToken).ConfigureAwait(false);
118+
await InitializeEndpointIfNecessary(cancellationToken).ConfigureAwait(false);
85119
await endpoint.Subscribe(eventType, options, cancellationToken).ConfigureAwait(false);
86120
}
87121

@@ -94,7 +128,7 @@ public async Task Unsubscribe(Type eventType, UnsubscribeOptions options, Execut
94128
{
95129
FunctionsLoggerFactory.Instance.SetCurrentLogger(functionsLogger);
96130

97-
await InitializeEndpointIfNecessary(executionContext, functionsLogger, cancellationToken).ConfigureAwait(false);
131+
await InitializeEndpointIfNecessary(cancellationToken).ConfigureAwait(false);
98132
await endpoint.Unsubscribe(eventType, options, cancellationToken).ConfigureAwait(false);
99133
}
100134

@@ -103,137 +137,6 @@ public Task Unsubscribe(Type eventType, ExecutionContext executionContext, ILogg
103137
return Unsubscribe(eventType, new UnsubscribeOptions(), executionContext, functionsLogger, cancellationToken);
104138
}
105139

106-
public async Task ProcessNonAtomic(
107-
ServiceBusReceivedMessage message,
108-
ExecutionContext executionContext,
109-
ILogger functionsLogger = null,
110-
CancellationToken cancellationToken = default)
111-
{
112-
FunctionsLoggerFactory.Instance.SetCurrentLogger(functionsLogger);
113-
114-
await InitializeEndpointIfNecessary(executionContext, functionsLogger, cancellationToken)
115-
.ConfigureAwait(false);
116-
117-
try
118-
{
119-
var messageContext = CreateMessageContext(message, new TransportTransaction(), false);
120-
121-
await pipeline.PushMessage(messageContext, cancellationToken).ConfigureAwait(false);
122-
123-
}
124-
catch (Exception exception)
125-
{
126-
var errorContext = CreateErrorContext(message, new TransportTransaction(), exception);
127-
128-
var errorHandleResult = await pipeline.PushFailedMessage(errorContext, cancellationToken).ConfigureAwait(false);
129-
130-
if (errorHandleResult == ErrorHandleResult.Handled)
131-
{
132-
return;
133-
}
134-
throw;
135-
}
136-
}
137-
138-
public async Task ProcessAtomic(
139-
ServiceBusReceivedMessage message,
140-
ExecutionContext executionContext,
141-
ServiceBusClient serviceBusClient,
142-
ServiceBusMessageActions messageActions,
143-
ILogger functionsLogger = null,
144-
CancellationToken cancellationToken = default)
145-
{
146-
FunctionsLoggerFactory.Instance.SetCurrentLogger(functionsLogger);
147-
148-
try
149-
{
150-
await InitializeEndpointIfNecessary(executionContext, functionsLogger, cancellationToken).ConfigureAwait(false);
151-
}
152-
catch (Exception)
153-
{
154-
await messageActions.AbandonMessageAsync(message, cancellationToken: cancellationToken).ConfigureAwait(false);
155-
throw;
156-
}
157-
158-
try
159-
{
160-
using (var azureServiceBusTransaction = CreateTransaction(message.PartitionKey, serviceBusClient))
161-
{
162-
var messageContext = CreateMessageContext(message, azureServiceBusTransaction.TransportTransaction, true);
163-
164-
await pipeline.PushMessage(messageContext, cancellationToken).ConfigureAwait(false);
165-
166-
await SafeCompleteMessageAsync(messageActions, message, azureServiceBusTransaction, cancellationToken).ConfigureAwait(false);
167-
azureServiceBusTransaction.Commit();
168-
}
169-
}
170-
catch (Exception exception)
171-
{
172-
ErrorHandleResult result;
173-
using (var azureServiceBusTransaction = CreateTransaction(message.PartitionKey, serviceBusClient))
174-
{
175-
var errorContext = CreateErrorContext(message, azureServiceBusTransaction.TransportTransaction, exception);
176-
177-
result = await pipeline.PushFailedMessage(errorContext, cancellationToken).ConfigureAwait(false);
178-
179-
if (result == ErrorHandleResult.Handled)
180-
{
181-
await SafeCompleteMessageAsync(messageActions, message, azureServiceBusTransaction, cancellationToken).ConfigureAwait(false);
182-
}
183-
184-
azureServiceBusTransaction.Commit();
185-
}
186-
187-
if (result != ErrorHandleResult.Handled)
188-
{
189-
await messageActions.AbandonMessageAsync(message, cancellationToken: cancellationToken).ConfigureAwait(false);
190-
}
191-
}
192-
}
193-
194-
ErrorContext CreateErrorContext(ServiceBusReceivedMessage message, TransportTransaction transportTransaction, Exception exception)
195-
{
196-
var errorContext = new ErrorContext(
197-
exception,
198-
message.GetHeaders(),
199-
message.MessageId,
200-
message.Body,
201-
transportTransaction,
202-
message.DeliveryCount,
203-
pipeline.ReceiveAddress,
204-
new ContextBag());
205-
return errorContext;
206-
}
207-
208-
MessageContext CreateMessageContext(ServiceBusReceivedMessage message, TransportTransaction transportTransaction, bool atomic)
209-
{
210-
var contextBag = new ContextBag();
211-
var invocationMode = new FunctionInvocationMode(atomic);
212-
contextBag.Set(invocationMode);
213-
var messageContext = new MessageContext(
214-
message.MessageId,
215-
message.GetHeaders(),
216-
message.Body,
217-
transportTransaction,
218-
pipeline.ReceiveAddress,
219-
contextBag);
220-
return messageContext;
221-
}
222-
223-
static async Task SafeCompleteMessageAsync(ServiceBusMessageActions messageActions, ServiceBusReceivedMessage message, AzureServiceBusTransportTransaction azureServiceBusTransaction, CancellationToken cancellationToken = default)
224-
{
225-
using var scope = azureServiceBusTransaction.ToTransactionScope();
226-
await messageActions.CompleteMessageAsync(message, cancellationToken).ConfigureAwait(false);
227-
scope.Complete();
228-
}
229-
230-
static AzureServiceBusTransportTransaction CreateTransaction(string messagePartitionKey, ServiceBusClient serviceBusClient) =>
231-
new(serviceBusClient, messagePartitionKey, new TransactionOptions
232-
{
233-
IsolationLevel = IsolationLevel.Serializable,
234-
Timeout = TransactionManager.MaximumTimeout
235-
});
236-
237140
internal static readonly string[] AssembliesToExcludeFromScanning = {
238141
"NCrontab.Signed.dll",
239142
"Azure.Core.dll",
@@ -251,19 +154,18 @@ static AzureServiceBusTransportTransaction CreateTransaction(string messageParti
251154
"Azure.Security.KeyVault.Secrets.dll"
252155
};
253156

254-
internal async Task InitializeEndpointIfNecessary(ExecutionContext executionContext, ILogger logger, CancellationToken cancellationToken)
157+
internal async Task InitializeEndpointIfNecessary(CancellationToken cancellationToken)
255158
{
256-
if (pipeline == null)
159+
if (messageProcessor == null)
257160
{
258161
await semaphoreLock.WaitAsync(cancellationToken).ConfigureAwait(false);
259162
try
260163
{
261-
if (pipeline == null)
164+
if (messageProcessor == null)
262165
{
263-
var functionExecutionContext = new FunctionExecutionContext(executionContext, logger);
264-
endpoint = await endpointFactory(functionExecutionContext).ConfigureAwait(false);
166+
endpoint = await endpointFactory().ConfigureAwait(false);
265167

266-
pipeline = serverlessInterceptor.PipelineInvoker;
168+
messageProcessor = serverlessInterceptor.MessageProcessor;
267169
}
268170
}
269171
finally
@@ -273,10 +175,10 @@ internal async Task InitializeEndpointIfNecessary(ExecutionContext executionCont
273175
}
274176
}
275177

276-
PipelineInvoker pipeline;
178+
IMessageProcessor messageProcessor;
277179
IEndpointInstance endpoint;
278180

279-
readonly Func<FunctionExecutionContext, Task<IEndpointInstance>> endpointFactory;
181+
readonly Func<Task<IEndpointInstance>> endpointFactory;
280182
readonly SemaphoreSlim semaphoreLock = new SemaphoreSlim(initialCount: 1, maxCount: 1);
281183
readonly ServerlessInterceptor serverlessInterceptor;
282184
}

src/NServiceBus.AzureFunctions.InProcess.ServiceBus/Serverless/ServerlessInterceptor.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,6 @@ class ServerlessInterceptor
66

77
public ServerlessInterceptor(ServerlessTransport transport) => this.transport = transport;
88

9-
public PipelineInvoker PipelineInvoker => transport.PipelineInvoker;
9+
public IMessageProcessor MessageProcessor => transport.MessageProcessor;
1010
}
1111
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
namespace NServiceBus.AzureFunctions.InProcess.ServiceBus
2+
{
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
using Azure.Messaging.ServiceBus;
6+
using Microsoft.Azure.WebJobs.ServiceBus;
7+
8+
interface IMessageProcessor
9+
{
10+
Task ProcessNonAtomic(
11+
ServiceBusReceivedMessage message,
12+
CancellationToken cancellationToken = default);
13+
14+
Task ProcessAtomic(
15+
ServiceBusReceivedMessage message,
16+
ServiceBusClient serviceBusClient,
17+
ServiceBusMessageActions messageActions,
18+
CancellationToken cancellationToken = default);
19+
}
20+
}

src/NServiceBus.AzureFunctions.InProcess.ServiceBus/Serverless/TransportWrapper/PipelineInvoker.cs

Lines changed: 0 additions & 44 deletions
This file was deleted.

0 commit comments

Comments
 (0)