forked from forcedotcom/datacloud-customcode-python-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathentrypoint.py
More file actions
78 lines (64 loc) · 2.17 KB
/
entrypoint.py
File metadata and controls
78 lines (64 loc) · 2.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
from pyspark.sql.functions import (
col,
concat_ws,
lit,
)
from datacustomcode.client import Client
from datacustomcode.io.writer.base import WriteMode
def main():
client = Client()
employees = client.read_dlo("Employee__dll").persist()
employees = employees.select("id__c", "manager_id__c", "name__c", "position__c")
employees.show()
employees_with_manager = (
employees.alias("e")
.join(
employees.alias("m"),
col("e.manager_id__c").cast("string") == col("m.id__c").cast("string"),
"left",
)
.select(
col("e.id__c"),
col("e.name__c"),
col("e.position__c"),
col("e.manager_id__c"),
col("m.name__c").alias("manager_name__c"),
)
.persist()
)
hierarchy_df = (
employees_with_manager.filter(col("manager_id__c").isNull())
.withColumn("hierarchy_level__c", lit(1))
.withColumn("management_chain__c", col("name__c"))
.persist()
)
current_level = 1
while True:
ewm = employees_with_manager.alias("ewm")
hdf = hierarchy_df.filter(col("hierarchy_level__c") == current_level).alias(
"hdf"
)
next_level_df = ewm.join(
hdf,
col("ewm.manager_id__c").cast("string") == col("hdf.id__c").cast("string"),
"inner",
).select(
col("ewm.id__c"),
col("ewm.name__c"),
col("ewm.position__c"),
col("ewm.manager_id__c"),
col("ewm.manager_name__c"),
(col("hdf.hierarchy_level__c") + 1).alias("hierarchy_level__c"),
concat_ws(" | ", col("hdf.management_chain__c"), col("ewm.name__c")).alias(
"management_chain__c"
),
)
if next_level_df.isEmpty():
break
hierarchy_df = hierarchy_df.union(next_level_df).persist()
current_level += 1
hierarchy_df = hierarchy_df.orderBy("hierarchy_level__c", "manager_id__c", "id__c")
dlo_name = "Employee_Hierarchy__dll"
client.write_to_dlo(dlo_name, hierarchy_df, WriteMode.APPEND)
if __name__ == "__main__":
main()