-
Notifications
You must be signed in to change notification settings - Fork 380
Expand file tree
/
Copy pathorders.py
More file actions
73 lines (64 loc) · 1.86 KB
/
orders.py
File metadata and controls
73 lines (64 loc) · 1.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import random
import typing as t
from datetime import datetime, timedelta
import pandas as pd # noqa: TID253
from helper import iter_dates # type: ignore
from sqlmesh import ExecutionContext, model
from sqlmesh.core.model.kind import ModelKindName
from sqlmesh.utils.date import to_date
# Pools of synthetic ids that generated orders draw from.
CUSTOMERS = list(range(100))  # customer ids 0..99
WAITERS = list(range(10))  # waiter ids 0..9
@model(
    "sushi.orders",
    description="Table of sushi orders.",
    kind=dict(
        name=ModelKindName.INCREMENTAL_BY_TIME_RANGE, time_column="event_date", batch_size=30
    ),
    start="1 week ago",
    cron="@daily",
    grains=[
        "id AS order_id",
    ],
    references=[
        "customer_id",
        "waiter_id",
    ],
    columns={
        "id": "int",
        "customer_id": "int",
        "waiter_id": "int",
        "start_ts": "int",
        "end_ts": "int",
        "event_date": "date",
    },
    signals=[("test_signal", {"arg": 1})],
)
def execute(
    context: ExecutionContext,
    start: datetime,
    end: datetime,
    execution_time: datetime,
    **kwargs: t.Any,
) -> pd.DataFrame:
    """Generate random sushi orders for each date yielded by ``iter_dates(start, end)``.

    For every date, between 10 and 30 orders (inclusive) are produced. Each
    order gets:
    - a random customer from ``CUSTOMERS`` and a random waiter from ``WAITERS``;
    - ``start_ts``: the date plus 0-80,000 seconds, as an integer POSIX timestamp;
    - ``end_ts``: ``start_ts`` plus 0-3,600 seconds.

    Order ids restart at 0 for each date, so ``id`` is only unique together
    with ``event_date``.

    NOTE(review): if ``iter_dates`` yields naive datetimes, ``.timestamp()``
    interprets them in the machine's local time zone — confirm.

    Args:
        context: Execution context supplied by sqlmesh (not used here).
        start: Start of the requested range, forwarded to ``iter_dates``.
        end: End of the requested range, forwarded to ``iter_dates``.
        execution_time: Run timestamp supplied by sqlmesh (not used here).
        **kwargs: Additional runtime arguments (not used here).

    Returns:
        A single DataFrame with columns ``id``, ``customer_id``, ``waiter_id``,
        ``start_ts``, ``end_ts`` and ``event_date``, indexed 0..N-1.
    """
    daily_frames = []
    for day in iter_dates(start, end):
        order_count = random.randint(10, 30)

        # All start offsets are drawn before any end offsets, and the customer /
        # waiter choices come last, so the sequence of random draws is stable.
        opened = []
        for _ in range(order_count):
            offset = timedelta(seconds=random.randint(0, 80000))
            opened.append(int((day + offset).timestamp()))
        closed = [int(ts + random.randint(0, 60 * 60)) for ts in opened]

        frame = pd.DataFrame(
            {
                "customer_id": random.choices(CUSTOMERS, k=order_count),
                "waiter_id": random.choices(WAITERS, k=order_count),
                "start_ts": opened,
                "end_ts": closed,
                "event_date": to_date(day),
            }
        )
        # Turn the per-day positional index (0..order_count-1) into the leading
        # ``id`` column.
        daily_frames.append(frame.rename_axis("id").reset_index())

    return pd.concat(daily_frames, ignore_index=True)