Skip to content

Commit f0c9e33

Browse files
committed
feat: add batch_size support to scd type 2 kinds
1 parent fe7ed31 commit f0c9e33

5 files changed

Lines changed: 95 additions & 6 deletions

File tree

docs/concepts/models/model_kinds.md

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1241,12 +1241,13 @@ This is the most accurate representation of the menu based on the source data pr
12411241

12421242
### Shared Configuration Options
12431243

1244-
| Name | Description | Type |
1245-
|-------------------------|-----------------------------------------------------------------------------------------------------------------|---------------------------|
1246-
| unique_key | Unique key used for identifying rows between source and target | List of strings or string |
1247-
| valid_from_name | The name of the `valid_from` column to create in the target table. Default: `valid_from` | string |
1248-
| valid_to_name | The name of the `valid_to` column to create in the target table. Default: `valid_to` | string |
1249-
| invalidate_hard_deletes | If set to `true`, when a record is missing from the source table it will be marked as invalid. Default: `false` | bool |
1244+
| Name | Description | Type |
1245+
|-------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------|
1246+
| unique_key | Unique key used for identifying rows between source and target | List of strings or string |
1247+
| valid_from_name | The name of the `valid_from` column to create in the target table. Default: `valid_from` | string |
1248+
| valid_to_name | The name of the `valid_to` column to create in the target table. Default: `valid_to` | string |
1249+
| invalidate_hard_deletes | If set to `true`, when a record is missing from the source table it will be marked as invalid. Default: `false` | bool |
1250+
| batch_size | The maximum number of intervals that can be evaluated in a single backfill task. If this is `None`, all intervals will be processed as part of a single task. See [Processing Source Table with Historical Data](#processing-source-table-with-historical-data) for more info on this use case. (Default: `None`) | int |
12501251

12511252
!!! tip "Important"
12521253

@@ -1278,6 +1279,18 @@ This is the most accurate representation of the menu based on the source data pr
12781279
| columns | The name of the columns to check for changes. `*` to represent that all columns should be checked. | List of strings or string |
12791280
| execution_time_as_valid_from | By default, when the model is first loaded `valid_from` is set to `1970-01-01 00:00:00` and future new rows will have `execution_time` of when the pipeline ran. This changes the behavior to always use `execution_time`. Default: `false` | bool |
12801281

1282+
1283+
### Processing Source Table with Historical Data
1284+
1285+
The most common use case for SCD Type 2 is creating history for a table that doesn't already have it.
1286+
In the example of the restaurant menu, the menu just tells you what is offered right now, but you want to know what was offered over time.
1287+
In this case, the default setting of `None` for `batch_size` is the best option.
1288+
1289+
Another use case, though, is processing a source table that already has history in it.
1290+
A common example of this is a "daily snapshot" table that is created by a source system that takes a snapshot of the data at the end of each day.
1291+
If your source table has historical records, like a "daily snapshot" table, then set `batch_size` to `1` to process each interval (each day if a `@daily` cron) in sequential order.
1292+
That way the historical records will be properly captured in the SCD Type 2 table.
1293+
12811294
### Querying SCD Type 2 Models
12821295

12831296
#### Querying the current version of a record

sqlmesh/core/model/kind.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -672,6 +672,7 @@ class _SCDType2Kind(_Incremental):
672672
valid_to_name: SQLGlotColumn = Field(exp.column("valid_to"), validate_default=True)
673673
invalidate_hard_deletes: SQLGlotBool = False
674674
time_data_type: exp.DataType = Field(exp.DataType.build("TIMESTAMP"), validate_default=True)
675+
batch_size: t.Optional[SQLGlotPositiveInt] = None
675676

676677
forward_only: SQLGlotBool = True
677678
disable_restatement: SQLGlotBool = True
@@ -711,6 +712,7 @@ def data_hash_values(self) -> t.List[t.Optional[str]]:
711712
gen(self.valid_to_name),
712713
str(self.invalidate_hard_deletes),
713714
gen(self.time_data_type),
715+
gen(self.batch_size) if self.batch_size is not None else None,
714716
]
715717

716718
@property
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
"""Add batch_size to SCD Type 2 models which changes their data hash."""
2+
3+
4+
def migrate(state_sync, **kwargs): # type: ignore
5+
pass

tests/core/test_model.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4403,6 +4403,7 @@ def test_scd_type_2_by_column_overrides():
44034403
forward_only False,
44044404
disable_restatement False,
44054405
invalidate_hard_deletes False,
4406+
batch_size 1
44064407
),
44074408
);
44084409
SELECT
@@ -4428,6 +4429,7 @@ def test_scd_type_2_by_column_overrides():
44284429
assert scd_type_2_model.kind.is_scd_type_2
44294430
assert scd_type_2_model.kind.is_materialized
44304431
assert scd_type_2_model.kind.time_data_type == exp.DataType.build("TIMESTAMPTZ")
4432+
assert scd_type_2_model.kind.batch_size == 1
44314433
assert not scd_type_2_model.kind.invalidate_hard_deletes
44324434
assert not scd_type_2_model.kind.forward_only
44334435
assert not scd_type_2_model.kind.disable_restatement

tests/core/test_scheduler.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
IncrementalByTimeRangeKind,
1414
IncrementalByUniqueKeyKind,
1515
TimeColumn,
16+
SCDType2ByColumnKind,
1617
)
1718
from sqlmesh.core.node import IntervalUnit
1819
from sqlmesh.core.scheduler import (
@@ -810,3 +811,69 @@ def signal_base(batch: DatetimeRanges):
810811
snapshot_b: [(to_timestamp("2023-01-01"), to_timestamp("2023-01-04"))],
811812
snapshot_c: [(to_timestamp("2023-01-01"), to_timestamp("2023-01-02"))],
812813
}
814+
815+
816+
@pytest.mark.parametrize(
817+
"batch_size, expected_batches",
818+
[
819+
(
820+
1,
821+
[
822+
(to_timestamp("2023-01-01"), to_timestamp("2023-01-02")),
823+
(to_timestamp("2023-01-02"), to_timestamp("2023-01-03")),
824+
(to_timestamp("2023-01-03"), to_timestamp("2023-01-04")),
825+
],
826+
),
827+
(
828+
None,
829+
[
830+
(to_timestamp("2023-01-01"), to_timestamp("2023-01-04")),
831+
],
832+
),
833+
],
834+
)
835+
def test_scd_type_2_batch_size(
836+
mocker: MockerFixture,
837+
make_snapshot,
838+
get_batched_missing_intervals,
839+
batch_size: t.Optional[int],
840+
expected_batches: t.List[t.Tuple[int, int]],
841+
):
842+
"""
843+
Test that SCD_TYPE_2_BY_COLUMN models are batched correctly based on batch_size.
844+
With batch_size=1, we expect 3 separate batches for 3 days.
845+
Without a specified batch_size, we expect a single batch for the entire period.
846+
"""
847+
start = to_datetime("2023-01-01")
848+
end = to_datetime("2023-01-04")
849+
850+
# Configure kind params
851+
kind_params = {}
852+
if batch_size is not None:
853+
kind_params["batch_size"] = batch_size
854+
855+
# Create the model and snapshot
856+
model = SqlModel(
857+
name="test_scd_model",
858+
kind=SCDType2ByColumnKind(columns="valid_to", unique_key=["id"], **kind_params),
859+
cron="@daily",
860+
start=start,
861+
query=parse_one("SELECT id, valid_from, valid_to FROM source"),
862+
)
863+
snapshot = make_snapshot(model)
864+
865+
# Setup scheduler
866+
snapshot_evaluator = SnapshotEvaluator(adapters=mocker.MagicMock(), ddl_concurrent_tasks=1)
867+
scheduler = Scheduler(
868+
snapshots=[snapshot],
869+
snapshot_evaluator=snapshot_evaluator,
870+
state_sync=mocker.MagicMock(),
871+
max_workers=2,
872+
default_catalog=None,
873+
)
874+
875+
# Get batches for the time period
876+
batches = get_batched_missing_intervals(scheduler, start, end, end)[snapshot]
877+
878+
# Verify batches match expectations
879+
assert batches == expected_batches

0 commit comments

Comments
 (0)