@@ -152,3 +152,124 @@ async fn activities_subscribe_partial() {
152152 . into_parts ( ) ;
153153 assert_eq ! ( rebase_finished_activity, restored_activity) ;
154154}
155+
156+ #[ tokio:: test]
157+ async fn subscribe_rebaser_requests_work_queue ( ) {
158+ let token = CancellationToken :: new ( ) ;
159+
160+ let tempdir_slash = tempfile:: TempDir :: new_in ( "/tmp" ) . expect ( "cannot create tempdir" ) ;
161+ let tempdir_axl = tempfile:: TempDir :: new_in ( "/tmp" ) . expect ( "cannot create tempdir" ) ;
162+ let tempdir_duff = tempfile:: TempDir :: new_in ( "/tmp" ) . expect ( "cannot create tempdir" ) ;
163+ let db = setup_pg_db ( "subscribe_rebaser_requests_work_queue" ) . await ;
164+
165+ // we need a layerdb for slash, which will be a consumer of our work queue
166+ let ( ldb_slash, _) : ( TestLayerDb , _ ) = LayerDb :: initialize (
167+ tempdir_slash,
168+ db. clone ( ) ,
169+ setup_nats_client ( Some ( "subscribe_rebaser_requests_work_queue" . to_string ( ) ) ) . await ,
170+ token. clone ( ) ,
171+ )
172+ . await
173+ . expect ( "cannot create layerdb" ) ;
174+ ldb_slash. pg_migrate ( ) . await . expect ( "migrate layerdb" ) ;
175+
176+ // we need a layerdb for axl, who will also be a consumer for our work queue
177+ let ( ldb_axl, _) : ( TestLayerDb , _ ) = LayerDb :: initialize (
178+ tempdir_axl,
179+ db. clone ( ) ,
180+ setup_nats_client ( Some ( "subscribe_rebaser_requests_work_queue" . to_string ( ) ) ) . await ,
181+ token. clone ( ) ,
182+ )
183+ . await
184+ . expect ( "cannot create layerdb" ) ;
185+ ldb_axl. pg_migrate ( ) . await . expect ( "migrate layerdb" ) ;
186+
187+ // we need a layerdb for duff, who will also be a consumer for our work queue
188+ let ( ldb_duff, _) : ( TestLayerDb , _ ) = LayerDb :: initialize (
189+ tempdir_duff,
190+ db,
191+ setup_nats_client ( Some ( "subscribe_rebaser_requests_work_queue" . to_string ( ) ) ) . await ,
192+ token. clone ( ) ,
193+ )
194+ . await
195+ . expect ( "cannot create layerdb" ) ;
196+ ldb_duff. pg_migrate ( ) . await . expect ( "migrate layerdb" ) ;
197+
198+ // Subscribe to a work queue of rebase activities on axl and slash
199+ let mut axl_work_queue = ldb_axl
200+ . subscribe_rebaser_requests_work_queue ( )
201+ . await
202+ . expect ( "cannot retrieve a work queue" ) ;
203+ let mut slash_work_queue = ldb_slash
204+ . subscribe_rebaser_requests_work_queue ( )
205+ . await
206+ . expect ( "cannot retrieve a work queue" ) ;
207+
208+ // Send a rebase request activity from duff
209+ let rebase_request = RebaseRequest :: new ( Ulid :: new ( ) , Ulid :: new ( ) , Ulid :: new ( ) ) ;
210+ let tenancy = Tenancy :: new ( WorkspacePk :: new ( ) , ChangeSetId :: new ( ) ) ;
211+ let actor = Actor :: System ;
212+ let metadata = LayeredEventMetadata :: new ( tenancy, actor) ;
213+ let rebase_request_activity = Activity :: rebase ( rebase_request, metadata) ;
214+ // Publish an activity
215+ ldb_duff
216+ . publish_activity ( & rebase_request_activity)
217+ . await
218+ . expect ( "cannot publish activity" ) ;
219+
220+ // Send a rebase finished activity
221+ let rebase_finished = RebaseFinished :: new (
222+ si_layer_cache:: activities:: rebase:: RebaseStatus :: Error {
223+ message : "poop" . to_string ( ) ,
224+ } ,
225+ Ulid :: new ( ) ,
226+ Ulid :: new ( ) ,
227+ ) ;
228+ let tenancy = Tenancy :: new ( WorkspacePk :: new ( ) , ChangeSetId :: new ( ) ) ;
229+ let actor = Actor :: System ;
230+ let metadata = LayeredEventMetadata :: new ( tenancy, actor) ;
231+ let rebase_finished_activity = Activity :: rebase_finished ( rebase_finished, metadata) ;
232+ ldb_duff
233+ . publish_activity ( & rebase_finished_activity)
234+ . await
235+ . expect ( "cannot publish activity" ) ;
236+
237+ let which = tokio:: select! {
238+ maybe_result = slash_work_queue. next( ) => {
239+ let request = maybe_result. expect( "had no messages" ) . expect( "cannot retrieve the ack rebase request" ) ;
240+ assert_eq!( request. payload, rebase_request) ;
241+ request. ack( ) . await . expect( "cannot ack message" ) ;
242+ "slash" . to_string( )
243+ } ,
244+ maybe_result = axl_work_queue. next( ) => {
245+ let request = maybe_result. expect( "had no messages" ) . expect( "cannot retrieve the ack rebase request" ) ;
246+ assert_eq!( request. payload, rebase_request) ;
247+ request. ack( ) . await . expect( "cannot ack message" ) ;
248+ "axl" . to_string( )
249+ } ,
250+ } ;
251+
252+ // This is long enough to confirm that we get once-and-only-once delivery.
253+ // It isn't long enough to confirm that we didn't ack the payload, but that
254+ // is totally fine - we don't need to test that NATS works as directed.
255+ let sleep = tokio:: time:: sleep ( tokio:: time:: Duration :: from_millis ( 100 ) ) ;
256+ tokio:: pin!( sleep) ;
257+
258+ if which == "slash" {
259+ tokio:: select! {
260+ maybe_result = axl_work_queue. next( ) => {
261+ assert!( maybe_result. is_none( ) , "expected no work, but there is some work to do" ) ;
262+ } ,
263+ _ = & mut sleep => {
264+ }
265+ }
266+ } else {
267+ tokio:: select! {
268+ maybe_result = slash_work_queue. next( ) => {
269+ assert!( maybe_result. is_none( ) , "expected no work, but there is some work to do" ) ;
270+ } ,
271+ _ = & mut sleep => {
272+ }
273+ }
274+ }
275+ }
0 commit comments