33
44from sqlmesh import Context
55from sqlmesh .core .model import schema
6+ import concurrent .futures
67
78
89pytestmark = pytest .mark .isolated
1112def test_parallel_load (assert_exp_eq , mocker ):
1213 mocker .patch ("sqlmesh.core.constants.MAX_FORK_WORKERS" , 2 )
1314
14- spy = mocker .spy (schema , "_update_model_schemas" )
15+ spy_update_schemas = mocker .spy (schema , "_update_model_schemas" )
16+ process_pool_executor = mocker .spy (concurrent .futures .ProcessPoolExecutor , "__init__" )
17+ as_completed = mocker .spy (concurrent .futures , "as_completed" )
18+
1519 context = Context (paths = "examples/sushi" )
1620
1721 if hasattr (os , "fork" ):
18- spy .assert_called ()
22+ process_pool_executor .assert_called ()
23+ as_completed .assert_called ()
24+ executor_args = process_pool_executor .call_args
25+ assert executor_args [1 ]["max_workers" ] == 2
1926
27+ assert len (context .models ) == 18
28+ spy_update_schemas .assert_called ()
2029 assert_exp_eq (
2130 context .render ("sushi.customers" ),
2231 """
@@ -41,3 +50,22 @@ def test_parallel_load(assert_exp_eq, mocker):
4150 )
4251
4352 context .plan (no_prompts = True , auto_apply = True )
53+
54+
55+ def test_parallel_load_multi_repo (assert_exp_eq , mocker ):
56+ mocker .patch ("sqlmesh.core.constants.MAX_FORK_WORKERS" , 2 )
57+
58+ process_pool_executor = mocker .spy (concurrent .futures .ProcessPoolExecutor , "__init__" )
59+ context = Context (paths = ["examples/multi/repo_1" , "examples/multi/repo_2" ], gateway = "memory" )
60+
61+ if hasattr (os , "fork" ):
62+ executor_args = process_pool_executor .call_args
63+ assert executor_args [1 ]["max_workers" ] == 2
64+ assert len (context .models ) == 5
65+
66+ assert_exp_eq (
67+ context .render ("memory.bronze.a" ),
68+ 'SELECT 1 AS "col_a", \' b\' AS "col_b", 1 AS "one", \' repo_1\' AS "dup"' ,
69+ )
70+
71+ context .plan (no_prompts = True , auto_apply = True )
0 commit comments