11using System ;
22using System . Collections . Generic ;
33using System . Data . SqlClient ;
4+ using System . Text ;
45using System . Threading ;
56using System . Threading . Tasks ;
67using Npgsql ;
@@ -32,10 +33,11 @@ private static readonly IDictionary<string, CreateStreamStore> s_factories
3233
3334 public SqlStreamStoreFactory ( SqlStreamStoreServerConfiguration configuration )
3435 {
35- if ( configuration == null )
36+ if ( configuration == null )
3637 {
3738 throw new ArgumentNullException ( nameof ( configuration ) ) ;
3839 }
40+
3941 _configuration = configuration ;
4042 }
4143
@@ -46,15 +48,12 @@ public Task<IStreamStore> Create(CancellationToken cancellationToken = default)
4648
4749 Log . Information ( $ "Creating stream store for provider '{ provider } '") ;
4850
49- if ( ! s_factories . TryGetValue ( provider , out var factory ) )
51+ if ( ! s_factories . TryGetValue ( provider , out var factory ) )
5052 {
5153 throw new InvalidOperationException ( $ "No provider factory for provider '{ provider } ' found.") ;
5254 }
5355
54- var connectionString = _configuration . ConnectionString ;
55- var schema = _configuration . Schema ;
56-
57- return factory ( connectionString , schema , cancellationToken ) ;
56+ return factory ( _configuration . ConnectionString , _configuration . Schema , cancellationToken ) ;
5857 }
5958
6059 private static Task < IStreamStore > CreateInMemoryStreamStore (
@@ -69,37 +68,45 @@ private static async Task<IStreamStore> CreateMssqlStreamStore(
6968 CancellationToken cancellationToken )
7069 {
7170 var connectionStringBuilder = new SqlConnectionStringBuilder ( connectionString ) ;
72- using ( var connection = new SqlConnection ( new SqlConnectionStringBuilder ( connectionString )
71+ var settings = new MsSqlStreamStoreV3Settings ( connectionString ) ;
72+
73+ if ( schema != null )
7374 {
74- InitialCatalog = "master"
75- } . ConnectionString ) )
75+ settings . Schema = schema ;
76+ }
77+
78+ var streamStore = new MsSqlStreamStoreV3 ( settings ) ;
79+
80+ try
7681 {
77- await connection . OpenAsync ( cancellationToken ) . NotOnCapturedContext ( ) ;
82+ using ( var connection = new SqlConnection ( new SqlConnectionStringBuilder ( connectionString )
83+ {
84+ InitialCatalog = "master"
85+ } . ConnectionString ) )
86+ {
87+ await connection . OpenAsync ( cancellationToken ) . NotOnCapturedContext ( ) ;
7888
79- using ( var command = new SqlCommand (
80- $@ "
89+ using ( var command = new SqlCommand (
90+ $@ "
8191IF NOT EXISTS (SELECT name FROM sys.databases WHERE name = N'{ connectionStringBuilder . InitialCatalog } ')
8292BEGIN
8393 CREATE DATABASE [{ connectionStringBuilder . InitialCatalog } ]
8494END;
8595" ,
86- connection ) )
87- {
88- await command . ExecuteNonQueryAsync ( cancellationToken ) . NotOnCapturedContext ( ) ;
96+ connection ) )
97+ {
98+ await command . ExecuteNonQueryAsync ( cancellationToken ) . NotOnCapturedContext ( ) ;
99+ }
89100 }
90- }
91-
92- var settings = new MsSqlStreamStoreV3Settings ( connectionString ) ;
93101
94- if ( schema != null )
102+ await streamStore . CreateSchemaIfNotExists ( cancellationToken ) ;
103+ }
104+ catch ( SqlException ex )
95105 {
96- settings . Schema = schema ;
106+ SchemaCreationFailed ( streamStore . GetSchemaCreationScript , ex ) ;
107+ throw ;
97108 }
98109
99- var streamStore = new MsSqlStreamStoreV3 ( settings ) ;
100-
101- await streamStore . CreateSchemaIfNotExists ( cancellationToken ) ;
102-
103110 return streamStore ;
104111 }
105112
@@ -109,46 +116,67 @@ private static async Task<IStreamStore> CreatePostgresStreamStore(
109116 CancellationToken cancellationToken )
110117 {
111118 var connectionStringBuilder = new NpgsqlConnectionStringBuilder ( connectionString ) ;
119+ var settings = new PostgresStreamStoreSettings ( connectionString ) ;
112120
113- using ( var connection = new NpgsqlConnection ( new NpgsqlConnectionStringBuilder ( connectionString )
114- {
115- Database = null
116- } . ConnectionString ) )
121+ if ( schema != null )
117122 {
118- bool exists ;
119- await connection . OpenAsync ( cancellationToken ) . NotOnCapturedContext ( ) ;
123+ settings . Schema = schema ;
124+ }
120125
121- using ( var command = new NpgsqlCommand (
122- $ "SELECT 1 FROM pg_database WHERE datname = '{ connectionStringBuilder . Database } '",
123- connection ) )
124- {
125- exists = await command . ExecuteScalarAsync ( cancellationToken ) . NotOnCapturedContext ( )
126- != null ;
127- }
126+ var streamStore = new PostgresStreamStore ( settings ) ;
128127
129- if ( ! exists )
128+ try
129+ {
130+ using ( var connection = new NpgsqlConnection ( new NpgsqlConnectionStringBuilder ( connectionString )
130131 {
131- using ( var command = new NpgsqlCommand (
132- $ "CREATE DATABASE { connectionStringBuilder . Database } ",
133- connection ) )
132+ Database = null
133+ } . ConnectionString ) )
134+ {
135+ await connection . OpenAsync ( cancellationToken ) . NotOnCapturedContext ( ) ;
136+
137+ async Task < bool > DatabaseExists ( )
134138 {
135- await command . ExecuteNonQueryAsync ( cancellationToken ) . NotOnCapturedContext ( ) ;
139+ using ( var command = new NpgsqlCommand (
140+ $ "SELECT 1 FROM pg_database WHERE datname = '{ connectionStringBuilder . Database } '",
141+ connection ) )
142+ {
143+ return await command . ExecuteScalarAsync ( cancellationToken ) . NotOnCapturedContext ( )
144+ != null ;
145+ }
136146 }
137- }
138147
139- var settings = new PostgresStreamStoreSettings ( connectionString ) ;
148+ if ( ! await DatabaseExists ( ) )
149+ {
150+ using ( var command = new NpgsqlCommand (
151+ $ "CREATE DATABASE { connectionStringBuilder . Database } ",
152+ connection ) )
153+ {
154+ await command . ExecuteNonQueryAsync ( cancellationToken ) . NotOnCapturedContext ( ) ;
155+ }
156+ }
140157
141- if ( schema != null )
142- {
143- settings . Schema = schema ;
158+ await streamStore . CreateSchemaIfNotExists ( cancellationToken ) ;
144159 }
145-
146- var streamStore = new PostgresStreamStore ( settings ) ;
147-
148- await streamStore . CreateSchemaIfNotExists ( cancellationToken ) ;
149-
150- return streamStore ;
151160 }
161+ catch ( NpgsqlException ex )
162+ {
163+ SchemaCreationFailed ( streamStore . GetSchemaCreationScript , ex ) ;
164+ throw ;
165+ }
166+
167+ return streamStore ;
152168 }
169+
170+ private static void SchemaCreationFailed ( Func < string > getSchemaCreationScript , Exception ex )
171+ => Log . Warning (
172+ new StringBuilder ( )
173+ . Append ( $ "Could not create schema: { ex . Message } ")
174+ . AppendLine ( )
175+ . Append (
176+ "Does your connection string have enough permissions? If not, run the following sql script as a privileged user:" )
177+ . AppendLine ( )
178+ . Append ( getSchemaCreationScript ( ) )
179+ . ToString ( ) ,
180+ ex ) ;
153181 }
154- }
182+ }
0 commit comments