|
| 1 | +from pyspark.sql.functions import col, lit, concat_ws |
| 2 | + |
| 3 | +from datacustomcode.client import Client |
| 4 | +from datacustomcode.io.writer.base import WriteMode |
| 5 | + |
| 6 | + |
| 7 | +def main(): |
| 8 | + client = Client() |
| 9 | + |
| 10 | + employees = client.read_dlo("Employee__dll").persist() |
| 11 | + employees = employees.select( |
| 12 | + "id__c", |
| 13 | + "manager_id__c", |
| 14 | + "name__c", |
| 15 | + "position__c" |
| 16 | + ) |
| 17 | + employees.show() |
| 18 | + employees_with_manager = employees.alias("e") \ |
| 19 | + .join(employees.alias("m"), col("e.manager_id__c").cast("string") == col("m.id__c").cast("string"), "left") \ |
| 20 | + .select( |
| 21 | + col("e.id__c"), |
| 22 | + col("e.name__c"), |
| 23 | + col("e.position__c"), |
| 24 | + col("e.manager_id__c"), |
| 25 | + col("m.name__c").alias("manager_name__c") |
| 26 | + ).persist() |
| 27 | + |
| 28 | + hierarchy_df = employees_with_manager.filter(col("manager_id__c").isNull()) \ |
| 29 | + .withColumn("hierarchy_level__c", lit(1)) \ |
| 30 | + .withColumn("management_chain__c", col("name__c")).persist() |
| 31 | + |
| 32 | + current_level = 1 |
| 33 | + |
| 34 | + while True: |
| 35 | + ewm = employees_with_manager.alias("ewm") |
| 36 | + hdf = hierarchy_df.filter(col("hierarchy_level__c") == current_level).alias("hdf") |
| 37 | + |
| 38 | + next_level_df = (ewm |
| 39 | + .join(hdf, col("ewm.manager_id__c").cast("string") == col("hdf.id__c").cast("string") , "inner") |
| 40 | + .select( |
| 41 | + col("ewm.id__c"), |
| 42 | + col("ewm.name__c"), |
| 43 | + col("ewm.position__c"), |
| 44 | + col("ewm.manager_id__c"), |
| 45 | + col("ewm.manager_name__c"), |
| 46 | + (col("hdf.hierarchy_level__c") + 1).alias("hierarchy_level__c"), |
| 47 | + concat_ws(" | ", col("hdf.management_chain__c"), col("ewm.name__c")).alias("management_chain__c") |
| 48 | + ) |
| 49 | + ) |
| 50 | + |
| 51 | + if next_level_df.isEmpty(): |
| 52 | + break |
| 53 | + |
| 54 | + hierarchy_df = hierarchy_df.union(next_level_df).persist() |
| 55 | + current_level += 1 |
| 56 | + |
| 57 | + hierarchy_df = hierarchy_df.orderBy("hierarchy_level__c", "manager_id__c", "id__c") |
| 58 | + |
| 59 | + dlo_name = "Employee_Hierarchy__dll" |
| 60 | + client.write_to_dlo(dlo_name, hierarchy_df, WriteMode.APPEND) |
| 61 | + |
| 62 | + |
| 63 | +if __name__ == "__main__": |
| 64 | + main() |
0 commit comments