3131)
3232
3333from loguru import logger
34+ import pydantic
3435from pydantic import BaseModel
3536import requests
3637
3738from datacustomcode .cmd import cmd_output
38- from datacustomcode .scan import scan_file
39- from datacustomcode .version import get_version
4039
4140if TYPE_CHECKING :
4241 from datacustomcode .credentials import Credentials
@@ -79,8 +78,10 @@ def _make_api_call(
7978 logger .debug (f"Request params: { kwargs } " )
8079
8180 response = requests .request (method = method , url = url , headers = headers , ** kwargs )
82- response .raise_for_status ()
8381 json_response = response .json ()
82+ if response .status_code >= 400 :
83+ logger .debug (f"Error Response: { json_response } " )
84+ response .raise_for_status ()
8485 assert isinstance (
8586 json_response , dict
8687 ), f"Unexpected response type: { type (json_response )} "
@@ -225,89 +226,62 @@ def wait_for_deployment(
225226 callback (status )
226227 if status == "Deployed" :
227228 logger .debug (
228- "Deployment completed, Elapsed time: {time.time() - start_time}"
229+ f "Deployment completed. \n Elapsed time: { time .time () - start_time } "
229230 )
230231 break
231232 time .sleep (1 )
232233
233234
# Skeleton dbt-style manifest sent to the Data Transforms API.
# "nodes" and "sources" start empty and are populated per write/read DLO by
# create_data_transform(); the "{SCRIPT_NAME}" placeholder in the macro
# arguments is replaced with the deployed script's name.
#
# NOTE(review): create_data_transform() takes a *shallow* .copy() of this
# template and then mutates the nested "nodes"/"sources" dicts in place,
# which also mutates this shared template across calls — consider
# copy.deepcopy() at the call site. TODO confirm and fix there.
DATA_TRANSFORM_REQUEST_TEMPLATE: dict[str, Any] = {
    "nodes": {},
    "sources": {},
    "macros": {
        "macro.byoc": {
            "arguments": [{"name": "{SCRIPT_NAME}", "type": "BYOC_SCRIPT"}],
        }
    },
}
277244
278245
class DloPermission(BaseModel):
    """A permission entry listing Data Lake Object (DLO) names."""

    # DLO names this permission grants access to.
    dlo: list[str]


class Permissions(BaseModel):
    """Read/write DLO permissions declared in config.json."""

    read: DloPermission
    write: DloPermission


class DataTransformConfig(BaseModel):
    """Schema of a project's config.json file.

    Field names intentionally mirror the JSON keys (camelCase), since the
    raw parsed dict is splatted directly into this model by
    get_data_transform_config().
    """

    sdkVersion: str
    entryPoint: str
    dataspace: str
    permissions: Permissions
299260
300261
def get_data_transform_config(directory: str) -> DataTransformConfig:
    """Load and validate the data transform config from config.json.

    Args:
        directory: Directory expected to contain a ``config.json`` file.

    Returns:
        The parsed and schema-validated configuration.

    Raises:
        FileNotFoundError: If ``config.json`` does not exist in *directory*.
        ValueError: If the file is not valid JSON, or fails schema
            validation (message lists the offending top-level fields).
    """
    config_path = os.path.join(directory, "config.json")
    try:
        with open(config_path, "r") as f:
            config = json.load(f)
        return DataTransformConfig(**config)
    except FileNotFoundError as err:
        raise FileNotFoundError(f"config.json not found at {config_path}") from err
    except json.JSONDecodeError as err:
        raise ValueError(f"config.json at {config_path} is not valid JSON") from err
    except pydantic.ValidationError as err:
        # Collect the top-level field name from each validation error.
        # (Named `e`, not `err`, so the exception being chained below is
        # not shadowed by the comprehension variable.)
        missing_fields = [str(e["loc"][0]) for e in err.errors()]
        raise ValueError(
            f"config.json at {config_path} is missing required "
            f"fields: {', '.join(missing_fields)}"
        ) from err
279+
280+
def verify_data_transform_config(directory: str) -> None:
    """Verify the data transform config.json contents.

    Propagates the same exceptions as get_data_transform_config() when the
    file is absent, malformed, or fails schema validation; returns nothing
    on success.
    """
    # Loading doubles as validation — the parsed object itself is unused.
    _ = get_data_transform_config(directory)
    logger.debug(f"Verified data transform config in {directory}")
311285
312286
313287def create_data_transform (
@@ -319,28 +293,31 @@ def create_data_transform(
319293 script_name = metadata .name
320294 data_transform_config = get_data_transform_config (directory )
321295 request_hydrated = DATA_TRANSFORM_REQUEST_TEMPLATE .copy ()
322- request_hydrated ["nodes" ]["model.dcexample.dim_listings_w_hosts" ][
323- "relation_name"
324- ] = data_transform_config .input
325- request_hydrated ["sources" ]["source.dcexample.listings" ][
326- "relation_name"
327- ] = data_transform_config .output
328- request_hydrated ["sources" ]["source.dcexample.listings" ][
329- "identifier"
330- ] = data_transform_config .output
331- request_hydrated ["macros" ]["macro.dcexample.byoc" ]["arguments" ][0 ][
332- "name"
333- ] = script_name
296+
297+ # Add nodes for each write DLO
298+ for i , dlo in enumerate (data_transform_config .permissions .write .dlo , 1 ):
299+ request_hydrated ["nodes" ][f"node{ i } " ] = {
300+ "relation_name" : dlo ,
301+ "config" : {"materialized" : "table" },
302+ "compiled_code" : "" ,
303+ }
304+
305+ # Add sources for each read DLO
306+ for i , dlo in enumerate (data_transform_config .permissions .read .dlo , 1 ):
307+ request_hydrated ["sources" ][f"source{ i } " ] = {"relation_name" : dlo }
308+
309+ request_hydrated ["macros" ]["macro.byoc" ]["arguments" ][0 ]["name" ] = script_name
334310
335311 body = {
336312 "definition" : {
337- "type" : "DBT " ,
313+ "type" : "DCSQL " ,
338314 "manifest" : request_hydrated ,
339315 "version" : "56.0" ,
340316 },
341317 "label" : f"{ metadata .name } " ,
342318 "name" : f"{ metadata .name } " ,
343319 "type" : "BATCH" ,
320+ "dataSpaceName" : data_transform_config .dataspace ,
344321 }
345322
346323 url = _join_strip_url (access_token .instance_url , DATA_TRANSFORMS_PATH )
@@ -359,7 +336,7 @@ def deploy_full(
359336
360337 # prepare payload
361338 prepare_dependency_archive (directory )
362- create_data_transform_config (directory )
339+ verify_data_transform_config (directory )
363340
364341 # create deployment and upload payload
365342 deployment = create_deployment (access_token , metadata )
0 commit comments