Commit 2b8a844

feat!: add cron_tz to allow scheduling according to time zones (#3987)
1 parent 5173a0e commit 2b8a844

12 files changed

Lines changed: 123 additions & 21 deletions

docs/concepts/models/model_kinds.md

Lines changed: 2 additions & 2 deletions
@@ -318,7 +318,7 @@ In addition to specifying a time column in the `MODEL` DDL, the model's query mu
 
 A model's `time_column` should be in the [UTC time zone](https://en.wikipedia.org/wiki/Coordinated_Universal_Time) to ensure correct interaction with SQLMesh's scheduler and predefined macro variables.
 
-This requirement aligns with the data engineering best practice of converting datetime/timestamp columns to UTC as soon as they are ingested into the data system and only converting them to local timezones when they exit the system for downstream uses.
+This requirement aligns with the data engineering best practice of converting datetime/timestamp columns to UTC as soon as they are ingested into the data system and only converting them to local timezones when they exit the system for downstream uses. The `cron_tz` flag **does not** change this requirement.
 
 Placing all timezone conversion code in the system's first/last transformation models prevents inadvertent timezone-related errors as data flows between models.

@@ -1598,4 +1598,4 @@ Since it's unmanaged, it doesnt support the `batch_size` and `batch_concurrency`
 
 Similar to `INCREMENTAL_BY_PARTITION`, attempting to [restate](../plans.md#restatement-plans) an `INCREMENTAL_UNMANAGED` model will trigger a full restatement. That is, the model will be rebuilt from scratch rather than from a time slice you specify.
 
-This is because an append-only table is inherently non-idempotent. Restating `INCREMENTAL_UNMANAGED` models may lead to data loss and should be performed with care.
+This is because an append-only table is inherently non-idempotent. Restating `INCREMENTAL_UNMANAGED` models may lead to data loss and should be performed with care.

docs/concepts/models/overview.md

Lines changed: 4 additions & 1 deletion
@@ -227,7 +227,10 @@ Learn more about these properties and their default values in the [model configu
 : Tags are one or more labels used to organize your models.
 
 ### cron
-: Cron is used to schedule when your model processes or refreshes data. It accepts a [cron expression](https://en.wikipedia.org/wiki/Cron) or any of `@hourly`, `@daily`, `@weekly`, or `@monthly`. All times are assumed to be UTC timezone - it is not possible to specify them in a different timezone.
+: Cron is used to schedule when your model processes or refreshes data. It accepts a [cron expression](https://en.wikipedia.org/wiki/Cron) or any of `@hourly`, `@daily`, `@weekly`, or `@monthly`. All times are assumed to be UTC timezone by default.
+
+### cron_tz
+: Cron timezone is used to specify the timezone of the cron. This is only used for scheduling and does not affect the intervals processed in an incremental model. For example, if a model is `@daily` with cron_tz `America/Los_Angeles`, it will run every day 12AM pacific time, however the `start` and `end` variables passed to the incremental model will represent the UTC date boundaries.
 
 ### interval_unit
 : Interval unit determines the temporal granularity with which time intervals are calculated for the model.
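
The distinction the new `cron_tz` text draws between when a model runs and which interval it processes can be sketched with the standard library alone. This is an illustration, not SQLMesh code: the helper names `daily_tick_utc` and `utc_interval_ending` are hypothetical, and a fixed UTC-8 offset stands in for `America/Los_Angeles` so the sketch does not depend on system time zone data (a real IANA zone would also apply DST rules).

```python
from datetime import date, datetime, time, timedelta, timezone

# Assumption: a fixed UTC-8 offset stands in for America/Los_Angeles (no DST handling).
PACIFIC_STANDARD = timezone(timedelta(hours=-8), "PST")


def daily_tick_utc(local_day: date, tz: timezone) -> datetime:
    """Return the UTC instant at which a `@daily` cron fires for `local_day` in `tz`."""
    local_midnight = datetime.combine(local_day, time.min, tzinfo=tz)
    return local_midnight.astimezone(timezone.utc)


def utc_interval_ending(day: date) -> tuple:
    """Return the UTC-aligned daily interval [day - 1, day) as aware datetimes."""
    end = datetime.combine(day, time.min, tzinfo=timezone.utc)
    return end - timedelta(days=1), end


tick = daily_tick_utc(date(2025, 3, 8), PACIFIC_STANDARD)
start, end = utc_interval_ending(date(2025, 3, 8))
print(tick.isoformat())                    # when the run becomes due, in UTC
print(start.isoformat(), end.isoformat())  # the UTC day boundaries it processes
```

Under these assumptions the run becomes due at 08:00 UTC, yet the interval it covers is still bounded by UTC midnights, which is the same relationship the integration test in this commit asserts.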

sqlmesh/core/audit/definition.py

Lines changed: 2 additions & 0 deletions
@@ -270,6 +270,8 @@ def metadata_hash(self) -> str:
             *sorted(self.tags),
             str(self.sorted_python_env),
             self.stamp,
+            self.cron,
+            self.cron_tz.key if self.cron_tz else None,
         ]
 
         query = self.render_audit_query() or self.query

sqlmesh/core/model/definition.py

Lines changed: 1 addition & 0 deletions
@@ -1086,6 +1086,7 @@ def metadata_hash(self) -> str:
             self.description,
             json.dumps(self.column_descriptions, sort_keys=True),
             self.cron,
+            self.cron_tz.key if self.cron_tz else None,
             str(self.start) if self.start else None,
             str(self.end) if self.end else None,
             str(self.retention) if self.retention else None,
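
Adding the time zone key to the metadata list means that changing `cron_tz` should yield a different metadata hash. The sketch below illustrates that idea only. SQLMesh's actual hashing helper is not shown in this diff, so the function `fingerprint` and its use of SHA-256 are assumptions made for the example.

```python
import hashlib
import typing as t


def fingerprint(fields: t.Sequence[t.Optional[str]]) -> str:
    """Hypothetical stand-in for a metadata hash: a digest of ordered optional strings."""
    digest = hashlib.sha256()
    for field in fields:
        # Tag None differently from a string so "unset" and "" do not collide.
        digest.update(b"\x00" if field is None else b"\x01" + field.encode("utf-8"))
        digest.update(b"|")
    return digest.hexdigest()


# Mirrors `self.cron_tz.key if self.cron_tz else None`: None means the UTC default.
utc_default = fingerprint(["@daily", None])
pacific = fingerprint(["@daily", "America/Los_Angeles"])
print(utc_default != pacific)
```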

sqlmesh/core/node.py

Lines changed: 26 additions & 2 deletions
@@ -1,6 +1,7 @@
 from __future__ import annotations
 
 import typing as t
+import zoneinfo
 from datetime import datetime
 from enum import Enum
 from pathlib import Path
@@ -177,6 +178,7 @@ class _Node(PydanticModel):
             the date from the scheduler will be used
         cron: A cron string specifying how often the node should be run, leveraging the
             [croniter](https://github.com/kiorky/croniter) library.
+        cron_tz: Time zone for the cron, defaults to utc, [IANA time zones](https://docs.python.org/3/library/zoneinfo.html).
         interval_unit: The duration of an interval for the node. By default, it is computed from the cron expression.
         tags: A list of tags that can be used to filter nodes.
         stamp: An optional arbitrary string sequence used to create new node versions without making
@@ -190,6 +192,7 @@ class _Node(PydanticModel):
     start: t.Optional[TimeLike] = None
     end: t.Optional[TimeLike] = None
     cron: SQLGlotCron = "@daily"
+    cron_tz: t.Optional[zoneinfo.ZoneInfo] = None
     interval_unit_: t.Optional[IntervalUnit] = Field(alias="interval_unit", default=None)
     tags: t.List[str] = []
     stamp: t.Optional[str] = None
@@ -226,6 +229,27 @@ def _name_validator(cls, v: t.Any) -> t.Optional[str]:
             return v.meta["sql"]
         return str(v)
 
+    @field_validator("cron_tz", mode="before")
+    def _cron_tz_validator(cls, v: t.Any) -> t.Optional[zoneinfo.ZoneInfo]:
+        if not v or v == "UTC":
+            return None
+
+        v = str_or_exp_to_str(v)
+
+        try:
+            return zoneinfo.ZoneInfo(v)
+        except Exception as e:
+            available_timezones = zoneinfo.available_timezones()
+
+            if available_timezones:
+                raise ConfigError(f"{e}. {v} must be in {available_timezones}.")
+            else:
+                raise ConfigError(
+                    f"{e}. IANA time zone data is not available on your system. `pip install tzdata` to leverage cron time zones or remove this field which will default to UTC."
+                )
+
+        return None
+
     @field_validator("start", "end", mode="before")
     @classmethod
     def _date_validator(cls, v: t.Any) -> t.Optional[TimeLike]:
@@ -317,9 +341,9 @@ def metadata_hash(self) -> str:
 
     def croniter(self, value: TimeLike) -> CroniterCache:
         if self._croniter is None:
-            self._croniter = CroniterCache(self.cron, value)
+            self._croniter = CroniterCache(self.cron, value, tz=self.cron_tz)
         else:
-            self._croniter.curr = to_datetime(value)
+            self._croniter.curr = to_datetime(value, tz=self.cron_tz)
         return self._croniter
 
     def cron_next(self, value: TimeLike, estimate: bool = False) -> datetime:
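
The behavior of the new `cron_tz` validator can be approximated outside of pydantic as follows. This is a sketch under stated assumptions rather than SQLMesh's API: `parse_cron_tz` is a hypothetical name, `ValueError` replaces SQLMesh's `ConfigError`, and `str(value)` replaces `str_or_exp_to_str`.

```python
import typing as t
import zoneinfo


def parse_cron_tz(value: t.Any) -> t.Optional[zoneinfo.ZoneInfo]:
    """Hypothetical standalone analogue of the validator above."""
    # Empty values and the literal "UTC" mean "use the default", stored as None.
    if not value or value == "UTC":
        return None
    try:
        return zoneinfo.ZoneInfo(str(value))
    except Exception as e:
        # Distinguish an unknown zone name from a system with no IANA data at all.
        if zoneinfo.available_timezones():
            raise ValueError(f"{e}. {value!r} is not a known IANA time zone.") from e
        raise ValueError(
            f"{e}. IANA time zone data is not available; `pip install tzdata` may help."
        ) from e


print(parse_cron_tz("UTC"))  # the default is represented as None
```

Normalizing `"UTC"` to `None` keeps the default and an explicit UTC setting identical wherever the field is compared or hashed.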
Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+"""Add 'cron_tz' property to node definition."""
+
+
+def migrate(state_sync, **kwargs):  # type: ignore
+    pass

sqlmesh/utils/cron.py

Lines changed: 6 additions & 5 deletions
@@ -1,7 +1,7 @@
 from __future__ import annotations
 
 import typing as t
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, tzinfo
 from functools import lru_cache
 
 from croniter import croniter
@@ -34,21 +34,22 @@ def interval_seconds(cron: str) -> int:
 
 
 class CroniterCache:
-    def __init__(self, cron: str, time: t.Optional[TimeLike] = None):
+    def __init__(self, cron: str, time: t.Optional[TimeLike] = None, tz: t.Optional[tzinfo] = None):
         self.cron = cron
-        self.curr: datetime = to_datetime(now() if time is None else time)
+        self.tz = tz
+        self.curr: datetime = to_datetime(now() if time is None else time, tz=self.tz)
         self.interval_seconds = interval_seconds(self.cron)
 
     def get_next(self, estimate: bool = False) -> datetime:
         if estimate and self.interval_seconds:
             self.curr = self.curr + timedelta(seconds=self.interval_seconds)
         else:
-            self.curr = to_datetime(croniter(self.cron, self.curr).get_next() * 1000)
+            self.curr = to_datetime(croniter(self.cron, self.curr).get_next() * 1000, tz=self.tz)
         return self.curr
 
     def get_prev(self, estimate: bool = False) -> datetime:
         if estimate and self.interval_seconds:
             self.curr = self.curr - timedelta(seconds=self.interval_seconds)
         else:
-            self.curr = to_datetime(croniter(self.cron, self.curr).get_prev() * 1000)
+            self.curr = to_datetime(croniter(self.cron, self.curr).get_prev() * 1000, tz=self.tz)
         return self.curr

sqlmesh/utils/date.py

Lines changed: 9 additions & 6 deletions
@@ -5,14 +5,13 @@
 import typing as t
 import warnings
 
-from pandas.api.types import is_datetime64_any_dtype  # type: ignore
-
-from datetime import date, datetime, timedelta, timezone
+from datetime import date, datetime, timedelta, timezone, tzinfo
 
 import dateparser
 import pandas as pd
 from dateparser import freshness_date_parser as freshness_date_parser_module
 from dateparser.freshness_date_parser import freshness_date_parser
+from pandas.api.types import is_datetime64_any_dtype  # type: ignore
 from sqlglot import exp
 
 from sqlmesh.utils import ttl_cache
@@ -149,19 +148,21 @@ def to_datetime(
     value: TimeLike,
     relative_base: t.Optional[datetime] = None,
     check_categorical_relative_expression: bool = True,
+    tz: t.Optional[tzinfo] = None,
 ) -> datetime:
     """Converts a value into a UTC datetime object.
 
     Args:
         value: A variety of date formats. If the value is number-like, it is assumed to be millisecond epochs.
         relative_base: The datetime to reference for time expressions that are using relative terms.
         check_categorical_relative_expression: If True, takes into account the relative expressions that are categorical.
+        tz: Timezone to convert datetime to, defaults to utc
 
     Raises:
         ValueError if value cannot be converted to a datetime.
 
     Returns:
-        A datetime object with tz utc.
+        A datetime object with tz (default UTC).
     """
     if isinstance(value, datetime):
         dt: t.Optional[datetime] = value
@@ -198,9 +199,11 @@ def to_datetime(
     if dt is None:
         raise ValueError(f"Could not convert `{value}` to datetime.")
 
+    tz = tz or UTC
+
     if dt.tzinfo:
-        return dt if dt.tzinfo == UTC else dt.astimezone(UTC)
-    return dt.replace(tzinfo=UTC)
+        return dt if dt.tzinfo == tz else dt.astimezone(tz)
+    return dt.replace(tzinfo=tz)
 
 
 def to_date(value: TimeLike, relative_base: t.Optional[datetime] = None) -> date:
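
The tail of `to_datetime` treats aware and naive datetimes differently, which is easy to miss in the diff. The sketch below isolates those last lines in a hypothetical helper named `with_tz`. A fixed UTC-8 offset is used instead of an IANA zone, an assumption made so that the example runs without time zone data installed.

```python
import typing as t
from datetime import datetime, timedelta, timezone, tzinfo

UTC = timezone.utc


def with_tz(dt: datetime, tz: t.Optional[tzinfo] = None) -> datetime:
    """Hypothetical helper mirroring the final lines of `to_datetime`."""
    tz = tz or UTC
    if dt.tzinfo:
        # Aware input: same instant, re-expressed in the target zone.
        return dt if dt.tzinfo == tz else dt.astimezone(tz)
    # Naive input: wall-clock fields are kept and the target zone is attached.
    return dt.replace(tzinfo=tz)


pst = timezone(timedelta(hours=-8))  # assumption: fixed offset, no DST
aware = with_tz(datetime(2025, 3, 8, 8, tzinfo=UTC), pst)
naive = with_tz(datetime(2025, 3, 8, 8), pst)
print(aware.isoformat())  # instant preserved, clock reading shifts
print(naive.isoformat())  # clock reading preserved, instant shifts
```

An aware value keeps its instant, while a naive value is interpreted as wall-clock time in `tz`, so the same input fields can denote different instants depending on whether `tzinfo` was set.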

sqlmesh/utils/pydantic.py

Lines changed: 2 additions & 0 deletions
@@ -2,6 +2,7 @@
 
 import json
 import typing as t
+from datetime import tzinfo
 
 import pydantic
 from pydantic import ValidationInfo as ValidationInfo
@@ -72,6 +73,7 @@ class PydanticModel(pydantic.BaseModel):
             exp.Tuple: _expression_encoder,
             AuditQueryTypes: _expression_encoder,  # type: ignore
             ModelQueryTypes: _expression_encoder,  # type: ignore
+            tzinfo: lambda tz: tz.key,
         },
         arbitrary_types_allowed=True,
         extra="forbid",

tests/core/test_integration.py

Lines changed: 61 additions & 0 deletions
@@ -4884,6 +4884,67 @@ def test_plan_production_environment_statements(tmp_path: Path):
     assert environment_statements[0].python_env["__sqlmesh__vars__"].payload == "{'var_5': 5}"
 
 
+@time_machine.travel("2025-03-08 00:00:00 UTC")
+def test_tz(init_and_plan_context):
+    context, _ = init_and_plan_context("examples/sushi")
+
+    model = context.get_model("sushi.waiter_revenue_by_day")
+    context.upsert_model(
+        SqlModel.parse_obj(
+            {**model.dict(), "cron_tz": "America/Los_Angeles", "start": "2025-03-07"}
+        )
+    )
+
+    def assert_intervals(plan, intervals):
+        assert (
+            next(
+                intervals.intervals
+                for intervals in plan.missing_intervals
+                if intervals.snapshot_id.name == model.fqn
+            )
+            == intervals
+        )
+
+    plan = context.plan_builder("prod", skip_tests=True).build()
+
+    # we have missing intervals but not waiter_revenue_by_day because it's not midnight pacific yet
+    assert plan.missing_intervals
+
+    with pytest.raises(StopIteration):
+        assert_intervals(plan, [])
+
+    # now we're ready 8AM UTC == midnight PST
+    with time_machine.travel("2025-03-08 08:00:00 UTC"):
+        plan = context.plan_builder("prod", skip_tests=True).build()
+        assert_intervals(plan, [(to_timestamp("2025-03-07"), to_timestamp("2025-03-08"))])
+
+    with time_machine.travel("2025-03-09 07:00:00 UTC"):
+        plan = context.plan_builder("prod", skip_tests=True).build()
+
+        assert_intervals(
+            plan,
+            [
+                (to_timestamp("2025-03-07"), to_timestamp("2025-03-08")),
+            ],
+        )
+
+    with time_machine.travel("2025-03-09 08:00:00 UTC"):
+        plan = context.plan_builder("prod", skip_tests=True).build()
+
+        assert_intervals(
+            plan,
+            [
+                (to_timestamp("2025-03-07"), to_timestamp("2025-03-08")),
+                (to_timestamp("2025-03-08"), to_timestamp("2025-03-09")),
+            ],
+        )
+
+        context.apply(plan)
+
+        plan = context.plan_builder("prod", skip_tests=True).build()
+        assert not plan.missing_intervals
+
+
 def apply_to_environment(
     context: Context,
     environment: str,
