Skip to content

Commit badade3

Browse files
Support Azure Service Bus v5 and topic per event topology (#888)
* Bump Azure Service Bus * Add hashing package * Acceptance test support * Adjust prod code * Cleanup and add tests * Slightly smarter configuration forwarding * Cleanup startup * Fix analyzer tests --------- Co-authored-by: danielmarbach <danielmarbach@users.noreply.github.com>
1 parent 5792d72 commit badade3

26 files changed

Lines changed: 315 additions & 331 deletions

src/IntegrationTests.HostV4/Startup.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,5 @@
77
public class Startup : FunctionsStartup
88
{
99
public override void Configure(IFunctionsHostBuilder builder)
10-
{
11-
builder.UseNServiceBus(c => c.AdvancedConfiguration.EnableInstallers());
12-
}
10+
=> builder.UseNServiceBus(c => c.AdvancedConfiguration.EnableInstallers());
1311
}

src/NServiceBus.AzureFunctions.InProcess.Analyzer.Tests/AnalyzerTestFixture.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
using System.Text.RegularExpressions;
1010
using System.Threading;
1111
using System.Threading.Tasks;
12+
using Azure.Core;
1213
using Microsoft.CodeAnalysis;
1314
using Microsoft.CodeAnalysis.CSharp;
1415
using Microsoft.CodeAnalysis.Diagnostics;
@@ -111,6 +112,7 @@ static AnalyzerTestFixture()
111112
MetadataReference.CreateFromFile(typeof(System.Linq.Expressions.Expression).GetTypeInfo().Assembly.Location),
112113
MetadataReference.CreateFromFile(Assembly.Load("System.Runtime").Location),
113114
MetadataReference.CreateFromFile(typeof(IFunctionEndpoint).GetTypeInfo().Assembly.Location),
115+
MetadataReference.CreateFromFile(typeof(TokenCredential).GetTypeInfo().Assembly.Location),
114116
MetadataReference.CreateFromFile(typeof(EndpointConfiguration).GetTypeInfo().Assembly.Location),
115117
MetadataReference.CreateFromFile(typeof(AzureServiceBusTransport).GetTypeInfo().Assembly.Location),
116118
];

src/NServiceBus.AzureFunctions.InProcess.Analyzer.Tests/ConfigurationAnalyzerTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ public class ConfigurationAnalyzerTests : AnalyzerTestFixture<ConfigurationAnaly
1313
[TestCase("OverrideLocalAddress(null)", OverrideLocalAddressNotAllowedId)]
1414
[TestCase("PurgeOnStartup(true)", PurgeOnStartupNotAllowedId)]
1515
[TestCase("SetDiagnosticsPath(null)", SetDiagnosticsPathNotAllowedId)]
16-
[TestCase("UseTransport(new AzureServiceBusTransport(null))", UseTransportNotAllowedId)]
16+
[TestCase("UseTransport(new AzureServiceBusTransport(null, default(TopicTopology)))", UseTransportNotAllowedId)]
1717
public Task DiagnosticIsReportedForEndpointConfiguration(string configuration, string diagnosticId)
1818
{
1919
var source =
@@ -40,7 +40,7 @@ void Bar(ServiceBusTriggeredEndpointConfiguration endpointConfig)
4040
[TestCase("OverrideLocalAddress(null)", OverrideLocalAddressNotAllowedId)]
4141
[TestCase("PurgeOnStartup(true)", PurgeOnStartupNotAllowedId)]
4242
[TestCase("SetDiagnosticsPath(null)", SetDiagnosticsPathNotAllowedId)]
43-
[TestCase("UseTransport(new AzureServiceBusTransport(null))", UseTransportNotAllowedId)]
43+
[TestCase("UseTransport(new AzureServiceBusTransport(null, default(TopicTopology)))", UseTransportNotAllowedId)]
4444
public Task DiagnosticIsNotReportedForOtherEndpointConfiguration(string configuration, string diagnosticId)
4545
{
4646
var source =

src/NServiceBus.AzureFunctions.InProcess.Analyzer.Tests/ConfigurationAnalyzerTestsCSharp8.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
public class ConfigurationAnalyzerTestsCSharp8 : AnalyzerTestFixture<ConfigurationAnalyzer>
1010
{
1111
// HINT: In C# 7 this call is ambiguous with the LearningTransport version as the compiler cannot differentiate method calls via generic type constraints
12-
[TestCase("UseTransport<AzureServiceBusTransport>()", UseTransportNotAllowedId)]
12+
[TestCase("UseTransport<AzureServiceBusTransport>(null)", UseTransportNotAllowedId)]
1313
public Task DiagnosticIsReportedForEndpointConfiguration(string configuration, string diagnosticId)
1414
{
1515
var source =

src/NServiceBus.AzureFunctions.InProcess.ServiceBus/FunctionsHostBuilderExtensions.cs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
using Microsoft.Extensions.Azure;
1010
using Microsoft.Extensions.Configuration;
1111
using Microsoft.Extensions.DependencyInjection;
12+
using Microsoft.Extensions.Options;
13+
using Transport.AzureServiceBus;
1214

1315
/// <summary>
1416
/// Provides extension methods to configure a <see cref="IFunctionEndpoint"/> using <see cref="IFunctionsHostBuilder"/>.
@@ -112,11 +114,24 @@ static void ConfigureEndpointFactory(IServiceCollection services, FunctionsHostB
112114
services,
113115
Path.Combine(functionsHostBuilderContext.ApplicationRootPath, assemblyDirectoryName));
114116

115-
services.AddSingleton(serviceBusTriggeredEndpointConfiguration);
116-
services.AddSingleton(startableEndpoint);
117-
services.AddSingleton(serverless);
118-
services.AddSingleton<InProcessFunctionEndpoint>();
119-
services.AddSingleton<IFunctionEndpoint>(sp => sp.GetRequiredService<InProcessFunctionEndpoint>());
117+
_ = services.AddSingleton(serviceBusTriggeredEndpointConfiguration);
118+
_ = services.AddSingleton(startableEndpoint);
119+
_ = services.AddSingleton(serverless);
120+
_ = services.AddSingleton<InProcessFunctionEndpoint>();
121+
_ = services.AddSingleton<IFunctionEndpoint>(sp => sp.GetRequiredService<InProcessFunctionEndpoint>());
122+
123+
#pragma warning disable CS0618 // Type or member is obsolete
124+
// Validator is registered here in case the user wants to use the options directly. This makes sure that the options are validated.
125+
// The transport still has to validate the options because options validators are only executed when the options are resolved.
126+
_ = services.AddSingleton<IValidateOptions<MigrationTopologyOptions>, MigrationTopologyOptionsValidator>();
127+
_ = services.AddOptions<MigrationTopologyOptions>()
128+
#pragma warning restore CS0618 // Type or member is obsolete
129+
.BindConfiguration("AzureServiceBus:MigrationTopologyOptions");
130+
131+
// Validator is registered here in case the user wants to use the options directly. This makes sure that the options are validated.
132+
// The transport still has to validate the options because options validators are only executed when the options are resolved.
133+
_ = services.AddSingleton<IValidateOptions<TopologyOptions>, TopologyOptionsValidator>();
134+
_ = services.AddOptions<TopologyOptions>().BindConfiguration("AzureServiceBus:TopologyOptions");
120135
}
121136

122137
static FunctionsHostBuilderContext GetContextInternal(this IFunctionsHostBuilder functionsHostBuilder)

src/NServiceBus.AzureFunctions.InProcess.ServiceBus/NServiceBus.AzureFunctions.InProcess.ServiceBus.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.ServiceBus" Version="5.16.4" />
1616
<PackageReference Include="NServiceBus" Version="9.2.6" />
1717
<PackageReference Include="NServiceBus.Newtonsoft.Json" Version="4.0.1" />
18-
<PackageReference Include="NServiceBus.Transport.AzureServiceBus" Version="4.2.4" />
18+
<PackageReference Include="NServiceBus.Transport.AzureServiceBus" Version="5.0.0" />
1919
</ItemGroup>
2020

2121
<ItemGroup>

src/NServiceBus.AzureFunctions.InProcess.ServiceBus/Serverless/TransportWrapper/ServerlessTransport.cs

Lines changed: 29 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,20 @@
22
{
33
using System;
44
using System.Collections.Generic;
5+
using System.Runtime.CompilerServices;
56
using System.Threading;
67
using System.Threading.Tasks;
8+
using Azure.Core;
79
using Microsoft.Extensions.Azure;
810
using Microsoft.Extensions.Configuration;
911
using Microsoft.Extensions.DependencyInjection;
1012
using Transport;
1113

12-
class ServerlessTransport : TransportDefinition
14+
class ServerlessTransport(AzureServiceBusTransport transport, string connectionString, string connectionName) : TransportDefinition(
15+
TransportTransactionMode.ReceiveOnly,
16+
transport.SupportsDelayedDelivery,
17+
transport.SupportsPublishSubscribe,
18+
transport.SupportsTTBR)
1319
{
1420
// HINT: This constant is defined in NServiceBus but is not exposed
1521
const string MainReceiverId = "Main";
@@ -19,22 +25,11 @@ class ServerlessTransport : TransportDefinition
1925

2026
public IServiceProvider ServiceProvider { get; set; }
2127

22-
public ServerlessTransport(TransportExtensions<AzureServiceBusTransport> transportExtensions, string connectionString, string connectionName) : base(
23-
transportExtensions.Transport.TransportTransactionMode,
24-
transportExtensions.Transport.SupportsDelayedDelivery,
25-
transportExtensions.Transport.SupportsPublishSubscribe,
26-
transportExtensions.Transport.SupportsTTBR)
27-
{
28-
this.transportExtensions = transportExtensions;
29-
this.connectionString = connectionString;
30-
this.connectionName = connectionName;
31-
}
32-
3328
public override async Task<TransportInfrastructure> Initialize(HostSettings hostSettings, ReceiveSettings[] receivers,
3429
string[] sendingAddresses,
3530
CancellationToken cancellationToken = default)
3631
{
37-
var configuredTransport = ConfigureTransportConnection(connectionString, connectionName, ServiceProvider.GetRequiredService<IConfiguration>(), transportExtensions,
32+
var configuredTransport = ConfigureTransportConnection(connectionString, connectionName, ServiceProvider.GetRequiredService<IConfiguration>(), transport,
3833
ServiceProvider.GetRequiredService<AzureComponentFactory>());
3934

4035
var baseTransportInfrastructure = await configuredTransport.Initialize(
@@ -58,16 +53,12 @@ public override async Task<TransportInfrastructure> Initialize(HostSettings host
5853
public override IReadOnlyCollection<TransportTransactionMode> GetSupportedTransactionModes() =>
5954
supportedTransactionModes;
6055

61-
// We are deliberately using the old way of configuring a transport here because it allows us configuring
62-
// the uninitialized transport with a connection string or a fully qualified name and a token provider.
63-
// Once we deprecate the old way we can for example add make the internal ConnectionString, FQDN or
64-
// TokenProvider properties visible to functions or the code base has already moved into a different direction.
6556
static AzureServiceBusTransport ConfigureTransportConnection(string connectionString, string connectionName, IConfiguration configuration,
66-
TransportExtensions<AzureServiceBusTransport> transportExtensions, AzureComponentFactory azureComponentFactory)
57+
AzureServiceBusTransport transport, AzureComponentFactory azureComponentFactory)
6758
{
6859
if (connectionString != null)
6960
{
70-
_ = transportExtensions.ConnectionString(connectionString);
61+
GetConnectionStringRef(transport) = connectionString;
7162
}
7263
else
7364
{
@@ -80,7 +71,7 @@ static AzureServiceBusTransport ConfigureTransportConnection(string connectionSt
8071

8172
if (!string.IsNullOrWhiteSpace(connectionSection.Value))
8273
{
83-
_ = transportExtensions.ConnectionString(connectionSection.Value);
74+
GetConnectionStringRef(transport) = connectionSection.Value;
8475
}
8576
else
8677
{
@@ -91,22 +82,33 @@ static AzureServiceBusTransport ConfigureTransportConnection(string connectionSt
9182
}
9283

9384
var credential = azureComponentFactory.CreateTokenCredential(connectionSection);
94-
_ = transportExtensions.CustomTokenCredential(fullyQualifiedNamespace, credential);
85+
GetFullyQualifiedNamespaceRef(transport) = fullyQualifiedNamespace;
86+
GetTokenCredentialRef(transport) = credential;
9587
}
9688
}
9789

98-
return transportExtensions.Transport;
90+
return transport;
9991
}
10092

93+
// As a temporary workaround we are accessing the properties of the AzureServiceBusTransport using UnsafeAccessor
94+
// This is another blocker to AoT but we are already using the execution assembly in the code base anyway
95+
// Furthermore this allows us to still comply with initializing the transport as late as possible without having to
96+
// expose the properties on the transport itself which would pollute the public API for not much added value.
97+
[UnsafeAccessor(UnsafeAccessorKind.Field, Name = "<ConnectionString>k__BackingField")]
98+
static extern ref string GetConnectionStringRef(AzureServiceBusTransport transport);
99+
100+
[UnsafeAccessor(UnsafeAccessorKind.Field, Name = "<FullyQualifiedNamespace>k__BackingField")]
101+
static extern ref string GetFullyQualifiedNamespaceRef(AzureServiceBusTransport transport);
102+
103+
[UnsafeAccessor(UnsafeAccessorKind.Field, Name = "<TokenCredential>k__BackingField")]
104+
static extern ref TokenCredential GetTokenCredentialRef(AzureServiceBusTransport transport);
105+
101106
internal const string DefaultServiceBusConnectionName = "AzureWebJobsServiceBus";
102107

103108
readonly TransportTransactionMode[] supportedTransactionModes =
104-
{
109+
[
105110
TransportTransactionMode.ReceiveOnly,
106111
TransportTransactionMode.SendsAtomicWithReceive
107-
};
108-
readonly TransportExtensions<AzureServiceBusTransport> transportExtensions;
109-
readonly string connectionString;
110-
readonly string connectionName;
112+
];
111113
}
112114
}

src/NServiceBus.AzureFunctions.InProcess.ServiceBus/ServiceBusTriggeredEndpointConfiguration.cs

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@
55
using System.Threading.Tasks;
66
using AzureFunctions.InProcess.ServiceBus;
77
using AzureFunctions.InProcess.ServiceBus.Serverless;
8+
using Configuration.AdvancedExtensibility;
89
using Logging;
910
using Microsoft.Extensions.Configuration;
1011
using Microsoft.Extensions.DependencyInjection;
1112
using Serialization;
1213
using Settings;
14+
using Transport.AzureServiceBus;
1315

1416
/// <summary>
1517
/// Represents a serverless NServiceBus endpoint.
@@ -68,16 +70,28 @@ internal ServiceBusTriggeredEndpointConfiguration(string endpointName, IConfigur
6870
endpointConfiguration.License(licenseText);
6971
}
7072

71-
// We are deliberately using the old way of creating a transport here because it allows us to create an
72-
// uninitialized transport that can later be configured with a connection string or a fully qualified name and
73-
// a token provider. Once we deprecate the old way we can for example add make the internal constructor
74-
// visible to functions or the code base has already moved into a different direction.
75-
transportExtensions = endpointConfiguration.UseTransport<AzureServiceBusTransport>();
76-
// This is required for the Outbox validation to work in NServiceBus 8. It does not affect the actual consistency mode because it is controlled by the functions
77-
// endpoint API (calling ProcessAtomic vs ProcessNonAtomic).
78-
transportExtensions.Transactions(TransportTransactionMode.ReceiveOnly);
79-
Transport = transportExtensions.Transport;
80-
Routing = transportExtensions.Routing();
73+
TopicTopology topicTopology = TopicTopology.Default;
74+
var topologyOptionsSection = configuration?.GetSection("AzureServiceBus:TopologyOptions");
75+
if (topologyOptionsSection.Exists())
76+
{
77+
topicTopology = TopicTopology.FromOptions(topologyOptionsSection.Get<TopologyOptions>());
78+
}
79+
// Migration options take precedence over topology options. We are not doing additional checks here for now.
80+
var migrationOptionsSection = configuration?.GetSection("AzureServiceBus:MigrationTopologyOptions");
81+
if (migrationOptionsSection.Exists())
82+
{
83+
#pragma warning disable CS0618 // Type or member is obsolete
84+
topicTopology = TopicTopology.FromOptions(migrationOptionsSection.Get<MigrationTopologyOptions>());
85+
#pragma warning restore CS0618 // Type or member is obsolete
86+
}
87+
88+
Transport = new AzureServiceBusTransport("TransportWillBeInitializedCorrectlyLater", topicTopology)
89+
{
90+
// This is required for the Outbox validation to work in NServiceBus 8. It does not affect the actual consistency mode because it is controlled by the functions
91+
// endpoint API (calling ProcessAtomic vs ProcessNonAtomic).
92+
TransportTransactionMode = TransportTransactionMode.ReceiveOnly
93+
};
94+
Routing = new RoutingSettings<AzureServiceBusTransport>(endpointConfiguration.GetSettings());
8195

8296
endpointConfiguration.UseSerialization<NewtonsoftJsonSerializer>();
8397

@@ -86,7 +100,7 @@ internal ServiceBusTriggeredEndpointConfiguration(string endpointName, IConfigur
86100

87101
internal ServerlessTransport InitializeTransport()
88102
{
89-
var serverlessTransport = new ServerlessTransport(transportExtensions, connectionString, connectionName);
103+
var serverlessTransport = new ServerlessTransport(Transport, connectionString, connectionName);
90104
AdvancedConfiguration.UseTransport(serverlessTransport);
91105
return serverlessTransport;
92106
}
@@ -117,6 +131,5 @@ public void LogDiagnostics() =>
117131
readonly ServerlessRecoverabilityPolicy recoverabilityPolicy = new ServerlessRecoverabilityPolicy();
118132
readonly string connectionString;
119133
readonly string connectionName;
120-
readonly TransportExtensions<AzureServiceBusTransport> transportExtensions;
121134
}
122135
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
namespace ServiceBus.Tests;
2+
3+
using System;
4+
using System.IO.Hashing;
5+
using System.Text;
6+
7+
public static class AcceptanceTestExtensions
8+
{
9+
public static string ToTopicName(this Type eventType) =>
10+
eventType.FullName.Replace("+", ".").Shorten(maxLength: 260);
11+
12+
// The idea here is to preserve part of the text and append a non-cryptographic hash to it.
13+
// This way, we can have a deterministic and unique names without harming much the readability.
14+
// The chance of collisions should be very low but definitely not zero. We can always switch to
15+
// using more bits in the hash or even back to a cryptographic hash if needed.
16+
public static string Shorten(this string name, int maxLength = 50)
17+
{
18+
if (name.Length <= maxLength)
19+
{
20+
return name;
21+
}
22+
23+
var nameBytes = Encoding.UTF8.GetBytes(name);
24+
var hashValue = XxHash32.Hash(nameBytes);
25+
string hashHex = Convert.ToHexString(hashValue);
26+
27+
int prefixLength = maxLength - hashHex.Length;
28+
29+
if (prefixLength < 0)
30+
{
31+
return hashHex.Length > maxLength
32+
? hashHex[..maxLength] // in case even the hash is too long
33+
: hashHex;
34+
}
35+
36+
string prefix = name[..Math.Min(prefixLength, name.Length)];
37+
return $"{prefix}{hashHex}";
38+
}
39+
}

src/ServiceBus.AcceptanceTests/DefaultEndpoint.cs

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
namespace ServiceBus.Tests
22
{
33
using System;
4+
using System.Linq;
45
using System.Threading.Tasks;
56
using Microsoft.Extensions.DependencyInjection;
67
using NServiceBus;
@@ -28,23 +29,21 @@ public async Task<EndpointConfiguration> GetConfiguration(
2829
recoverability.Immediate(immediate => immediate.NumberOfRetries(0));
2930
configuration.SendFailedMessagesTo("error");
3031

32+
configuration.EnforcePublisherMetadataRegistration(endpointConfiguration.EndpointName, endpointConfiguration.PublisherMetadata);
33+
3134
var connectionString =
3235
Environment.GetEnvironmentVariable(ServerlessTransport.DefaultServiceBusConnectionName);
3336

34-
var azureServiceBusTransport = new AzureServiceBusTransport(connectionString)
37+
var topology = TopicTopology.Default;
38+
topology.OverrideSubscriptionNameFor(endpointConfiguration.EndpointName, endpointConfiguration.EndpointName.Shorten());
39+
foreach (var eventType in endpointConfiguration.PublisherMetadata.Publishers.SelectMany(p => p.Events))
3540
{
36-
SubscriptionRuleNamingConvention = type =>
37-
{
38-
if (type.FullName.Length <= 50)
39-
{
40-
return type.FullName;
41-
}
42-
43-
return type.Name;
44-
}
45-
};
46-
47-
var transport = configuration.UseTransport(azureServiceBusTransport);
41+
topology.PublishTo(eventType, eventType.ToTopicName());
42+
topology.SubscribeTo(eventType, eventType.ToTopicName());
43+
}
44+
var azureServiceBusTransport = new AzureServiceBusTransport(connectionString, topology);
45+
46+
_ = configuration.UseTransport(azureServiceBusTransport);
4847

4948
configuration.Pipeline.Register("TestIndependenceBehavior", b => new TestIndependenceSkipBehavior(runDescriptor.ScenarioContext), "Skips messages not created during the current test.");
5049

0 commit comments

Comments
 (0)