Skip to content

Commit 33b0345

Browse files
committed
Fix: Allow python models to emit DataFrame's with a different column order than what is declared in @model
1 parent c7571fe commit 33b0345

3 files changed

Lines changed: 57 additions & 2 deletions

File tree

sqlmesh/core/engine_adapter/base.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,12 @@ def _df_to_source_queries(
246246
assert isinstance(df, pd.DataFrame)
247247
num_rows = len(df.index)
248248
batch_size = sys.maxsize if batch_size == 0 else batch_size
249+
250+
# we need to ensure that the order of the columns in columns_to_types columns matches the order of the values
251+
# they can differ if a user specifies columns() on a python model in a different order than what's in the DataFrame's emitted by that model
252+
df = df[list(columns_to_types.keys())]
249253
values = list(df.itertuples(index=False, name=None))
254+
250255
return [
251256
SourceQuery(
252257
query_factory=partial(

sqlmesh/core/engine_adapter/mssql.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,10 +218,13 @@ def query_factory() -> Query:
218218
# as later calls.
219219
if not self.table_exists(temp_table):
220220
columns_to_types_create = columns_to_types.copy()
221-
self._convert_df_datetime(df, columns_to_types_create)
221+
ordered_df = df[
222+
list(columns_to_types_create.keys())
223+
] # reorder DataFrame so it matches columns_to_types
224+
self._convert_df_datetime(ordered_df, columns_to_types_create)
222225
self.create_table(temp_table, columns_to_types_create)
223226
rows: t.List[t.Tuple[t.Any, ...]] = list(
224-
df.replace({np.nan: None}).itertuples(index=False, name=None) # type: ignore
227+
ordered_df.replace({np.nan: None}).itertuples(index=False, name=None) # type: ignore
225228
)
226229
conn = self._connection_pool.get()
227230
conn.bulk_copy(temp_table.sql(dialect=self.dialect), rows)

tests/core/engine_adapter/integration/test_integration.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2732,3 +2732,50 @@ def _use_warehouse_as_state_connection(gateway_name: str, config: Config):
27322732

27332733
# will throw if one of the migrations produces an error, which can happen if we forget to take quoting or normalization into account
27342734
sqlmesh_context.migrate()
2735+
2736+
2737+
def test_python_model_column_order(ctx: TestContext, tmp_path: pathlib.Path):
2738+
if ctx.test_type != "df":
2739+
pytest.skip("python model column order test only needs to be run once per db")
2740+
2741+
test_schema = ctx.add_test_suffix("column_order")
2742+
2743+
(tmp_path / "models").mkdir()
2744+
2745+
# note: this model deliberately defines the columns in the @model definition to be in a different order than what
2746+
# is returned by the DataFrame within the model
2747+
(tmp_path / "models" / "python_model.py").write_text(
2748+
"""
2749+
import pandas as pd
2750+
import typing as t
2751+
from sqlmesh import ExecutionContext, model
2752+
2753+
@model(
2754+
"TEST_SCHEMA.model",
2755+
columns={
2756+
"id": "int",
2757+
"name": "varchar"
2758+
}
2759+
)
2760+
def execute(
2761+
context: ExecutionContext,
2762+
**kwargs: t.Any,
2763+
) -> pd.DataFrame:
2764+
return pd.DataFrame([
2765+
{"name": "foo", "id": 1}
2766+
])
2767+
""".replace("TEST_SCHEMA", test_schema)
2768+
)
2769+
2770+
sqlmesh_ctx = ctx.create_context(path=tmp_path)
2771+
2772+
assert len(sqlmesh_ctx.models) == 1
2773+
2774+
plan = sqlmesh_ctx.plan(auto_apply=True)
2775+
assert len(plan.new_snapshots) == 1
2776+
2777+
engine_adapter = sqlmesh_ctx.engine_adapter
2778+
2779+
df = engine_adapter.fetchdf(f"select * from {test_schema}.model")
2780+
assert len(df) == 1
2781+
assert df.iloc[0].to_dict() == {"id": 1, "name": "foo"}

0 commit comments

Comments
 (0)