1515from __future__ import annotations
1616
1717from html import unescape
18+ import json
1819import os
1920import shutil
2021import tarfile
3031)
3132
3233from loguru import logger
34+ import pydantic
3335from pydantic import BaseModel
3436import requests
3537
3638from datacustomcode .cmd import cmd_output
37- from datacustomcode .scan import scan_file
3839
3940if TYPE_CHECKING :
4041 from datacustomcode .credentials import Credentials
@@ -77,8 +78,10 @@ def _make_api_call(
7778 logger .debug (f"Request params: { kwargs } " )
7879
7980 response = requests .request (method = method , url = url , headers = headers , ** kwargs )
80- response .raise_for_status ()
8181 json_response = response .json ()
82+ if response .status_code >= 400 :
83+ logger .debug (f"Error Response: { json_response } " )
84+ response .raise_for_status ()
8285 assert isinstance (
8386 json_response , dict
8487 ), f"Unexpected response type: { type (json_response )} "
@@ -223,77 +226,65 @@ def wait_for_deployment(
223226 callback (status )
224227 if status == "Deployed" :
225228 logger .debug (
226- "Deployment completed, Elapsed time: {time.time() - start_time}"
229+ f "Deployment completed. \n Elapsed time: { time .time () - start_time } "
227230 )
228231 break
229232 time .sleep (1 )
230233
231234
232235DATA_TRANSFORM_REQUEST_TEMPLATE : dict [str , Any ] = {
233- "metadata" : {
234- "dbt_schema_version" : "https://schemas.getdbt.com/dbt/manifest/v8.json" ,
235- "dbt_version" : "1.4.6" ,
236- "generated_at" : "2023-04-25T18:54:11.375589Z" ,
237- "invocation_id" : "d6c68c69-533a-4d54-861e-1493d6cd8092" ,
238- "env" : {},
239- "project_id" : "jaffle_shop" ,
240- "user_id" : "1ca8403c-a1a5-43af-8b88-9265e948b9d2" ,
241- "send_anonymous_usage_stats" : True ,
242- "adapter_type" : "spark" ,
243- },
244- "nodes" : {
245- "model.dcexample.dim_listings_w_hosts" : {
246- "name" : "dim_listings_w_hosts" ,
247- "resource_type" : "model" ,
248- "relation_name" : "{OUTPUT_DLO}" ,
249- "config" : {"materialized" : "table" },
250- "compiled_code" : "" ,
251- "depends_on" : {"nodes" : []},
252- }
253- },
254- "sources" : {
255- "source.dcexample.listings" : {
256- "name" : "listings" ,
257- "resource_type" : "source" ,
258- "relation_name" : "{INPUT_DLO}" ,
259- "identifier" : "{INPUT_DLO}" ,
260- }
261- },
236+ "nodes" : {},
237+ "sources" : {},
262238 "macros" : {
263- "macro.dcexample.byoc" : {
264- "name" : "byoc_example" ,
265- "resource_type" : "macro" ,
266- "path" : "" ,
267- "original_file_path" : "" ,
268- "unique_id" : "unique id" ,
269- "macro_sql" : "" ,
270- "supported_languages" : None ,
239+ "macro.byoc" : {
271240 "arguments" : [{"name" : "{SCRIPT_NAME}" , "type" : "BYOC_SCRIPT" }],
272241 }
273242 },
274243}
275244
276245
277246class DataTransformConfig (BaseModel ):
278- input : Union [str , list [str ]]
279- output : Union [str , list [str ]]
247+ sdkVersion : str
248+ entryPoint : str
249+ dataspace : str
250+ permissions : Permissions
280251
281252
282- def get_data_transform_config (directory : str ) -> DataTransformConfig :
283- """Get the data transform config from the entrypoint.py file."""
284- entrypoint_file = os .path .join (directory , "entrypoint.py" )
285- data_access_layer_calls = scan_file (entrypoint_file )
286- input_ = data_access_layer_calls .input_str
287- output = data_access_layer_calls .output_str
288- return DataTransformConfig (input = input_ , output = output )
253+ class Permissions (BaseModel ):
254+ read : Union [DloPermission ]
255+ write : Union [DloPermission ]
289256
290257
291- def verify_data_transform_config (directory : str ) -> None :
292- """Verify that the data transform config.json file exists in the directory."""
258+ class DloPermission (BaseModel ):
259+ dlo : list [str ]
260+
261+
262+ def get_data_transform_config (directory : str ) -> DataTransformConfig :
263+ """Get the data transform config from the config.json file."""
293264 config_path = os .path .join (directory , "config.json" )
294- if not os .path .exists (config_path ):
295- raise FileNotFoundError (f"config.json not found in { directory } " )
265+ try :
266+ with open (config_path , "r" ) as f :
267+ config = json .loads (f .read ())
268+ return DataTransformConfig (** config )
269+ except FileNotFoundError as err :
270+ raise FileNotFoundError (
271+ f"config.json not found in { config_path } "
272+ ) from err
273+ except json .JSONDecodeError as err :
274+ raise ValueError (
275+ f"config.json in { config_path } is not valid JSON"
276+ ) from err
277+ except pydantic .ValidationError as err :
278+ missing_fields = [str (err ["loc" ][0 ]) for err in err .errors ()]
279+ raise ValueError (
280+ f"config.json in { config_path } is missing required "
281+ f"fields: { ', ' .join (missing_fields )} "
282+ ) from err
296283
284+
285+ def verify_data_transform_config (directory : str ) -> None :
286+ """Verify the data transform config.json contents."""
287+ get_data_transform_config (directory )
297288 logger .debug (f"Verified data transform config in { directory } " )
298289
299290
@@ -306,28 +297,31 @@ def create_data_transform(
306297 script_name = metadata .name
307298 data_transform_config = get_data_transform_config (directory )
308299 request_hydrated = DATA_TRANSFORM_REQUEST_TEMPLATE .copy ()
309- request_hydrated ["nodes" ]["model.dcexample.dim_listings_w_hosts" ][
310- "relation_name"
311- ] = data_transform_config .input
312- request_hydrated ["sources" ]["source.dcexample.listings" ][
313- "relation_name"
314- ] = data_transform_config .output
315- request_hydrated ["sources" ]["source.dcexample.listings" ][
316- "identifier"
317- ] = data_transform_config .output
318- request_hydrated ["macros" ]["macro.dcexample.byoc" ]["arguments" ][0 ][
319- "name"
320- ] = script_name
300+
301+ # Add nodes for each write DLO
302+ for i , dlo in enumerate (data_transform_config .permissions .write .dlo , 1 ):
303+ request_hydrated ["nodes" ][f"node{ i } " ] = {
304+ "relation_name" : dlo ,
305+ "config" : {"materialized" : "table" },
306+ "compiled_code" : "" ,
307+ }
308+
309+ # Add sources for each read DLO
310+ for i , dlo in enumerate (data_transform_config .permissions .read .dlo , 1 ):
311+ request_hydrated ["sources" ][f"source{ i } " ] = {"relation_name" : dlo }
312+
313+ request_hydrated ["macros" ]["macro.byoc" ]["arguments" ][0 ]["name" ] = script_name
321314
322315 body = {
323316 "definition" : {
324- "type" : "DBT " ,
317+ "type" : "DCSQL " ,
325318 "manifest" : request_hydrated ,
326319 "version" : "56.0" ,
327320 },
328321 "label" : f"{ metadata .name } " ,
329322 "name" : f"{ metadata .name } " ,
330323 "type" : "BATCH" ,
324+ "dataSpaceName" : data_transform_config .dataspace ,
331325 }
332326
333327 url = _join_strip_url (access_token .instance_url , DATA_TRANSFORMS_PATH )
0 commit comments