
Commit 1c8fc5c

Merge branch 'main' into add_fabric_warehouse
2 parents d9729a1 + 5aacaa8

32 files changed: 697 additions & 682 deletions

.circleci/continue_config.yml

Lines changed: 2 additions & 0 deletions
@@ -246,6 +246,7 @@ jobs:
             echo "export SNOWFLAKE_DATABASE='$TEST_DB_NAME'" >> "$BASH_ENV"
             echo "export DATABRICKS_CATALOG='$TEST_DB_NAME'" >> "$BASH_ENV"
             echo "export REDSHIFT_DATABASE='$TEST_DB_NAME'" >> "$BASH_ENV"
+            echo "export GCP_POSTGRES_DATABASE='$TEST_DB_NAME'" >> "$BASH_ENV"
       - run:
           name: Create test database
           command: ./.circleci/manage-test-db.sh << parameters.engine >> "$TEST_DB_NAME" up
@@ -305,6 +306,7 @@ workflows:
               - athena
               # todo: enable fabric when cicd catalog create/drop implemented in manage-test-db.sh
               #- fabric
+              - gcp-postgres
           filters:
             branches:
               only:
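The exported variable hands the per-run `$TEST_DB_NAME` to the GCP Postgres tests. As a hypothetical illustration (not code from this commit), a test helper could read it like so:

```python
# Hypothetical sketch: reading the database name the CI job exports.
# The helper name and error message are assumptions, not commit code.
import os

def gcp_postgres_database() -> str:
    # The CI job runs: echo "export GCP_POSTGRES_DATABASE='$TEST_DB_NAME'" >> "$BASH_ENV"
    db = os.environ.get("GCP_POSTGRES_DATABASE")
    if not db:
        raise RuntimeError("GCP_POSTGRES_DATABASE is not set; expected a CI run")
    return db
```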

.circleci/manage-test-db.sh

Lines changed: 26 additions & 0 deletions
@@ -109,6 +109,32 @@ clickhouse-cloud_init() {
   echo "Clickhouse Cloud instance $CLICKHOUSE_CLOUD_HOST is up and running"
 }
 
+# GCP Postgres
+gcp-postgres_init() {
+  # Download and start Cloud SQL Proxy
+  curl -fsSL -o cloud-sql-proxy https://storage.googleapis.com/cloud-sql-connectors/cloud-sql-proxy/v2.18.0/cloud-sql-proxy.linux.amd64
+  chmod +x cloud-sql-proxy
+  echo "$GCP_POSTGRES_KEYFILE_JSON" > /tmp/keyfile.json
+  ./cloud-sql-proxy --credentials-file /tmp/keyfile.json $GCP_POSTGRES_INSTANCE_CONNECTION_STRING &
+
+  # Wait for proxy to start
+  sleep 5
+}
+
+gcp-postgres_exec() {
+  PGPASSWORD=$GCP_POSTGRES_PASSWORD psql -h 127.0.0.1 -U $GCP_POSTGRES_USER -c "$1" postgres
+}
+
+gcp-postgres_up() {
+  gcp-postgres_exec "create database $1"
+}
+
+gcp-postgres_down() {
+  gcp-postgres_exec "drop database $1"
+}
+
+
+
 INIT_FUNC="${ENGINE}_init"
 UP_FUNC="${ENGINE}_up"
 DOWN_FUNC="${ENGINE}_down"
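The new functions plug into the script's name-based dispatch (`INIT_FUNC="${ENGINE}_init"` and friends), so adding an engine only requires defining matching `_init`/`_up`/`_down` handlers. A rough Python equivalent of that dispatch, for illustration only (the handler bodies are placeholders, not the real CI logic):

```python
# Hypothetical sketch of the shell script's "<engine>_<action>" dispatch.
import typing as t

def gcp_postgres_up(db_name: str) -> None:
    print(f"create database {db_name}")  # stands in for gcp-postgres_exec

def gcp_postgres_down(db_name: str) -> None:
    print(f"drop database {db_name}")

HANDLERS: t.Dict[str, t.Callable[[str], None]] = {
    "gcp-postgres_up": gcp_postgres_up,
    "gcp-postgres_down": gcp_postgres_down,
}

def dispatch(engine: str, action: str, db_name: str) -> None:
    # Resolve the handler by name, as UP_FUNC="${ENGINE}_up" does.
    handler = HANDLERS.get(f"{engine}_{action}")
    if handler is None:
        raise ValueError(f"no {action} handler for engine {engine!r}")
    handler(db_name)

dispatch("gcp-postgres", "up", "test_db_123")
```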

Makefile

Lines changed: 3 additions & 0 deletions
@@ -176,6 +176,9 @@ athena-test: guard-AWS_ACCESS_KEY_ID guard-AWS_SECRET_ACCESS_KEY guard-ATHENA_S3
 fabric-test: guard-FABRIC_HOST guard-FABRIC_CLIENT_ID guard-FABRIC_CLIENT_SECRET guard-FABRIC_DATABASE engine-fabric-install
 	pytest -n auto -m "fabric" --retries 3 --junitxml=test-results/junit-fabric.xml
 
+gcp-postgres-test: guard-GCP_POSTGRES_INSTANCE_CONNECTION_STRING guard-GCP_POSTGRES_USER guard-GCP_POSTGRES_PASSWORD guard-GCP_POSTGRES_KEYFILE_JSON engine-gcppostgres-install
+	pytest -n auto -m "gcp_postgres" --retries 3 --junitxml=test-results/junit-gcp-postgres.xml
+
 vscode_settings:
 	mkdir -p .vscode
 	cp -r ./tooling/vscode/*.json .vscode/
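The target selects tests by the `gcp_postgres` pytest marker. A hypothetical example of a test it would collect (the test body, and the marker registration in pytest config, are assumptions):

```python
# Hypothetical sketch of a test selected by `pytest -m "gcp_postgres"`.
# Only the marker name comes from the Makefile target above.
import os

import pytest

@pytest.mark.gcp_postgres
def test_ci_database_is_configured():
    # The CI job exports GCP_POSTGRES_DATABASE for this engine.
    assert os.environ.get("GCP_POSTGRES_DATABASE")
```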

docs/concepts/models/model_kinds.md

Lines changed: 8 additions & 40 deletions
@@ -935,13 +935,7 @@ SQLMesh achieves this by adding a `valid_from` and `valid_to` column to your mod
 
 Therefore, you can use these models to not only tell you what the latest value is for a given record but also what the values were anytime in the past. Note that maintaining this history does come at a cost of increased storage and compute and this may not be a good fit for sources that change frequently since the history could get very large.
 
-**Note**: SCD Type 2 models support [restatements](../plans.md#restatement-plans) with specific limitations:
-
-- **Full restatements**: The entire table will be recreated from scratch when no start date is specified
-- **Partial restatements**: You can specify a start date to restate data from a certain point onwards to the latest interval. The end date will always be set to the latest interval's end date, regardless of what end date you specify
-- **Partial sections**: Restatements of specific sections (discontinued ranges) of the table are not supported
-
-Data restatement is disabled for models of this kind by default (`disable_restatement true`). To enable restatements, set `disable_restatement false` in your model configuration.
+**Note**: Partial data [restatement](../plans.md#restatement-plans) is not supported for this model kind, which means that the entire table will be recreated from scratch if restated. This may lead to data loss, so data restatement is disabled for models of this kind by default.
 
 There are two ways to tracking changes: By Time (Recommended) or By Column.
 
@@ -1289,11 +1283,11 @@ This is the most accurate representation of the menu based on the source data pr
 
 ### Processing Source Table with Historical Data
 
-The most common case for SCD Type 2 is creating history for a table that it doesn't have it already. 
+The most common case for SCD Type 2 is creating history for a table that it doesn't have it already.
 In the example of the restaurant menu, the menu just tells you what is offered right now, but you want to know what was offered over time.
 In this case, the default setting of `None` for `batch_size` is the best option.
 
-Another use case though is processing a source table that already has history in it. 
+Another use case though is processing a source table that already has history in it.
 A common example of this is a "daily snapshot" table that is created by a source system that takes a snapshot of the data at the end of each day.
 If your source table has historical records, like a "daily snapshot" table, then set `batch_size` to `1` to process each interval (each day if a `@daily` cron) in sequential order.
 That way the historical records will be properly captured in the SCD Type 2 table.
@@ -1439,14 +1433,11 @@ GROUP BY
   id
 ```
 
-### SCD Type 2 Restatements
+### Reset SCD Type 2 Model (clearing history)
 
 SCD Type 2 models are designed by default to protect the data that has been captured because it is not possible to recreate the history once it has been lost.
 However, there are cases where you may want to clear the history and start fresh.
-
-#### Enabling Restatements
-
-To enable restatements for an SCD Type 2 model, set `disable_restatement` to `false` in the model definition:
+For this use case you will want to start by setting `disable_restatement` to `false` in the model definition.
 
 ```sql linenums="1" hl_lines="5"
 MODEL (
@@ -1458,39 +1449,16 @@ MODEL (
 );
 ```
 
-#### Full Restatements (Clearing All History)
-
-To clear all history and recreate the entire table from scratch:
+Plan/apply this change to production.
+Then you will want to [restate the model](../plans.md#restatement-plans).
 
 ```bash
 sqlmesh plan --restate-model db.menu_items
 ```
 
 !!! warning
 
-    This will remove **all** historical data on the model which in most situations cannot be recovered.
-
-#### Partial Restatements (From a Specific Date)
-
-You can restate data from a specific start date onwards. This will:
-- Delete all records with `valid_from >= start_date`
-- Reprocess the data from the start date to the latest interval
-
-```bash
-sqlmesh plan --restate-model db.menu_items --start "2023-01-15"
-```
-
-!!! note
-
-    If you specify an end date for SCD Type 2 restatements, it will be ignored and automatically set to the latest interval's end date.
-
-```bash
-# This end date will be ignored and set to the latest interval
-sqlmesh plan --restate-model db.menu_items --start "2023-01-15" --end "2023-01-20"
-```
-
-
-#### Re-enabling Protection
+    This will remove the historical data on the model which in most situations cannot be recovered.
 
 Once complete you will want to remove `disable_restatement` on the model definition which will set it back to `true` and prevent accidental data loss.

sqlmesh/core/config/connection.py

Lines changed: 0 additions & 6 deletions
@@ -1216,12 +1216,6 @@ def _validate_auth_method(cls, data: t.Any) -> t.Any:
         password = data.get("password")
         enable_iam_auth = data.get("enable_iam_auth")
 
-        if password and enable_iam_auth:
-            raise ConfigError(
-                "Invalid GCP Postgres connection configuration - both password and"
-                " enable_iam_auth set. Use password when connecting to a postgres"
-                " user and enable_iam_auth 'True' when connecting to an IAM user."
-            )
         if not password and not enable_iam_auth:
             raise ConfigError(
                 "GCP Postgres connection configuration requires either password set"

sqlmesh/core/engine_adapter/base.py

Lines changed: 2 additions & 45 deletions
@@ -1497,7 +1497,6 @@ def scd_type_2_by_time(
         table_description: t.Optional[str] = None,
         column_descriptions: t.Optional[t.Dict[str, str]] = None,
         truncate: bool = False,
-        is_restatement: bool = False,
         **kwargs: t.Any,
     ) -> None:
         self._scd_type_2(
@@ -1514,7 +1513,6 @@ def scd_type_2_by_time(
             table_description=table_description,
             column_descriptions=column_descriptions,
             truncate=truncate,
-            is_restatement=is_restatement,
             **kwargs,
         )
 
@@ -1533,7 +1531,6 @@ def scd_type_2_by_column(
         table_description: t.Optional[str] = None,
         column_descriptions: t.Optional[t.Dict[str, str]] = None,
         truncate: bool = False,
-        is_restatement: bool = False,
         **kwargs: t.Any,
     ) -> None:
         self._scd_type_2(
@@ -1550,7 +1547,6 @@ def scd_type_2_by_column(
             table_description=table_description,
             column_descriptions=column_descriptions,
             truncate=truncate,
-            is_restatement=is_restatement,
             **kwargs,
         )
 
@@ -1561,7 +1557,6 @@ def _scd_type_2(
         unique_key: t.Sequence[exp.Expression],
         valid_from_col: exp.Column,
         valid_to_col: exp.Column,
-        start: TimeLike,
         execution_time: t.Union[TimeLike, exp.Column],
         invalidate_hard_deletes: bool = True,
         updated_at_col: t.Optional[exp.Column] = None,
@@ -1572,7 +1567,6 @@ def _scd_type_2(
         table_description: t.Optional[str] = None,
         column_descriptions: t.Optional[t.Dict[str, str]] = None,
         truncate: bool = False,
-        is_restatement: bool = False,
         **kwargs: t.Any,
     ) -> None:
         def remove_managed_columns(
@@ -1757,17 +1751,9 @@ def remove_managed_columns(
         existing_rows_query = exp.select(*table_columns, exp.true().as_("_exists")).from_(
             target_table
         )
-
         if truncate:
             existing_rows_query = existing_rows_query.limit(0)
 
-        # Only set cleanup_ts if is_restatement is True and truncate is False (this to enable full restatement)
-        cleanup_ts = (
-            to_time_column(start, time_data_type, self.dialect, nullable=True)
-            if is_restatement and not truncate
-            else None
-        )
-
         with source_queries[0] as source_query:
             prefixed_columns_to_types = []
             for column in columns_to_types:
@@ -1804,41 +1790,12 @@ def remove_managed_columns(
             # Historical Records that Do Not Change
             .with_(
                 "static",
-                existing_rows_query.where(valid_to_col.is_(exp.Null()).not_())
-                if cleanup_ts is None
-                else existing_rows_query.where(
-                    exp.and_(
-                        valid_to_col.is_(exp.Null().not_()),
-                        valid_to_col < cleanup_ts,
-                    ),
-                ),
+                existing_rows_query.where(valid_to_col.is_(exp.Null()).not_()),
             )
             # Latest Records that can be updated
             .with_(
                 "latest",
-                existing_rows_query.where(valid_to_col.is_(exp.Null()))
-                if cleanup_ts is None
-                else exp.select(
-                    *(
-                        to_time_column(
-                            exp.null(), time_data_type, self.dialect, nullable=True
-                        ).as_(col)
-                        if col == valid_to_col.name
-                        else exp.column(col)
-                        for col in columns_to_types
-                    ),
-                    exp.true().as_("_exists"),
-                )
-                .from_(target_table)
-                .where(
-                    exp.and_(
-                        valid_from_col <= cleanup_ts,
-                        exp.or_(
-                            valid_to_col.is_(exp.null()),
-                            valid_to_col >= cleanup_ts,
-                        ),
-                    )
-                ),
+                existing_rows_query.where(valid_to_col.is_(exp.Null())),
             )
             # Deleted records which can be used to determine `valid_from` for undeleted source records
             .with_(
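With `start` and `is_restatement` gone, the `static` and `latest` CTEs reduce to a plain split on `valid_to`: closed-out rows are static history, open rows are the latest records. A minimal standalone sketch of that split using sqlglot (table and column names are illustrative, not the adapter code):

```python
# Minimal sketch of the simplified static/latest split on valid_to.
from sqlglot import exp

valid_to_col = exp.column("valid_to")
existing_rows_query = exp.select("id", "name", "valid_from", "valid_to").from_("target_table")

# Historical records that do not change: valid_to IS NOT NULL.
static = existing_rows_query.copy().where(valid_to_col.is_(exp.Null()).not_())

# Latest records that can be updated: valid_to IS NULL.
latest = existing_rows_query.copy().where(valid_to_col.is_(exp.Null()))

print(static.sql())
print(latest.sql())
```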

sqlmesh/core/engine_adapter/bigquery.py

Lines changed: 45 additions & 11 deletions
@@ -33,6 +33,7 @@
     from google.cloud import bigquery
     from google.cloud.bigquery import StandardSqlDataType
     from google.cloud.bigquery.client import Client as BigQueryClient
+    from google.cloud.bigquery.job import QueryJob
     from google.cloud.bigquery.job.base import _AsyncJob as BigQueryQueryResult
     from google.cloud.bigquery.table import Table as BigQueryTable
@@ -186,6 +187,31 @@ def query_factory() -> Query:
                 )
             ]
 
+    def close(self) -> t.Any:
+        # Cancel all pending query jobs across all threads
+        all_query_jobs = self._connection_pool.get_all_attributes("query_job")
+        for query_job in all_query_jobs:
+            if query_job:
+                try:
+                    if not self._db_call(query_job.done):
+                        self._db_call(query_job.cancel)
+                        logger.debug(
+                            "Cancelled BigQuery job: https://console.cloud.google.com/bigquery?project=%s&j=bq:%s:%s",
+                            query_job.project,
+                            query_job.location,
+                            query_job.job_id,
+                        )
+                except Exception as ex:
+                    logger.debug(
+                        "Failed to cancel BigQuery job: https://console.cloud.google.com/bigquery?project=%s&j=bq:%s:%s. %s",
+                        query_job.project,
+                        query_job.location,
+                        query_job.job_id,
+                        str(ex),
+                    )
+
+        return super().close()
+
     def _begin_session(self, properties: SessionProperties) -> None:
         from google.cloud.bigquery import QueryJobConfig
 
@@ -318,7 +344,10 @@ def create_mapping_schema(
         if len(table.parts) == 3 and "." in table.name:
             # The client's `get_table` method can't handle paths with >3 identifiers
             self.execute(exp.select("*").from_(table).limit(0))
-            query_results = self._query_job._query_results
+            query_job = self._query_job
+            assert query_job is not None
+
+            query_results = query_job._query_results
             columns = create_mapping_schema(query_results.schema)
         else:
             bq_table = self._get_table(table)
@@ -717,7 +746,9 @@ def _fetch_native_df(
         self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False
     ) -> DF:
         self.execute(query, quote_identifiers=quote_identifiers)
-        return self._query_job.to_dataframe()
+        query_job = self._query_job
+        assert query_job is not None
+        return query_job.to_dataframe()
 
     def _create_column_comments(
         self,
@@ -1021,20 +1052,23 @@ def _execute(
             job_config=job_config,
             timeout=self._extra_config.get("job_creation_timeout_seconds"),
         )
+        query_job = self._query_job
+        assert query_job is not None
 
         logger.debug(
             "BigQuery job created: https://console.cloud.google.com/bigquery?project=%s&j=bq:%s:%s",
-            self._query_job.project,
-            self._query_job.location,
-            self._query_job.job_id,
+            query_job.project,
+            query_job.location,
+            query_job.job_id,
         )
 
         results = self._db_call(
-            self._query_job.result,
+            query_job.result,
             timeout=self._extra_config.get("job_execution_timeout_seconds"),  # type: ignore
         )
+
        self._query_data = iter(results) if results.total_rows else iter([])
-        query_results = self._query_job._query_results
+        query_results = query_job._query_results
         self.cursor._set_rowcount(query_results)
         self.cursor._set_description(query_results.schema)
@@ -1198,23 +1232,23 @@ def _query_data(self) -> t.Any:
 
     @_query_data.setter
     def _query_data(self, value: t.Any) -> None:
-        return self._connection_pool.set_attribute("query_data", value)
+        self._connection_pool.set_attribute("query_data", value)
 
     @property
-    def _query_job(self) -> t.Any:
+    def _query_job(self) -> t.Optional[QueryJob]:
         return self._connection_pool.get_attribute("query_job")
 
     @_query_job.setter
     def _query_job(self, value: t.Any) -> None:
-        return self._connection_pool.set_attribute("query_job", value)
+        self._connection_pool.set_attribute("query_job", value)
 
     @property
     def _session_id(self) -> t.Any:
         return self._connection_pool.get_attribute("session_id")
 
     @_session_id.setter
     def _session_id(self, value: t.Any) -> None:
-        return self._connection_pool.set_attribute("session_id", value)
+        self._connection_pool.set_attribute("session_id", value)
 
 
 class _ErrorCounter:
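The new `close()` relies on the connection pool tracking attributes per thread and exposing them all at once via `get_all_attributes`. A hypothetical sketch of that contract (this `ThreadLocalPool` is illustrative, not SQLMesh's implementation):

```python
# Hypothetical per-thread attribute store matching the calls used in the diff:
# set_attribute / get_attribute operate on the current thread, while
# get_all_attributes collects one value per thread, as close() needs when
# cancelling pending query jobs.
import threading
import typing as t

class ThreadLocalPool:
    def __init__(self) -> None:
        self._attributes: t.Dict[int, t.Dict[str, t.Any]] = {}
        self._lock = threading.Lock()

    def set_attribute(self, key: str, value: t.Any) -> None:
        with self._lock:
            self._attributes.setdefault(threading.get_ident(), {})[key] = value

    def get_attribute(self, key: str) -> t.Any:
        with self._lock:
            return self._attributes.get(threading.get_ident(), {}).get(key)

    def get_all_attributes(self, key: str) -> t.List[t.Any]:
        with self._lock:
            return [attrs.get(key) for attrs in self._attributes.values()]
```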
