diff --git a/sqlmesh/core/context.py b/sqlmesh/core/context.py index 0450827d6e..f919c51182 100644 --- a/sqlmesh/core/context.py +++ b/sqlmesh/core/context.py @@ -120,7 +120,14 @@ from sqlmesh.utils import UniqueKeyDict, Verbosity from sqlmesh.utils.concurrency import concurrent_apply_to_values from sqlmesh.utils.dag import DAG -from sqlmesh.utils.date import TimeLike, now_ds, to_timestamp, format_tz_datetime, now_timestamp +from sqlmesh.utils.date import ( + TimeLike, + now_ds, + to_timestamp, + format_tz_datetime, + now_timestamp, + now, +) from sqlmesh.utils.errors import ( CircuitBreakerError, ConfigError, @@ -1513,7 +1520,11 @@ def plan_builder( # If no end date is specified, use the max interval end from prod # to prevent unintended evaluation of the entire DAG. default_start, default_end = self._get_plan_default_start_end( - snapshots, max_interval_end_per_model, backfill_models, modified_model_names + snapshots, + max_interval_end_per_model, + backfill_models, + modified_model_names, + execution_time or now(), ) # Refresh snapshot intervals to ensure that they are up to date with values reflected in the max_interval_end_per_model. 
@@ -2818,6 +2829,7 @@ def _get_plan_default_start_end( max_interval_end_per_model: t.Dict[str, int], backfill_models: t.Optional[t.Set[str]], modified_model_names: t.Set[str], + execution_time: t.Optional[TimeLike] = None, ) -> t.Tuple[t.Optional[int], t.Optional[int]]: if not max_interval_end_per_model: return None, None @@ -2843,6 +2855,12 @@ def _get_plan_default_start_end( ) ), ) + + if execution_time and to_timestamp(default_end) > to_timestamp(execution_time): + # the end date can't be in the future, which can happen if a specific `execution_time` is set and prod intervals + # are newer than it + default_end = to_timestamp(execution_time) + return default_start, default_end def _get_max_interval_end_per_model( diff --git a/sqlmesh/core/plan/builder.py b/sqlmesh/core/plan/builder.py index 27b81f5d74..b2ff0a087c 100644 --- a/sqlmesh/core/plan/builder.py +++ b/sqlmesh/core/plan/builder.py @@ -41,6 +41,8 @@ to_datetime, yesterday_ds, to_timestamp, + time_like_to_str, + is_relative, ) from sqlmesh.utils.errors import NoChangesPlanError, PlanError @@ -55,6 +57,7 @@ class PlanBuilder: start: The start time to backfill data. end: The end time to backfill data. execution_time: The date/time time reference to use for execution time. Defaults to now. + If :start or :end are relative time expressions, they are interpreted as relative to the :execution_time apply: The callback to apply the plan. restate_models: A list of models for which the data should be restated for the time range specified in this plan. Note: models defined outside SQLMesh (external) won't be a part @@ -137,7 +140,14 @@ def __init__( self._include_unmodified = include_unmodified self._restate_models = set(restate_models) if restate_models is not None else None self._effective_from = effective_from + + # note: this deliberately doesn't default to now() here.
+ # There may be a significant delay between the PlanBuilder producing a Plan and the Plan actually being run + # so if execution_time=None is passed to the PlanBuilder, then the resulting Plan should also have execution_time=None + # in order to prevent the Plan that was intended to run "as at now" from having "now" fixed to some time in the past + # ref: https://github.com/TobikoData/sqlmesh/pull/4702#discussion_r2140696156 self._execution_time = execution_time + self._backfill_models = backfill_models self._end = end or default_end self._apply = apply @@ -172,6 +182,26 @@ def is_start_and_end_allowed(self) -> bool: """Indicates whether this plan allows to set the start and end dates.""" return self._is_dev or bool(self._restate_models) + @property + def start(self) -> t.Optional[TimeLike]: + if self._start and is_relative(self._start): + # only do this for relative expressions otherwise inclusive date strings like '2020-01-01' can be turned into exclusive timestamps eg '2020-01-01 00:00:00' + return to_datetime(self._start, relative_base=to_datetime(self.execution_time)) + return self._start + + @property + def end(self) -> t.Optional[TimeLike]: + if self._end and is_relative(self._end): + # only do this for relative expressions otherwise inclusive date strings like '2020-01-01' can be turned into exclusive timestamps eg '2020-01-01 00:00:00' + return to_datetime(self._end, relative_base=to_datetime(self.execution_time)) + return self._end + + @cached_property + def execution_time(self) -> TimeLike: + # this is cached to return a stable value from now() in the places where the execution time matters for resolving relative date strings + # during the plan building process + return self._execution_time or now() + def set_start(self, new_start: TimeLike) -> PlanBuilder: self._start = new_start self.override_start = True @@ -256,7 +286,8 @@ def build(self) -> Plan: ) restatements = self._build_restatements( - dag,
earliest_interval_start(self._context_diff.snapshots.values()) + dag, + earliest_interval_start(self._context_diff.snapshots.values(), self.execution_time), ) models_to_backfill = self._build_models_to_backfill(dag, restatements) @@ -266,11 +297,17 @@ def build(self) -> Plan: # model should be ignored. interval_end_per_model = None + # this deliberately uses the passed in self._execution_time and not self.execution_time cached property + # the reason is that there can be a delay between the Plan being built and the Plan being actually run, + # so this ensures that an _execution_time of None can be propagated to the Plan and thus be re-resolved to + # the current timestamp of when the Plan is eventually run + plan_execution_time = self._execution_time + plan = Plan( context_diff=self._context_diff, plan_id=self._plan_id, - provided_start=self._start, - provided_end=self._end, + provided_start=self.start, + provided_end=self.end, is_dev=self._is_dev, skip_backfill=self._skip_backfill, empty_backfill=self._empty_backfill, @@ -289,7 +326,7 @@ def build(self) -> Plan: selected_models_to_backfill=self._backfill_models, models_to_backfill=models_to_backfill, effective_from=self._effective_from, - execution_time=self._execution_time, + execution_time=plan_execution_time, end_bounded=self._end_bounded, ensure_finalized_snapshots=self._ensure_finalized_snapshots, user_provided_flags=self._user_provided_flags, @@ -739,6 +776,18 @@ def _ensure_valid_date_range(self) -> None: "The start and end dates can't be set for a production plan without restatements."
) + if (start := self.start) and (end := self.end): + if to_datetime(start) > to_datetime(end): + raise PlanError( + f"Plan end date: '{time_like_to_str(end)}' must be after the plan start date: '{time_like_to_str(start)}'" + ) + + if end := self.end: + if to_datetime(end) > to_datetime(self.execution_time): + raise PlanError( + f"Plan end date: '{time_like_to_str(end)}' cannot be in the future (execution time: '{time_like_to_str(self.execution_time)}')" + ) + def _ensure_no_forward_only_revert(self) -> None: """Ensures that a previously superseded breaking / non-breaking snapshot is not being used again to replace an existing forward-only snapshot with the same version. diff --git a/sqlmesh/core/plan/definition.py b/sqlmesh/core/plan/definition.py index c0ef1323f2..f3fb088ebb 100644 --- a/sqlmesh/core/plan/definition.py +++ b/sqlmesh/core/plan/definition.py @@ -5,6 +5,7 @@ from datetime import datetime from enum import Enum from functools import cached_property +from pydantic import Field from sqlmesh.core.context_diff import ContextDiff from sqlmesh.core.environment import Environment, EnvironmentNamingInfo, EnvironmentStatements @@ -63,7 +64,7 @@ class Plan(PydanticModel, frozen=True): models_to_backfill: t.Optional[t.Set[str]] = None """All models that should be backfilled as part of this plan.""" effective_from: t.Optional[TimeLike] = None - execution_time: t.Optional[TimeLike] = None + execution_time_: t.Optional[TimeLike] = Field(default=None, alias="execution_time") user_provided_flags: t.Optional[t.Dict[str, UserProvidedFlags]] = None @@ -80,7 +81,12 @@ def start(self) -> TimeLike: @cached_property def end(self) -> TimeLike: - return self.provided_end or now() + return self.provided_end or self.execution_time + + @cached_property + def execution_time(self) -> TimeLike: + # note: property is cached so that it returns a consistent timestamp for now() + return self.execution_time_ or now() @property def previous_plan_id(self) -> t.Optional[str]: @@ -271,7 
+277,7 @@ def to_evaluatable(self) -> EvaluatablePlan: @cached_property def _earliest_interval_start(self) -> datetime: - return earliest_interval_start(self.snapshots.values()) + return earliest_interval_start(self.snapshots.values(), self.execution_time) class EvaluatablePlan(PydanticModel): @@ -345,8 +351,10 @@ def format_intervals(self, unit: t.Optional[IntervalUnit] = None) -> str: return format_intervals(self.merged_intervals, unit) -def earliest_interval_start(snapshots: t.Collection[Snapshot]) -> datetime: - earliest_start = earliest_start_date(snapshots) +def earliest_interval_start( + snapshots: t.Collection[Snapshot], execution_time: t.Optional[TimeLike] = None +) -> datetime: + earliest_start = earliest_start_date(snapshots, relative_to=execution_time) earliest_interval_starts = [s.intervals[0][0] for s in snapshots if s.intervals] return ( min(earliest_start, to_datetime(min(earliest_interval_starts))) diff --git a/sqlmesh/core/snapshot/definition.py b/sqlmesh/core/snapshot/definition.py index 22eed6f3c8..ba422bcdcb 100644 --- a/sqlmesh/core/snapshot/definition.py +++ b/sqlmesh/core/snapshot/definition.py @@ -1995,7 +1995,12 @@ def earliest_start_date( start_date(snapshot, snapshots, cache=cache, relative_to=relative_to) for snapshot in snapshots.values() ) - return yesterday() + + relative_base = None + if relative_to is not None: + relative_base = to_datetime(relative_to) + + return yesterday(relative_base=relative_base) def start_date( diff --git a/sqlmesh/utils/date.py b/sqlmesh/utils/date.py index a241865fda..6c5787470e 100644 --- a/sqlmesh/utils/date.py +++ b/sqlmesh/utils/date.py @@ -87,34 +87,34 @@ def now_ds() -> str: return to_ds(now()) -def yesterday() -> datetime: +def yesterday(relative_base: t.Optional[datetime] = None) -> datetime: """ Yesterday utc datetime. 
Returns: A datetime object with tz utc representing yesterday's date """ - return to_datetime("yesterday") + return to_datetime("yesterday", relative_base=relative_base) -def yesterday_ds() -> str: +def yesterday_ds(relative_base: t.Optional[datetime] = None) -> str: """ Yesterday utc ds. Returns: Yesterday's ds string. """ - return to_ds("yesterday") + return to_ds("yesterday", relative_base=relative_base) -def yesterday_timestamp() -> int: +def yesterday_timestamp(relative_base: t.Optional[datetime] = None) -> int: """ Yesterday utc timestamp. Returns: UTC epoch millis timestamp of yesterday """ - return to_timestamp(yesterday()) + return to_timestamp(yesterday(relative_base=relative_base)) def to_timestamp( @@ -265,19 +265,19 @@ def date_dict( return kwargs -def to_ds(obj: TimeLike) -> str: +def to_ds(obj: TimeLike, relative_base: t.Optional[datetime] = None) -> str: """Converts a TimeLike object into YYYY-MM-DD formatted string.""" - return to_ts(obj)[0:10] + return to_ts(obj, relative_base=relative_base)[0:10] -def to_ts(obj: TimeLike) -> str: +def to_ts(obj: TimeLike, relative_base: t.Optional[datetime] = None) -> str: """Converts a TimeLike object into YYYY-MM-DD HH:MM:SS formatted string.""" - return to_datetime(obj).replace(tzinfo=None).isoformat(sep=" ") + return to_datetime(obj, relative_base=relative_base).replace(tzinfo=None).isoformat(sep=" ") -def to_tstz(obj: TimeLike) -> str: +def to_tstz(obj: TimeLike, relative_base: t.Optional[datetime] = None) -> str: """Converts a TimeLike object into YYYY-MM-DD HH:MM:SS+00:00 formatted string.""" - return to_datetime(obj).isoformat(sep=" ") + return to_datetime(obj, relative_base=relative_base).isoformat(sep=" ") def is_date(obj: TimeLike) -> bool: @@ -373,6 +373,16 @@ def is_categorical_relative_expression(expression: str) -> bool: return not any(k in TIME_UNITS for k in grain_kwargs) +def is_relative(value: TimeLike) -> bool: + """ + Tests a TimeLike object to see if it is a relative expression, eg '1 week 
ago' as opposed to an absolute timestamp + """ + if isinstance(value, str): + return is_categorical_relative_expression(value) + + return False + + def to_time_column( time_column: t.Union[TimeLike, exp.Null], time_column_type: exp.DataType, diff --git a/tests/core/test_context.py b/tests/core/test_context.py index 204e675c66..e88184c2e9 100644 --- a/tests/core/test_context.py +++ b/tests/core/test_context.py @@ -49,6 +49,7 @@ make_inclusive_end, now, to_date, + to_datetime, to_timestamp, yesterday_ds, ) @@ -438,6 +439,99 @@ def test_plan_execution_time(): ) +def test_plan_execution_time_start_end(): + context = Context(config=Config()) + context.upsert_model( + load_sql_based_model( + parse( + """ + MODEL( + name db.x, + start '2020-01-01', + kind INCREMENTAL_BY_TIME_RANGE ( + time_column ds + ), + cron '@daily' + ); + + SELECT id, ds FROM (VALUES + ('1', '2020-01-01'), + ('2', '2021-01-01'), + ('3', '2022-01-01'), + ('4', '2023-01-01'), + ('5', '2024-01-01') + ) data(id, ds) + WHERE ds BETWEEN @start_ds AND @end_ds + """ + ) + ) + ) + + # prod plan - no fixed execution time so it defaults to now() and reads all the data + prod_plan = context.plan(auto_apply=True) + + assert len(prod_plan.new_snapshots) == 1 + + context.upsert_model( + load_sql_based_model( + parse( + """ + MODEL( + name db.x, + start '2020-01-01', + kind INCREMENTAL_BY_TIME_RANGE ( + time_column ds + ), + cron '@daily' + ); + + SELECT id, ds, 'changed' as a FROM (VALUES + ('1', '2020-01-01'), + ('2', '2021-01-01'), + ('3', '2022-01-01'), + ('4', '2023-01-01'), + ('5', '2024-01-01') + ) data(id, ds) + WHERE ds BETWEEN @start_ds AND @end_ds + """ + ) + ) + ) + + # dev plan with an execution time in the past and no explicit start/end specified + # the plan end should be bounded to it and not exceed it even though in prod the last interval (used as a default end) + # is newer than the execution time + dev_plan = context.plan("dev", execution_time="2020-01-05") + + assert to_datetime(dev_plan.start) 
== to_datetime( + "2020-01-01" + ) # default start is the earliest prod interval + assert to_datetime(dev_plan.execution_time) == to_datetime("2020-01-05") + assert to_datetime(dev_plan.end) == to_datetime( + "2020-01-05" + ) # end should not be greater than execution_time + + # same as above but with a relative start + dev_plan = context.plan("dev", start="1 day ago", execution_time="2020-01-05") + + assert to_datetime(dev_plan.start) == to_datetime( + "2020-01-04" + ) # start relative to execution_time + assert to_datetime(dev_plan.execution_time) == to_datetime("2020-01-05") + assert to_datetime(dev_plan.end) == to_datetime( + "2020-01-05" + ) # end should not be greater than execution_time + + # same as above but with a relative start and a relative end + dev_plan = context.plan("dev", start="2 days ago", execution_time="2020-01-05", end="1 day ago") + + assert to_datetime(dev_plan.start) == to_datetime( + "2020-01-03" + ) # start relative to execution_time + assert to_datetime(dev_plan.execution_time) == to_datetime("2020-01-05") + assert to_datetime(dev_plan.end) == to_datetime("2020-01-04") # end relative to execution_time + + def test_override_builtin_audit_blocking_mode(): context = Context(config=Config()) context.upsert_model( diff --git a/tests/core/test_plan.py b/tests/core/test_plan.py index 540a1384e2..61ac2b4ba6 100644 --- a/tests/core/test_plan.py +++ b/tests/core/test_plan.py @@ -3095,3 +3095,111 @@ def test_user_provided_flags(sushi_context: Context): context_diff, ).build() assert plan_builder.user_provided_flags == None + + +@time_machine.travel(now()) +@pytest.mark.parametrize( + "input,output", + [ + # execution_time, start, end + ( + # no execution time, start or end + (None, None, None), + # execution time defaults to now() + # start defaults to 1 day before execution time + # end defaults to execution_time + (now(), yesterday_ds(), now()), + ), + ( + # fixed execution time, no start, no end + ("2020-01-05", None, None), + # execution time 
set to 2020-01-05 + # start defaults to 1 day before execution time + # end defaults to execution time + ("2020-01-05", "2020-01-04", "2020-01-05"), + ), + ( + # fixed execution time, relative start, no end + ("2020-01-05", "2 days ago", None), + # execution time set to 2020-01-05 + # start relative to execution time + # end defaults to execution time + ("2020-01-05", "2020-01-03", "2020-01-05"), + ), + ( + # fixed execution time, relative start, relative end + ("2020-01-05", "2 days ago", "1 day ago"), + # execution time set to 2020-01-05 + # start relative to execution time + # end relative to execution time + ("2020-01-05", "2020-01-03", "2020-01-04"), + ), + ( + # fixed execution time, fixed start, fixed end + ("2020-01-05", "2020-01-01", "2020-01-05"), + # fixed dates are all in the valid range + ("2020-01-05", "2020-01-01", "2020-01-05"), + ), + ( + # fixed execution time, fixed start, fixed end + ("2020-01-05", "2020-01-05", "2020-01-01"), + # Error because start is after end + r"Plan end date.*must be after the plan start date", + ), + ( + # fixed execution time, relative start, fixed end beyond fixed execution time + ("2020-01-05", "2 days ago", "2021-01-01"), + # Error because end is set to 2021-01-01 which is after the execution time + r"Plan end date.*cannot be in the future", + ), + ], +) +def test_plan_dates_relative_to_execution_time( + input: t.Tuple[t.Optional[str], ...], + output: t.Union[str, t.Tuple[t.Optional[str], ...]], + make_snapshot: t.Callable, +): + snapshot_a = make_snapshot( + SqlModel(name="a", query=parse_one("select 1, ds"), dialect="duckdb") + ) + + context_diff = ContextDiff( + environment="test_environment", + is_new_environment=True, + is_unfinalized_environment=False, + normalize_environment_name=True, + create_from="prod", + create_from_env_exists=True, + added={snapshot_a.snapshot_id}, + removed_snapshots={}, + modified_snapshots={}, + snapshots={}, + new_snapshots={snapshot_a.snapshot_id: snapshot_a}, + 
previous_plan_id=None, + previously_promoted_snapshot_ids=set(), + previous_finalized_snapshots=None, + previous_gateway_managed_virtual_layer=False, + gateway_managed_virtual_layer=False, + ) + + input_execution_time, input_start, input_end = input + + def _build_plan() -> Plan: + return PlanBuilder( + context_diff, + start=input_start, + end=input_end, + execution_time=input_execution_time, + is_dev=True, + ).build() + + if isinstance(output, str): + with pytest.raises(PlanError, match=output): + _build_plan() + else: + output_execution_time, output_start, output_end = output + + plan = _build_plan() + assert to_datetime(plan.start) == to_datetime(output_start) + assert to_datetime(plan.end) == to_datetime(output_end) + assert to_datetime(plan.execution_time) == to_datetime(output_execution_time) diff --git a/tests/integrations/github/cicd/test_github_controller.py b/tests/integrations/github/cicd/test_github_controller.py index 208f59b105..8b0a35411a 100644 --- a/tests/integrations/github/cicd/test_github_controller.py +++ b/tests/integrations/github/cicd/test_github_controller.py @@ -19,6 +19,7 @@ GithubCheckStatus, MergeStateStatus, ) +from sqlmesh.utils.date import to_datetime, now from tests.integrations.github.cicd.conftest import MockIssueComment pytestmark = pytest.mark.github @@ -236,6 +237,7 @@ def test_pr_plan(github_client, make_controller): def test_pr_plan_auto_categorization(github_client, make_controller): custom_categorizer_config = CategorizerConfig.all_semi() default_start = "1 week ago" + default_start_absolute = to_datetime(default_start, relative_base=now()) controller = make_controller( "tests/fixtures/github/pull_request_synchronized.json", github_client, @@ -249,7 +251,7 @@ def test_pr_plan_auto_categorization(github_client, make_controller): assert not controller._context.apply.called assert controller._context._run_plan_tests.call_args == call(skip_tests=True) assert controller._pr_plan_builder._categorizer_config == 
custom_categorizer_config - assert controller.pr_plan.start == default_start + assert controller.pr_plan.start == default_start_absolute def test_prod_plan(github_client, make_controller): diff --git a/tests/utils/test_date.py b/tests/utils/test_date.py index 03fb6e580c..d892817969 100644 --- a/tests/utils/test_date.py +++ b/tests/utils/test_date.py @@ -12,6 +12,7 @@ date_dict, format_tz_datetime, is_categorical_relative_expression, + is_relative, make_inclusive, to_datetime, to_time_column, @@ -324,3 +325,17 @@ def test_format_tz_datetime(): test_datetime = to_datetime("2020-01-01 00:00:00") assert format_tz_datetime(test_datetime) == "2020-01-01 12:00AM UTC" assert format_tz_datetime(test_datetime, format_string=None) == "2020-01-01 00:00:00+00:00" + + +def test_is_relative(): + assert is_relative("1 week ago") + assert is_relative("1 week") + assert is_relative("1 day ago") + assert is_relative("yesterday") + + assert not is_relative("2024-01-01") + assert not is_relative("2024-01-01 01:02:03") + assert not is_relative(to_datetime("2024-01-01 01:02:03")) + assert not is_relative(to_timestamp("2024-01-01 01:02:03")) + assert not is_relative(to_datetime("1 week ago")) + assert not is_relative(to_timestamp("1 day ago"))