Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions src/datacustomcode/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,18 +145,25 @@ def deploy(

@cli.command()
@click.argument("directory", default=".")
def init(directory: str):
from datacustomcode.scan import dc_config_json_from_file
from datacustomcode.template import copy_template
@click.option("--type", default="script", type=click.Choice(["script", "function"]))
def init(directory: str, type: str):
from datacustomcode.scan import dc_config_json_from_file, update_config
from datacustomcode.template import copy_function_template, copy_script_template

click.echo("Copying template to " + click.style(directory, fg="blue", bold=True))
copy_template(directory)
if type == "script":
copy_script_template(directory)
elif type == "function":
copy_function_template(directory)
entrypoint_path = os.path.join(directory, "payload", "entrypoint.py")
config_location = os.path.join(os.path.dirname(entrypoint_path), "config.json")
config_json = dc_config_json_from_file(entrypoint_path)
config_json = dc_config_json_from_file(entrypoint_path, type)
with open(config_location, "w") as f:
json.dump(config_json, f, indent=2)

updated_config_json = update_config(entrypoint_path)
with open(config_location, "w") as f:
json.dump(updated_config_json, f, indent=2)
click.echo(
"Start developing by updating the code in "
+ click.style(entrypoint_path, fg="blue", bold=True)
Expand All @@ -176,15 +183,15 @@ def init(directory: str):
"--no-requirements", is_flag=True, help="Skip generating requirements.txt file"
)
def scan(filename: str, config: str, dry_run: bool, no_requirements: bool):
from datacustomcode.scan import dc_config_json_from_file, write_requirements_file
from datacustomcode.scan import update_config, write_requirements_file

config_location = config or os.path.join(os.path.dirname(filename), "config.json")
click.echo(
"Dumping scan results to config file: "
+ click.style(config_location, fg="blue", bold=True)
)
click.echo("Scanning " + click.style(filename, fg="blue", bold=True) + "...")
config_json = dc_config_json_from_file(filename)
config_json = update_config(filename)

click.secho(json.dumps(config_json, indent=2), fg="yellow")
if not dry_run:
Expand Down
91 changes: 53 additions & 38 deletions src/datacustomcode/scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,20 @@

# Base config for "script" deployments. The "permissions" read/write maps
# are filled in later by update_config() after scanning the entrypoint.
# NOTE: the diff residue duplicating the "dataspace" key has been removed;
# the added revision sets it to "default".
DATA_TRANSFORM_CONFIG_TEMPLATE = {
    "sdkVersion": get_version(),
    "type": "script",
    "entryPoint": "",
    "dataspace": "default",
    "permissions": {
        "read": {},
        "write": {},
    },
}

# Minimal config for "function" deployments: no dataspace or permissions.
FUNCTION_CONFIG_TEMPLATE = {
    "sdkVersion": get_version(),
    "type": "function",
    "entryPoint": "",
}

# Module names shipped with the standard library (requires Python 3.10+).
STANDARD_LIBS = set(sys.stdlib_module_names)


Expand Down Expand Up @@ -230,57 +236,66 @@ def scan_file(file_path: str) -> DataAccessLayerCalls:
return visitor.found()


def dc_config_json_from_file(file_path: str, type: str) -> dict[str, Any]:
    """Create a Data Cloud Custom Code config JSON from a script.

    Args:
        file_path: Path to the entrypoint file.
        type: Deployment type, either ``"script"`` or ``"function"``.
            (The name shadows the builtin but is kept for caller
            compatibility with the existing CLI.)

    Returns:
        A fresh config dict based on the template for *type*, with
        ``entryPoint`` set to the entrypoint's file name.

    Raises:
        ValueError: If *type* is not a recognized deployment type.
    """
    # Scan the entrypoint up front so a malformed file fails fast; the
    # scan result itself is only consumed later by update_config().
    output = scan_file(file_path)

    config: dict[str, Any]
    if type == "script":
        config = DATA_TRANSFORM_CONFIG_TEMPLATE.copy()
    elif type == "function":
        config = FUNCTION_CONFIG_TEMPLATE.copy()
    else:
        # Previously an unrecognized type fell through with `config`
        # unbound, raising an opaque UnboundLocalError on the next line.
        raise ValueError(f"Unknown config type: {type!r}")

    # NOTE(review): .copy() is shallow -- the nested "permissions" dicts in
    # the template must only ever be reassigned, never mutated in place.
    # os.path.basename handles OS-specific separators, unlike the previous
    # file_path.rpartition("/") which was POSIX-only.
    config["entryPoint"] = os.path.basename(file_path)
    return config


def update_config(file_path: str) -> dict[str, Any]:
    """Refresh the ``config.json`` that lives next to *file_path*.

    Loads the existing config; for ``"script"`` configs it normalizes the
    dataspace and re-scans the entrypoint to rebuild the read/write
    permissions. Non-script (function) configs are returned unchanged.

    Args:
        file_path: Path to the entrypoint script; ``config.json`` is
            expected in the same directory.

    Returns:
        The (possibly updated) config dict.

    Raises:
        ValueError: If ``config.json`` is missing or contains invalid JSON.
        OSError: If ``config.json`` exists but cannot be read.
    """
    file_dir = os.path.dirname(file_path)
    config_json_path = os.path.join(file_dir, "config.json")

    existing_config: dict[str, Any]
    if os.path.exists(config_json_path) and os.path.isfile(config_json_path):
        try:
            with open(config_json_path, "r") as f:
                existing_config = json.load(f)
        except json.JSONDecodeError as e:
            raise ValueError(
                f"Failed to parse JSON from {config_json_path}: {e}"
            ) from e
        except OSError as e:
            raise OSError(f"Failed to read config file {config_json_path}: {e}") from e
    else:
        raise ValueError(f"config.json not found at {config_json_path}")

    # Only script configs carry dataspace/permissions; function configs
    # pass through untouched.
    if existing_config["type"] == "script":
        existing_config["dataspace"] = get_dataspace(existing_config)

        output = scan_file(file_path)
        # Prefer DLO access when the scan found any; fall back to DMO.
        read: dict[str, list[str]] = {}
        if output.read_dlo:
            read["dlo"] = list(output.read_dlo)
        else:
            read["dmo"] = list(output.read_dmo)
        write: dict[str, list[str]] = {}
        if output.write_to_dlo:
            write["dlo"] = list(output.write_to_dlo)
        else:
            write["dmo"] = list(output.write_to_dmo)

        existing_config["permissions"] = {"read": read, "write": write}
    return existing_config


def get_dataspace(existing_config: dict[str, str]) -> str:
    """Return a usable dataspace value from *existing_config*.

    Falls back to ``"default"`` (with a warning) when the field is present
    but empty/None; raises when the field is missing entirely.

    Raises:
        ValueError: If no ``"dataspace"`` key exists in the config.
    """
    # Guard clause: a missing key is a hard configuration error.
    if "dataspace" not in existing_config:
        raise ValueError(
            "dataspace must be defined. "
            "Please add a 'dataspace' field to the config.json file. "
        )

    dataspace_value = existing_config["dataspace"]
    if not dataspace_value or (
        isinstance(dataspace_value, str) and dataspace_value.strip() == ""
    ):
        logger.warning(
            "dataspace is empty or None. "
            "Updating config file to use dataspace 'default'. "
        )
        return "default"
    return dataspace_value
24 changes: 20 additions & 4 deletions src/datacustomcode/template.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,31 @@

from loguru import logger

template_dir = os.path.join(os.path.dirname(__file__), "templates")
script_template_dir = os.path.join(os.path.dirname(__file__), "templates", "script")
function_template_dir = os.path.join(os.path.dirname(__file__), "templates", "function")


def copy_script_template(target_dir: str) -> None:
    """Copy the script template tree into *target_dir*.

    Creates *target_dir* if needed. Existing directories with the same
    names are merged/overwritten (copytree uses ``dirs_exist_ok=True``);
    existing files are overwritten by copy2.
    """
    os.makedirs(target_dir, exist_ok=True)

    for item in os.listdir(script_template_dir):
        source = os.path.join(script_template_dir, item)
        destination = os.path.join(target_dir, item)

        if os.path.isdir(source):
            logger.debug(f"Copying directory {source} to {destination}...")
            shutil.copytree(source, destination, dirs_exist_ok=True)
        else:
            logger.debug(f"Copying file {source} to {destination}...")
            shutil.copy2(source, destination)


def copy_function_template(target_dir: str) -> None:
os.makedirs(target_dir, exist_ok=True)

for item in os.listdir(function_template_dir):
source = os.path.join(function_template_dir, item)
destination = os.path.join(target_dir, item)

if os.path.isdir(source):
Expand Down
1 change: 1 addition & 0 deletions src/datacustomcode/templates/function/payload/config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{}
120 changes: 120 additions & 0 deletions src/datacustomcode/templates/function/payload/entrypoint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import logging
from typing import List
from uuid import uuid4

logger = logging.getLogger(__name__)


def chunk_text(text: str, chunk_size: int = 1000) -> List[str]:
    """
    Split text into chunks of approximately chunk_size characters.
    Tries to split at sentence boundaries when possible.
    """
    if not text:
        return []

    pieces: List[str] = []
    buffer = ""

    # Naive sentence segmentation: split on ". " and re-append the
    # separator to each sentence as it is accumulated.
    for sentence in text.split(". "):
        if len(buffer) + len(sentence) > chunk_size:
            # Current buffer is full; flush it before starting a new one.
            if buffer:
                pieces.append(buffer.strip())
            buffer = sentence + ". "
        else:
            buffer += sentence + ". "

    # Flush whatever remains after the final sentence.
    if buffer:
        pieces.append(buffer.strip())

    return pieces


def dc_function(request: dict) -> dict:
    """Chunk the text of every document in ``request["input"]``.

    Args:
        request: Dict with an "input" list of DocElement-like dicts; each
            item is expected to carry "text" and "metadata" keys (both
            default to empty when absent).

    Returns:
        Dict with an "output" list of chunk dicts — seq_no increases
        globally across all input items and each chunk gets a fresh
        uuid4 chunk_id — plus a success "status" entry.
    """
    logger.info("Inside DC Function")
    logger.info(request)

    items = request["input"]
    output_chunks = []
    current_seq_no = 1  # Sequence numbering is global, starting at 1.

    for item in items:
        # Item is a DocElement represented as a plain dict.
        logger.info("Processing item: ")
        logger.info(item)

        text = item.get("text", "")
        metadata = item.get("metadata", {})

        # chunk_size=100 deliberately overrides chunk_text's default of
        # 1000 so modest inputs still produce multiple chunks.
        text_chunks = chunk_text(text, chunk_size=100)

        # Wrap each text chunk in the chunk schema expected downstream.
        for chunk_content in text_chunks:
            chunk_dict = {
                "text": chunk_content,
                "metadata": metadata,
                "seq_no": current_seq_no,
                "chunk_type": "text",
                "chunk_id": str(uuid4()),
                "tag_metadata": {},
                "citations": {},
                "source_record": item,
            }
            output_chunks.append(chunk_dict)
            current_seq_no += 1  # Increment sequence number for next chunk

    logger.info("Completed chunking")
    response = {
        "output": output_chunks,
        "status": {"status_type": "success", "status_message": "Chunking completed"},
    }
    logger.info(response)
    return response


# Test the function
if __name__ == "__main__":
# Configure logging
logging.basicConfig(level=logging.INFO)

# Create test data with two DocElements
test_request = {
"input": [
{
"text": (
"""This is the first sentence of the first document, which is
intentionally made longer to test chunking. """
"""Here is the second sentence of the first document, which is also
quite long and should ensure that the chunking function splits
this text into two chunks when the chunk size is set to 100."""
),
"metadata": {"source": "test1", "type": "document"},
},
{
"text": (
"""This is the first sentence of the second document, and it is
also extended to be longer than usual for testing purposes. """
"""The second sentence of the second document is similarly lengthy,
so that the chunking function will again create two chunks for
this document."""
),
"metadata": {"source": "test2", "type": "document"},
},
]
}

# Run the function
result = dc_function(test_request)

# Print the results in a more readable format
print("\nChunking Results:")
print("----------------")
for chunk in result["output"]:
print(f"\nChunk #{chunk['seq_no']}:")
print(f"Text: {chunk['text'][:100]}...") # Print first 100 chars of each chunk
print(f"Source: {chunk['metadata']['source']}")
print(f"Chunk ID: {chunk['chunk_id']}")
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"name": "Existing Dockerfile",
"build": {
"context": "..",
"dockerfile": "../Dockerfile"
},
"features": {
"ghcr.io/devcontainers/features/git:1": {},
}
}
18 changes: 18 additions & 0 deletions src/datacustomcode/templates/script/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Development image based on the EMR-on-EKS Spark runtime (EMR 7.3.0).
FROM public.ecr.aws/emr-on-eks/spark/emr-7.3.0:latest

# Root is required to install packages system-wide.
USER root

# Install development dependencies from requirements-dev.txt.
COPY requirements-dev.txt ./requirements-dev.txt
RUN pip3.11 install --no-cache-dir -r requirements-dev.txt

# Install runtime dependencies from requirements.txt.
COPY requirements.txt ./requirements.txt
RUN pip3.11 install --no-cache-dir -r requirements.txt

# Create workspace directory
RUN mkdir /workspace

# Drop privileges back to the non-root hadoop user and set working directory.
USER hadoop:hadoop
WORKDIR /workspace
11 changes: 11 additions & 0 deletions src/datacustomcode/templates/script/Dockerfile.dependencies
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Dependency-build image based on the EMR-on-EKS Spark runtime (EMR 7.3.0).
FROM public.ecr.aws/emr-on-eks/spark/emr-7.3.0:latest

# Root is required to install packages system-wide.
USER root

# venv-pack archives virtual environments; presumably consumed by the
# build script invoked below -- confirm against build_native_dependencies.sh.
RUN pip3.11 install venv-pack

# Create workspace directory
RUN mkdir /workspace
WORKDIR /workspace

# Default command: build native dependencies when the container starts.
# NOTE(review): build_native_dependencies.sh is expected to be mounted or
# copied into /workspace at run time -- it is not added by this Dockerfile.
CMD ["./build_native_dependencies.sh"]
Empty file.
Loading