fix: compress uncompressed read files before upload#84
Conversation
- Check if each input file is gzipped; if not, compress it before upload - Adapts finalize step to handle both compressed and uncompressed user inputs - Resolves VIR-2348: sample creation failures when input FASTQ is not compressed - Adds `proc` parameter to finalize for pigz parallelization during compression
There was a problem hiding this comment.
Code Review
This pull request updates the finalize step in workflow.py to handle uncompressed read files by compressing them before upload. It introduces the proc parameter and uses is_gzipped to check file status. Feedback suggests parallelizing the compression and upload processes using asyncio.gather to improve efficiency, along with adding a check to prevent renaming a file to itself.
| for i, path in enumerate(read_paths): | ||
| new_path = await asyncio.to_thread(path.rename, f"reads_{i + 1}.fq.gz") | ||
| await new_sample.upload(new_path) | ||
| target = path.with_name(f"reads_{i + 1}.fq.gz") | ||
| if is_gzipped(path): | ||
| await asyncio.to_thread(path.rename, target) | ||
| else: | ||
| await asyncio.to_thread(compress_file, path, target, proc) | ||
| await new_sample.upload(target) |
There was a problem hiding this comment.
The current implementation processes each read file (compression and upload) sequentially. For paired-end samples, this means the second file's compression only starts after the first file has finished its upload. Since compression is CPU-bound and uploading is network-bound, these operations can be overlapped to improve performance. Using asyncio.gather would allow the workflow to process both files in parallel, which is more efficient and safe given that Virtool samples typically have at most two read files. Additionally, adding a check to ensure path != target before renaming prevents potential issues on systems where renaming a file to itself might cause errors or unnecessary I/O.
| for i, path in enumerate(read_paths): | |
| new_path = await asyncio.to_thread(path.rename, f"reads_{i + 1}.fq.gz") | |
| await new_sample.upload(new_path) | |
| target = path.with_name(f"reads_{i + 1}.fq.gz") | |
| if is_gzipped(path): | |
| await asyncio.to_thread(path.rename, target) | |
| else: | |
| await asyncio.to_thread(compress_file, path, target, proc) | |
| await new_sample.upload(target) | |
| async def process_path(i, path): | |
| target = path.with_name(f"reads_{i + 1}.fq.gz") | |
| if is_gzipped(path): | |
| if path != target: | |
| await asyncio.to_thread(path.rename, target) | |
| else: | |
| await asyncio.to_thread(compress_file, path, target, proc) | |
| await new_sample.upload(target) | |
| await asyncio.gather( | |
| *(process_path(i, path) for i, path in enumerate(read_paths)) | |
| ) |
There was a problem hiding this comment.
- re gather: Technically there is a core of a good idea here given that these steps are rate limited by different resources. As written though it will just try to spin up to compression instances using all cores for both. If we want the performance benefit we can introduce a pipelined approach that actaully separates the compression and upload steps, but imo the performance is probably not worth the complexity here. esp given that most samples are only a single file
- re path!= target guard. In linux I think think this just prevents a no-op. I'm okay with this change, just not sure it really does anything
procparameter to finalize for pigz parallelization during compression