33 using System ;
44 using System . Threading ;
55 using System . Threading . Tasks ;
6- using System . Transactions ;
76 using Azure . Messaging . ServiceBus ;
87 using AzureFunctions . InProcess . ServiceBus ;
98 using AzureFunctions . InProcess . ServiceBus . Serverless ;
10- using Extensibility ;
119 using Microsoft . Azure . WebJobs . ServiceBus ;
1210 using Microsoft . Extensions . Logging ;
13- using Transport ;
14- using Transport . AzureServiceBus ;
1511 using ExecutionContext = Microsoft . Azure . WebJobs . ExecutionContext ;
1612
1713 class InProcessFunctionEndpoint : IFunctionEndpoint
@@ -25,6 +21,44 @@ public InProcessFunctionEndpoint(
2521 endpointFactory = ( ) => externallyManagedContainerEndpoint . Start ( serviceProvider ) ;
2622 }
2723
24+ public async Task ProcessAtomic (
25+ ServiceBusReceivedMessage message ,
26+ ExecutionContext executionContext ,
27+ ServiceBusClient serviceBusClient ,
28+ ServiceBusMessageActions messageActions ,
29+ ILogger functionsLogger = null ,
30+ CancellationToken cancellationToken = default )
31+ {
32+ FunctionsLoggerFactory . Instance . SetCurrentLogger ( functionsLogger ) ;
33+
34+ try
35+ {
36+ await InitializeEndpointIfNecessary ( cancellationToken ) . ConfigureAwait ( false ) ;
37+ }
38+ catch ( Exception )
39+ {
40+ await messageActions . AbandonMessageAsync ( message , cancellationToken : cancellationToken ) . ConfigureAwait ( false ) ;
41+ throw ;
42+ }
43+
44+ await messageProcessor . ProcessAtomic ( message , serviceBusClient , messageActions , cancellationToken )
45+ . ConfigureAwait ( false ) ;
46+ }
47+
48+ public async Task ProcessNonAtomic (
49+ ServiceBusReceivedMessage message ,
50+ ExecutionContext executionContext ,
51+ ILogger functionsLogger = null ,
52+ CancellationToken cancellationToken = default )
53+ {
54+ FunctionsLoggerFactory . Instance . SetCurrentLogger ( functionsLogger ) ;
55+
56+ await InitializeEndpointIfNecessary ( cancellationToken ) . ConfigureAwait ( false ) ;
57+
58+ await messageProcessor . ProcessNonAtomic ( message , cancellationToken )
59+ . ConfigureAwait ( false ) ;
60+ }
61+
2862 public async Task Send ( object message , SendOptions options , ExecutionContext executionContext , ILogger functionsLogger = null , CancellationToken cancellationToken = default )
2963 {
3064 FunctionsLoggerFactory . Instance . SetCurrentLogger ( functionsLogger ) ;
@@ -103,137 +137,6 @@ public Task Unsubscribe(Type eventType, ExecutionContext executionContext, ILogg
103137 return Unsubscribe ( eventType , new UnsubscribeOptions ( ) , executionContext , functionsLogger , cancellationToken ) ;
104138 }
105139
106- public async Task ProcessNonAtomic (
107- ServiceBusReceivedMessage message ,
108- ExecutionContext executionContext ,
109- ILogger functionsLogger = null ,
110- CancellationToken cancellationToken = default )
111- {
112- FunctionsLoggerFactory . Instance . SetCurrentLogger ( functionsLogger ) ;
113-
114- await InitializeEndpointIfNecessary ( cancellationToken )
115- . ConfigureAwait ( false ) ;
116-
117- try
118- {
119- var messageContext = CreateMessageContext ( message , new TransportTransaction ( ) , false ) ;
120-
121- await pipeline . PushMessage ( messageContext , cancellationToken ) . ConfigureAwait ( false ) ;
122-
123- }
124- catch ( Exception exception )
125- {
126- var errorContext = CreateErrorContext ( message , new TransportTransaction ( ) , exception ) ;
127-
128- var errorHandleResult = await pipeline . PushFailedMessage ( errorContext , cancellationToken ) . ConfigureAwait ( false ) ;
129-
130- if ( errorHandleResult == ErrorHandleResult . Handled )
131- {
132- return ;
133- }
134- throw ;
135- }
136- }
137-
138- public async Task ProcessAtomic (
139- ServiceBusReceivedMessage message ,
140- ExecutionContext executionContext ,
141- ServiceBusClient serviceBusClient ,
142- ServiceBusMessageActions messageActions ,
143- ILogger functionsLogger = null ,
144- CancellationToken cancellationToken = default )
145- {
146- FunctionsLoggerFactory . Instance . SetCurrentLogger ( functionsLogger ) ;
147-
148- try
149- {
150- await InitializeEndpointIfNecessary ( cancellationToken ) . ConfigureAwait ( false ) ;
151- }
152- catch ( Exception )
153- {
154- await messageActions . AbandonMessageAsync ( message , cancellationToken : cancellationToken ) . ConfigureAwait ( false ) ;
155- throw ;
156- }
157-
158- try
159- {
160- using ( var azureServiceBusTransaction = CreateTransaction ( message . PartitionKey , serviceBusClient ) )
161- {
162- var messageContext = CreateMessageContext ( message , azureServiceBusTransaction . TransportTransaction , true ) ;
163-
164- await pipeline . PushMessage ( messageContext , cancellationToken ) . ConfigureAwait ( false ) ;
165-
166- await SafeCompleteMessageAsync ( messageActions , message , azureServiceBusTransaction , cancellationToken ) . ConfigureAwait ( false ) ;
167- azureServiceBusTransaction . Commit ( ) ;
168- }
169- }
170- catch ( Exception exception )
171- {
172- ErrorHandleResult result ;
173- using ( var azureServiceBusTransaction = CreateTransaction ( message . PartitionKey , serviceBusClient ) )
174- {
175- var errorContext = CreateErrorContext ( message , azureServiceBusTransaction . TransportTransaction , exception ) ;
176-
177- result = await pipeline . PushFailedMessage ( errorContext , cancellationToken ) . ConfigureAwait ( false ) ;
178-
179- if ( result == ErrorHandleResult . Handled )
180- {
181- await SafeCompleteMessageAsync ( messageActions , message , azureServiceBusTransaction , cancellationToken ) . ConfigureAwait ( false ) ;
182- }
183-
184- azureServiceBusTransaction . Commit ( ) ;
185- }
186-
187- if ( result != ErrorHandleResult . Handled )
188- {
189- await messageActions . AbandonMessageAsync ( message , cancellationToken : cancellationToken ) . ConfigureAwait ( false ) ;
190- }
191- }
192- }
193-
194- ErrorContext CreateErrorContext ( ServiceBusReceivedMessage message , TransportTransaction transportTransaction , Exception exception )
195- {
196- var errorContext = new ErrorContext (
197- exception ,
198- message . GetHeaders ( ) ,
199- message . MessageId ,
200- message . Body ,
201- transportTransaction ,
202- message . DeliveryCount ,
203- pipeline . ReceiveAddress ,
204- new ContextBag ( ) ) ;
205- return errorContext ;
206- }
207-
208- MessageContext CreateMessageContext ( ServiceBusReceivedMessage message , TransportTransaction transportTransaction , bool atomic )
209- {
210- var contextBag = new ContextBag ( ) ;
211- var invocationMode = new FunctionInvocationMode ( atomic ) ;
212- contextBag . Set ( invocationMode ) ;
213- var messageContext = new MessageContext (
214- message . MessageId ,
215- message . GetHeaders ( ) ,
216- message . Body ,
217- transportTransaction ,
218- pipeline . ReceiveAddress ,
219- contextBag ) ;
220- return messageContext ;
221- }
222-
223- static async Task SafeCompleteMessageAsync ( ServiceBusMessageActions messageActions , ServiceBusReceivedMessage message , AzureServiceBusTransportTransaction azureServiceBusTransaction , CancellationToken cancellationToken = default )
224- {
225- using var scope = azureServiceBusTransaction . ToTransactionScope ( ) ;
226- await messageActions . CompleteMessageAsync ( message , cancellationToken ) . ConfigureAwait ( false ) ;
227- scope . Complete ( ) ;
228- }
229-
230- static AzureServiceBusTransportTransaction CreateTransaction ( string messagePartitionKey , ServiceBusClient serviceBusClient ) =>
231- new ( serviceBusClient , messagePartitionKey , new TransactionOptions
232- {
233- IsolationLevel = IsolationLevel . Serializable ,
234- Timeout = TransactionManager . MaximumTimeout
235- } ) ;
236-
237140 internal static readonly string [ ] AssembliesToExcludeFromScanning = {
238141 "NCrontab.Signed.dll" ,
239142 "Azure.Core.dll" ,
@@ -253,16 +156,16 @@ static AzureServiceBusTransportTransaction CreateTransaction(string messageParti
253156
254157 internal async Task InitializeEndpointIfNecessary ( CancellationToken cancellationToken )
255158 {
256- if ( pipeline == null )
159+ if ( messageProcessor == null )
257160 {
258161 await semaphoreLock . WaitAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
259162 try
260163 {
261- if ( pipeline == null )
164+ if ( messageProcessor == null )
262165 {
263166 endpoint = await endpointFactory ( ) . ConfigureAwait ( false ) ;
264167
265- pipeline = serverlessInterceptor . PipelineInvoker ;
168+ messageProcessor = serverlessInterceptor . MessageProcessor ;
266169 }
267170 }
268171 finally
@@ -272,7 +175,7 @@ internal async Task InitializeEndpointIfNecessary(CancellationToken cancellation
272175 }
273176 }
274177
275- PipelineInvoker pipeline ;
178+ IMessageProcessor messageProcessor ;
276179 IEndpointInstance endpoint ;
277180
278181 readonly Func < Task < IEndpointInstance > > endpointFactory ;
0 commit comments