The chunks.py file provides the chunk-upload helpers (store_content_chunk, assemble_chunks, etc.), but chunk content is buffered in Python memory (contextvars.ContextVar("jupyterlab_content_chunks", default={})) and is only written to S3 after all chunks have been uploaded and assembled.
When uploading large files, if the chunks are large or are not cleaned up promptly, memory usage grows rapidly.
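For reference, the buffering behavior described above amounts to something like the following simplified sketch (paraphrasing the description, not the library's exact code):

    from contextvars import ContextVar

    # Simplified sketch of the in-memory buffering behavior described above
    content_chunks = ContextVar("jupyterlab_content_chunks", default={})

    def store_content_chunk(path: str, content: str):
        """Keep every chunk in memory until the upload finishes."""
        chunks = content_chunks.get()
        chunks.setdefault(path, []).append(content)

    def assemble_chunks(path: str) -> str:
        """Join all buffered chunks; only this assembled result is written to S3."""
        chunks = content_chunks.get()
        if path not in chunks:
            raise ValueError(f"No chunks found for path: {path}")
        return "".join(chunks[path])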
It is possible to write each chunk directly to S3 (via s3fs) instead of buffering it in memory or on disk first, for example:
from s3contents import S3ContentsManager

# Helpers from the companion module shown below
from s3fs_chunks import (
    store_content_chunk,
    close_upload_session,
    abort_upload_session,
    prune_stale_chunks,
)


class S3DirectChunkContentsManager(S3ContentsManager):
    """
    Write each chunk directly to S3 (streaming chunks via s3fs),
    without buffering them on disk or in memory.
    """

    def _save_large_file(self, chunk, model, path, format):
        """Override the logic for large-file chunked uploads."""
        if "type" not in model:
            self.do_error("No file type provided", 400)
        if model["type"] != "file":
            self.do_error(
                'File type "{}" is not supported for large file transfer'.format(
                    model["type"]
                ),
                400,
            )
        if format not in {"text", "base64"}:
            self.do_error(
                "Must specify format of file contents as 'text' or 'base64'",
                400,
            )

        prune_stale_chunks()

        self.log.debug(
            "S3contents.GenericManager.save (chunk %s) %s: '%s'",
            chunk,
            model,
            path,
        )

        s3fs = self.fs.fs  # self.fs.fs is the underlying s3fs.S3FileSystem instance
        s3_path = self.fs.path(path)  # Full bucket/key for the target object

        try:
            if chunk == 1:
                self.run_pre_save_hook(model=model, path=path)
                # New upload: discard any stale session and start a new one
                store_content_chunk(
                    path, model["content"], s3fs, s3_path, format, is_first=True
                )
            else:
                store_content_chunk(
                    path, model["content"], s3fs, s3_path, format, is_first=False
                )
            if chunk == -1:
                # Last chunk: close the session and commit the upload
                close_upload_session(path)
        except Exception as e:
            # Abort the multipart upload and clean up on error
            abort_upload_session(path)
            self.log.error(
                "S3contents.GenericManager._save_large_file: error while saving file: %s %s",
                path,
                e,
                exc_info=True,
            )
            self.do_error(f"Unexpected error while saving file: {path} {e}")

        return self.get(path, content=False)
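The s3fs behavior this relies on is that opening a key in "wb" mode returns a buffered file object: writes accumulate in memory only up to the block size, large uploads are sent as S3 multipart parts, and the upload is committed on close() (or aborted with discard()). A minimal standalone illustration, with a placeholder bucket/key:

    import s3fs

    fs = s3fs.S3FileSystem()  # credentials resolved from the environment

    # "my-bucket/large-file.bin" is a placeholder target
    with fs.open("my-bucket/large-file.bin", "wb") as f:
        for part in (b"a" * 1024 * 1024, b"b" * 1024 * 1024):  # example byte chunks
            f.write(part)  # buffered; large writes become S3 multipart parts
    # leaving the with-block closes the file and commits the upload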
And the companion s3fs_chunks.py module:
import base64
import threading

# Tracks the open S3File object for each in-progress upload.
# Note: for multi-process / distributed deployments this would need a different
# design, since open file handles cannot be shared across processes.
_upload_sessions = {}
_upload_sessions_lock = threading.Lock()


def store_content_chunk(path: str, chunk: str, s3fs, s3_path: str, format: str, is_first: bool = False):
    """
    Write one chunk directly to the open S3File object.

    - path: logical file path (used as the session key)
    - chunk: chunk content, plain text or base64-encoded
    - s3fs: s3fs.S3FileSystem instance
    - s3_path: target path on S3 (e.g. bucket/key)
    - format: "text" or "base64"
    - is_first: whether this is the first chunk of the upload
    """
    if format == "base64":
        chunk_bytes = base64.b64decode(chunk.encode("ascii"), validate=True)
    else:
        chunk_bytes = chunk.encode("utf-8")

    with _upload_sessions_lock:
        if is_first:
            # Start a new upload session, discarding any stale one for this path
            if path in _upload_sessions:
                try:
                    # Abort the unfinished multipart upload
                    _upload_sessions[path].discard()
                except Exception:
                    pass
                del _upload_sessions[path]
            f = s3fs.open(s3_path, "wb")
            _upload_sessions[path] = f
        else:
            if path not in _upload_sessions:
                raise RuntimeError(f"No upload session for path: {path}")
            f = _upload_sessions[path]
        f.write(chunk_bytes)


def close_upload_session(path: str):
    """Close and commit the upload after the last chunk."""
    with _upload_sessions_lock:
        if path in _upload_sessions:
            _upload_sessions[path].close()
            del _upload_sessions[path]


def abort_upload_session(path: str):
    """Abort and clean up after an error."""
    with _upload_sessions_lock:
        if path in _upload_sessions:
            try:
                # discard() aborts the multipart upload instead of committing
                # partially written content
                _upload_sessions[path].discard()
            except Exception:
                pass
            del _upload_sessions[path]


def prune_stale_chunks(timeout_sec=3600):
    """
    Periodically clean up upload sessions that have been open too long
    (to prevent leaked file handles and abandoned multipart uploads).

    Left as a stub for demonstration; a timestamp-based variant is sketched below.
    """
    pass
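To go beyond the stub, one option is to record a last-activity timestamp per session and discard sessions that exceed the timeout. A sketch, assuming a hypothetical _last_activity dict that store_content_chunk and close_upload_session keep up to date alongside _upload_sessions:

    import time

    # Hypothetical companion registry: store_content_chunk / close_upload_session
    # would also need to set _last_activity[path] = time.time() on each call.
    _last_activity = {}

    def prune_stale_chunks(timeout_sec=3600):
        """Discard upload sessions idle for longer than timeout_sec seconds."""
        now = time.time()
        with _upload_sessions_lock:
            for path in list(_upload_sessions):
                if now - _last_activity.get(path, now) > timeout_sec:
                    try:
                        # Abort the multipart upload so S3 keeps no orphaned parts
                        _upload_sessions[path].discard()
                    except Exception:
                        pass
                    del _upload_sessions[path]
                    _last_activity.pop(path, None)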
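For completeness, wiring the manager into Jupyter Server would look roughly like the standard s3contents configuration, just pointing at the subclass (the module name, bucket, and credentials below are placeholders):

    # jupyter_server_config.py (sketch; module name, bucket, and credentials are placeholders)
    from s3_direct_chunk_manager import S3DirectChunkContentsManager

    c.ServerApp.contents_manager_class = S3DirectChunkContentsManager
    c.S3ContentsManager.bucket = "my-notebook-bucket"
    c.S3ContentsManager.access_key_id = "<access-key-id>"
    c.S3ContentsManager.secret_access_key = "<secret-access-key>"

Traits set on S3ContentsManager also apply to the subclass, since traitlets merges configuration along the class hierarchy.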