
Commit 4164a53

docs: Document that the write_parquet methods use mkdir in polars (#142)
1 parent f26405c commit 4164a53

6 files changed

Lines changed: 116 additions & 9 deletions


dataframely/collection/collection.py

Lines changed: 2 additions & 3 deletions
@@ -864,9 +864,8 @@ def write_parquet(self, directory: str | Path, **kwargs: Any) -> None:
         members which are not provided in the current collection.

         Args:
-            directory: The directory the Parquet files should be written to.
-                If the directory does not exist, it is created automatically,
-                including all of its parents.
+            directory: The directory where the Parquet files should be written to.
+                The `mkdir` kwarg controls whether the directory is created if needed.
             kwargs: Additional keyword arguments passed to :meth:`polars.DataFrame.write_parquet`.
                 `metadata` may only be provided if it is a dictionary.

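For context, a minimal sketch of how the documented behaviour might be used at the collection level (paths are illustrative; the schema and collection definitions here are hypothetical, and the member annotation style is assumed rather than taken from this commit):

    import dataframely as dy
    import polars as pl


    class FirstSchema(dy.Schema):
        a = dy.Int64()


    class MyCollection(dy.Collection):
        # Assumed member annotation style; adjust to the actual collection definition.
        first: dy.LazyFrame[FirstSchema]


    collection = MyCollection.validate({"first": pl.LazyFrame({"a": [1, 2, 3]})}, cast=True)

    # The target directory does not exist yet; mkdir=True is forwarded to polars
    # so the directory is created before the member Parquet files are written.
    collection.write_parquet("output/new_dir", mkdir=True)
    restored = MyCollection.read_parquet("output/new_dir")

Without mkdir=True, writing into a missing directory raises FileNotFoundError, as the new tests below assert.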

dataframely/filter_result.py

Lines changed: 2 additions & 1 deletion
@@ -154,7 +154,8 @@ def write_parquet(self, file: str | Path | IO[bytes], **kwargs: Any) -> None:
         Args:
             file: The file path or writable file-like object to which to write the
                 parquet file. This should be a path to a directory if writing a
-                partitioned dataset.
+                partitioned dataset. The `mkdir` kwarg controls whether the directory
+                is created if needed.
             kwargs: Additional keyword arguments passed directly to
                 :meth:`polars.write_parquet`. ``metadata`` may only be provided if it
                 is a dictionary.
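A similar sketch for failure info objects (hypothetical schema and paths; `metadata` and `mkdir` are forwarded to polars as documented above):

    import dataframely as dy
    import polars as pl


    class MySchema(dy.Schema):
        a = dy.Int64()


    # filter() returns the valid rows plus a failure info object for the invalid ones.
    _, failure = MySchema.filter(pl.DataFrame({"a": [1, 2, 3]}))

    # Writing into a directory that does not exist yet only succeeds with mkdir=True;
    # without it, polars raises FileNotFoundError.
    failure.write_parquet("out/new_dir/failure.parquet", metadata={"custom": "test"}, mkdir=True)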

dataframely/schema.py

Lines changed: 2 additions & 1 deletion
@@ -884,7 +884,8 @@ def write_parquet(
             df: The data frame to write to the parquet file.
             file: The file path or writable file-like object to which to write the
                 parquet file. This should be a path to a directory if writing a
-                partitioned dataset.
+                partitioned dataset. The `mkdir` kwarg controls whether the directory
+                is created if needed.
             kwargs: Additional keyword arguments passed directly to
                 :meth:`polars.write_parquet`. `metadata` may only be provided if it
                 is a dictionary.
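And at the schema level, mirroring the new test further below (paths are illustrative):

    import dataframely as dy


    class MySchema(dy.Schema):
        a = dy.Int64()


    df = MySchema.create_empty()

    # "out/non_existing_dir" does not exist yet; mkdir=True lets polars create it.
    MySchema.write_parquet(df, file="out/non_existing_dir/df.parquet", mkdir=True)

    # Without mkdir, the same call raises FileNotFoundError.
    result = MySchema.read_parquet("out/non_existing_dir/df.parquet")
    assert result.shape == (0, 1)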

tests/collection/test_storage.py

Lines changed: 44 additions & 0 deletions
@@ -498,6 +498,50 @@ def test_read_invalid_parquet_metadata_collection(
     assert collection is None


+@pytest.mark.parametrize(
+    "any_tmp_path",
+    ["tmp_path", pytest.param("s3_tmp_path", marks=pytest.mark.s3)],
+    indirect=True,
+)
+def test_write_nonexistent_directory(any_tmp_path: str) -> None:
+    # Arrange
+    collection = MyCollection.validate(
+        {
+            "first": pl.LazyFrame({"a": [1, 2, 3]}),
+            "second": pl.LazyFrame({"a": [1, 2], "b": [10, 15]}),
+        },
+        cast=True,
+    )
+    fs: AbstractFileSystem = url_to_fs(any_tmp_path)[0]
+    target_path = fs.sep.join([any_tmp_path, "non_existent_dir"])
+
+    # Act
+    collection.write_parquet(target_path, mkdir=True)
+
+    # Assert
+    out = MyCollection.read_parquet(target_path)
+    assert_frame_equal(collection.first, out.first)
+    assert collection.second is not None
+    assert out.second is not None
+    assert_frame_equal(collection.second, out.second)
+
+
+def test_write_parquet_fails_without_mkdir(tmp_path: str) -> None:
+    # Arrange
+    collection = MyCollection.validate(
+        {
+            "first": pl.LazyFrame({"a": [1, 2, 3]}),
+            "second": pl.LazyFrame({"a": [1, 2], "b": [10, 15]}),
+        },
+        cast=True,
+    )
+    p = f"{tmp_path}/non_existent_dir"
+
+    # Act / Assert
+    with pytest.raises(FileNotFoundError):
+        collection.write_parquet(p)
+
+
 # ---------------------------- DELTA LAKE SPECIFICS ---------------------------------- #

tests/failure_info/test_storage.py

Lines changed: 32 additions & 4 deletions
@@ -133,7 +133,10 @@ def test_invalid_schema_deserialization(
     ["tmp_path", pytest.param("s3_tmp_path", marks=pytest.mark.s3)],
     indirect=True,
 )
-def test_write_parquet_custom_metadata(any_tmp_path: str) -> None:
+@pytest.mark.parametrize("check_non_existent_directory", [True, False])
+def test_write_parquet_custom_metadata(
+    any_tmp_path: str, check_non_existent_directory: bool
+) -> None:
     # Arrange
     df = pl.DataFrame(
         {
@@ -144,10 +147,35 @@ def test_write_parquet_custom_metadata(any_tmp_path: str) -> None:
     _, failure = MySchema.filter(df)
     assert failure._df.height == 4

-    # Act
     fs: AbstractFileSystem = url_to_fs(any_tmp_path)[0]
-    p = fs.sep.join([any_tmp_path, "failure.parquet"])
-    failure.write_parquet(p, metadata={"custom": "test"})
+    path_components = (
+        [any_tmp_path]
+        + (["non_existent_dir"] if check_non_existent_directory else [])
+        + ["failure.parquet"]
+    )
+    p = fs.sep.join(path_components)
+
+    # Act
+    if check_non_existent_directory:
+        failure.write_parquet(p, metadata={"custom": "test"}, mkdir=True)
+    else:
+        failure.write_parquet(p, metadata={"custom": "test"})

     # Assert
     assert pl.read_parquet_metadata(p)["custom"] == "test"
+
+
+def test_write_parquet_fails_without_mkdir(tmp_path: str) -> None:
+    # Arrange
+    df = pl.DataFrame(
+        {
+            "a": [4, 5, 6, 6, 7, 8],
+            "b": [1, 2, 3, 4, 5, 6],
+        }
+    )
+    _, failure = MySchema.filter(df)
+    p = f"{tmp_path}/non_existent_dir/failure.parquet"
+
+    # Act / Assert
+    with pytest.raises(FileNotFoundError):
+        failure.write_parquet(p)

tests/schema/test_read_write_parquet.py

Lines changed: 34 additions & 0 deletions
@@ -5,6 +5,7 @@

 import polars as pl
 import pytest
+from fsspec import url_to_fs

 import dataframely as dy
 from dataframely._storage.parquet import SCHEMA_METADATA_KEY
@@ -25,3 +26,36 @@ def test_read_invalid_parquet_metadata_schema(

     # Assert
     assert schema is None
+
+
+class MySchema(dy.Schema):
+    a = dy.Int64()
+
+
+@pytest.mark.parametrize(
+    "any_tmp_path",
+    ["tmp_path", pytest.param("s3_tmp_path", marks=pytest.mark.s3)],
+    indirect=True,
+)
+def test_write_parquet_non_existing_directory(any_tmp_path: str) -> None:
+    # Arrange
+    df = MySchema.create_empty()
+    fs = url_to_fs(any_tmp_path)[0]
+    file = fs.sep.join([any_tmp_path, "non_existing_dir", "df.parquet"])
+
+    # Act
+    MySchema.write_parquet(df, file=file, mkdir=True)
+
+    # Assert
+    result = MySchema.read_parquet(file)
+    assert result.shape == (0, 1)
+
+
+def test_write_parquet_fails_without_mkdir(tmp_path: str) -> None:
+    # Arrange
+    df = MySchema.create_empty()
+    p = f"{tmp_path}/non_existent_dir/df.parquet"
+
+    # Act / Assert
+    with pytest.raises(FileNotFoundError):
+        MySchema.write_parquet(df, file=p)
