Skip to content

Commit fda5621

Browse files
committed
Add a very very preliminary tool to generate code that recreates a project flow
1 parent e640acc commit fda5621

2 files changed

Lines changed: 243 additions & 0 deletions

File tree

dataikuapi/dss/tools/__init__.py

Whitespace-only changes.

dataikuapi/dss/tools/codegen.py

Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
import json, copy, re
2+
from dataikuapi.dss.recipe import *
3+
from dataikuapi.dss.dataset import *
4+
5+
class _IndentContext(object):
6+
def __init__(self, flow_code_generator):
7+
self.flow_code_generator = flow_code_generator
8+
9+
def __enter__(self):
10+
self.flow_code_generator.cur_indent += 1
11+
12+
def __exit__(self, b, c, d):
13+
self.flow_code_generator.cur_indent -= 1
14+
15+
def output_to_code(obj):
    """Render *obj* for insertion into generated Python source.

    Strings are returned as a double-quoted, fully escaped literal (quotes,
    backslashes, newlines, ...), so the generated code stays syntactically
    valid whatever the string contains. Any other object is returned
    unchanged; callers interpolate it with ``%s``.
    """
    try:
        string_types = basestring  # noqa: F821 -- only exists on Python 2
    except NameError:
        string_types = str

    if isinstance(obj, string_types):
        # A JSON string literal only uses escapes that are also valid in a
        # Python string literal, and keeps the double-quote delimiters the
        # generated code has always used.
        return json.dumps(obj, ensure_ascii=False)
    return obj
20+
21+
def slugify(name):
    """Return *name* with every character outside A-Z, a-z, 0-9 and ``_`` replaced by ``_``."""
    disallowed = re.compile("[^A-Za-z0-9_]")
    return disallowed.sub("_", name)
23+
24+
class FlowCodeGenerator(object):
    """Generate Python source code that recreates datasets, recipes or a whole flow.

    Generated source accumulates in ``self.code``. ``self.cur_indent`` is the
    current indentation level (in units of 4 spaces) that :meth:`gen` applies
    to every line it emits.
    """

    def __init__(self):
        self.code = ""
        self.cur_indent = 0

    def set_options(self):
        """Placeholder; currently does nothing."""
        pass

    def generate_code_for_dataset(self, dataset):
        """Generate a ``create_dataset_<name>(project)`` function for *dataset*.

        The dataset name is slugified so the generated function name is a
        valid identifier. Returns all code generated so far.
        """
        entrypoint_name = "create_dataset_%s" % slugify(dataset.dataset_name)
        self._generate_code_for_dataset(dataset, entrypoint_name)
        return self.code

    def generate_code_for_recipe(self, recipe):
        """Generate a ``create_recipe_<name>(project)`` function for *recipe*.

        The recipe name is slugified so the generated function name is a
        valid identifier. Returns all code generated so far.
        """
        entrypoint_name = "create_recipe_%s" % slugify(recipe.recipe_name)
        self._generate_code_for_recipe(recipe, entrypoint_name)
        return self.code

    def generate_code_for_project(self, project, entrypoint_name = None):
        """Generate one function that recreates every item of the project flow.

        :param project: object whose ``get_flow().get_graph()`` provides
            ``get_items_in_traversal_order(as_type="object")``
        :param entrypoint_name: name of the generated top-level function;
            defaults to ``create_flow_for_project``
        :returns: all code generated so far

        Items that are ``DSSDataset`` instances are handled as datasets, every
        other item as a recipe. One nested function is emitted per item,
        followed by the calls to those functions in traversal order.
        """
        self.gen("import json")
        self.gen("")

        if entrypoint_name is None:
            entrypoint_name = "create_flow_for_project"

        self.gen("def %s(project):" % entrypoint_name)

        flow_graph = project.get_flow().get_graph()
        flow_items = flow_graph.get_items_in_traversal_order(as_type="object")

        entrypoints_to_call = []
        with _IndentContext(self):
            for item in flow_items:
                if isinstance(item, DSSDataset):
                    item_entrypoint = "create_dataset_%s" % slugify(item.dataset_name)
                    self._generate_code_for_dataset(item, item_entrypoint)
                else:
                    item_entrypoint = "create_recipe_%s" % slugify(item.recipe_name)
                    self._generate_code_for_recipe(item, item_entrypoint)
                entrypoints_to_call.append(item_entrypoint)

            # Emitted while still indented so that the calls are part of the
            # generated function body, where the nested functions and
            # ``project`` are in scope.
            self.gen("")
            self.gen("# Actual creation of the Flow from the individual functions")
            for ep in entrypoints_to_call:
                self.gen("%s(project)" % ep)

        return self.code

    def _generate_code_for_dataset(self, dataset, entrypoint_name):
        """Emit a function named *entrypoint_name* that recreates *dataset*.

        Reads the raw settings through ``dataset.get_settings().get_raw()``
        and the default values through the ``/projects/X/datasets/templates``
        endpoint of ``dataset.client``. Only values that differ from the
        defaults are emitted (see :meth:`codegen_object_field`).
        """
        self.gen("def %s(project):" % entrypoint_name)
        settings = dataset.get_settings()
        raw = settings.get_raw()

        templates = dataset.client._perform_json("GET", "/projects/X/datasets/templates")

        # Top-level keys that the final generic copy must not emit, either
        # because they are set at creation time or handled specifically below.
        do_not_copy = [
            "projectKey",
            "name",
            "type",
            "versionTag",
            "creationTag",
            "schema"
        ]

        dataset_type = raw["type"]
        # Names and types go through output_to_code instead of being spliced
        # between hard-coded quotes.
        name_code = output_to_code(dataset.dataset_name)
        type_code = output_to_code(dataset_type)

        self.gen("    # Base dataset params")

        if dataset_type == "UploadedFiles":
            self.gen("    dataset = project.create_upload_dataset(%s)" % name_code)
            self.gen("    settings = dataset.get_settings()")
            self.lf()

            self.codegen_object_fields_explicit(raw, templates["dataset"], ["params"], "settings.get_raw()")
            do_not_copy.append("params")

        elif dataset_type in DSSDataset.FS_TYPES:
            self.gen("    dataset = project.create_dataset(%s, %s)" % (name_code, type_code))
            self.gen("    settings = dataset.get_settings()")
            self.lf()

            srcp = raw["params"]
            self.gen("    settings.set_connection_and_path(%s, %s)" %
                     (output_to_code(srcp.get("connection")), output_to_code(srcp.get("path"))))
            self.codegen_object_fields(srcp, templates["abstractFSConfig"],
                                       ["connection", "path"], "settings.get_raw_params()")
            do_not_copy.append("params")

        elif dataset_type in DSSDataset.SQL_TYPES:
            self.gen("    dataset = project.create_dataset(%s, %s)" % (name_code, type_code))
            self.gen("    settings = dataset.get_settings()")
            self.lf()

            srcp = raw["params"]
            if srcp.get("mode", None) == "table":
                self.gen("    settings.set_table(%s, %s, %s)" %
                         (output_to_code(srcp.get("connection")), output_to_code(srcp.get("schema")),
                          output_to_code(srcp.get("table"))))

            self.codegen_object_fields(srcp, templates["abstractSQLConfig"],
                                       ["mode", "connection", "schema", "table"], "settings.get_raw_params()")
            # NOTE(review): unlike the other branches, "params" is not added
            # to do_not_copy here, so the complete params dict is also emitted
            # under "Other dataset params" -- confirm this is intended.

        else:
            self.gen("    dataset = project.create_dataset(%s, %s)" % (name_code, type_code))
            self.gen("    settings = dataset.get_settings()")

            self.codegen_object_fields_explicit(raw, templates["dataset"], ["params"], "settings.get_raw()")
            do_not_copy.append("params")

        # Copy of format params
        if "formatType" in raw:
            self.lf()
            self.gen("    # Format params")

            if raw["formatType"] == "csv":
                csv_format = raw["formatParams"]
                # A missing separator is emitted as None rather than as the
                # string "None".
                self.gen("    settings.set_csv_format(separator=%s, style=%s, skip_rows_before=%d, "
                         "header_row=%s, skip_rows_after=%d)" % (
                             output_to_code(csv_format.get("separator", None)),
                             output_to_code(csv_format["style"]),
                             csv_format["skipRowsBeforeHeader"],
                             csv_format["parseHeaderRow"],
                             csv_format["skipRowsAfterHeader"]))

                self.codegen_object_fields(csv_format, templates["csvFormat"],
                                           ["separator", "style", "skipRowsBeforeHeader",
                                            "parseHeaderRow", "skipRowsAfterHeader", "probableNumberOfRecords"],
                                           "settings.get_raw_format_params()")
            else:
                self.codegen_object_fields_explicit(raw, templates["dataset"], ["formatType", "formatParams"],
                                                    "settings.get_raw()")
            do_not_copy.extend(["formatType", "formatParams"])

        self.lf()
        self.gen("    # Schema")
        # Tolerate a dataset without any schema instead of raising KeyError.
        schema = raw.get("schema") or {}
        for column in schema.get("columns") or []:
            self.gen("    settings.add_raw_schema_column(%s)" % column)

        self.lf()
        self.gen("    # Other dataset params")
        self.codegen_object_fields(raw, templates["dataset"], do_not_copy, "settings.get_raw()")

        self.lf()
        self.gen("    settings.save()")
        self.lf()

    def _generate_code_for_recipe(self, recipe, entrypoint_name):
        """Emit a function named *entrypoint_name* that recreates *recipe*.

        The recipe is created blank in raw mode, then its inputs/outputs,
        its payload (or code, for ``CodeRecipeSettings``) and all remaining
        non-default fields of the raw definition are set.
        """
        self.gen("def %s(project):" % entrypoint_name)
        settings = recipe.get_settings()
        raw = settings.get_recipe_raw_definition()

        # Default values; fields equal to these are not emitted.
        template = {"tags":[], "optionalDependencies": False, "redispatchPartitioning": False,
                    "maxRunningActivities": 0, "neverRecomputeExistingPartitions" : False,
                    "customFields":{}, "customMeta": {"kv":{}}, "checklists" : {"checklists":[]}}

        do_not_copy = [
            "projectKey",
            "name",
            "type",
            "versionTag",
            "creationTag",
            "inputs",
            "outputs"
        ]

        self.gen("    # Create the recipe as a blank recipe")
        self.gen("    builder = project.new_recipe(%s, %s)" %
                 (output_to_code(raw["type"]), output_to_code(recipe.recipe_name)))
        self.gen("    builder.set_raw_mode()")
        self.gen("    recipe = builder.create()")
        self.lf()
        self.gen("    # Setup the recipe")
        self.gen("    settings = recipe.get_settings()" )

        self.lf()
        self.gen("    # Recipe inputs/outputs")
        self.codegen_object_fields_explicit(raw, template, ["inputs", "outputs"], "settings.get_recipe_raw_definition()")
        self.lf()

        self.gen("    # Recipe payload")
        if isinstance(settings, CodeRecipeSettings):
            code = self._escape_triple_quoted(settings.get_code())
            self.gen("    settings.set_code(\"\"\"%s\n\"\"\")" % code)
        else:
            payload = self._escape_triple_quoted(settings.get_payload())
            self.gen("    # No specific handling, simply copy payload")
            self.gen("    settings.set_payload(\"\"\"%s\n\"\"\")" % payload)

        self.lf()
        self.gen("    # Other parameters")
        self.codegen_object_fields(raw, template, do_not_copy, "settings.get_recipe_raw_definition()")
        self.lf()
        self.gen("    settings.save()")
        self.lf()

    # Helpers

    @staticmethod
    def _escape_triple_quoted(text):
        """Escape *text* so it can sit between triple double quotes in generated code.

        Backslashes are doubled and runs of three double quotes are escaped;
        without this, a backslash sequence or a triple quote in recipe code
        would change or break the generated string literal.
        """
        # "%s" keeps the original conversion for non-string values (e.g. None).
        text = "%s" % (text,)
        return text.replace("\\", "\\\\").replace("\"\"\"", "\\\"\\\"\\\"")

    def gen(self, code):
        """Append *code* as one line, indented by 4 spaces per ``cur_indent`` level."""
        self.code += "%s%s\n" % (" " * (4 * self.cur_indent), code)

    def lf(self):
        """Append an empty line (no indentation)."""
        self.code += "\n"

    def codegen_object_fields_explicit(self, object, template, copy, prefix):
        """Emit assignments for the keys listed in *copy* that exist in *object*.

        Each key is handled by :meth:`codegen_object_field`; keys absent from
        *object* are skipped.
        """
        for key in copy:
            if key in object:
                self.codegen_object_field(object, key, template, prefix)

    def codegen_object_fields(self, object, template, do_not_copy, prefix):
        """Emit assignments for every key of *object* not listed in *do_not_copy*."""
        for key in object:
            if key not in do_not_copy:
                self.codegen_object_field(object, key, template, prefix)

    def codegen_object_field(self, object, key, template, prefix):
        """Emit ``<prefix>["<key>"] = <value>`` unless the value equals its default.

        :param object: dict holding the actual values
        :param key: key of *object* to emit
        :param template: dict of default values; a missing or ``None`` default
            never suppresses the assignment
        :param prefix: source expression of the target dict in generated code
        """
        value_for_key = object[key]
        default_value_for_key = template.get(key, None)

        # Values are compared through their JSON form; sort_keys makes the
        # comparison independent of dict key order.
        if default_value_for_key is not None and \
                json.dumps(value_for_key, sort_keys=True) == json.dumps(default_value_for_key, sort_keys=True):
            return

        self.gen("    %s[%s] = %s" % (prefix, output_to_code(key), output_to_code(value_for_key)))

0 commit comments

Comments
 (0)