Skip to content
This repository was archived by the owner on Feb 2, 2024. It is now read-only.

Commit 515f7b5

Browse files
PokhodenkoSAshssf
authored andcommitted
[WIP] Use PyArrow for reading CSV (#270)
* Add implementation via PyArrow * Add tests for implementation pandas read_csv via PyArrow * Replace _sanitize_varname() with to_varname(). This reverts commit 0a27dc0ad43c0eef749c59a42aa6f4126bd3247b. * Fallback to pandas implementation if categorical type is needed
1 parent 6f1a4be commit 515f7b5

3 files changed

Lines changed: 467 additions & 25 deletions

File tree

sdc/hiframes/hiframes_untyped.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1938,7 +1938,3 @@ def simple_block_copy_propagate(block):
19381938
for k in lhs_kill:
19391939
var_dict.pop(k, None)
19401940
return
1941-
1942-
1943-
def _sanitize_varname(varname):
1944-
return varname.replace('$', '_').replace('.', '_')

sdc/io/csv_ext.py

Lines changed: 148 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@
5353

5454
from sdc.hiframes.pd_categorical_ext import (PDCategoricalDtype, CategoricalArray)
5555

56+
import pyarrow
57+
import pyarrow.csv
58+
5659

5760
class CsvReader(ir.Stmt):
5861
def __init__(self, file_name, df_out, sep, df_colnames, out_vars, out_types, usecols, loc, skiprows=0):
@@ -345,7 +348,7 @@ def _get_pd_dtype_str(t):
345348
compiled_funcs = []
346349

347350

348-
def _gen_csv_reader_py(col_names, col_typs, usecols, sep, typingctx, targetctx, parallel, skiprows):
351+
def _gen_csv_reader_py_pandas(col_names, col_typs, usecols, sep, typingctx, targetctx, parallel, skiprows):
349352
# TODO: support non-numpy types like strings
350353
date_inds = ", ".join(str(i) for i, t in enumerate(col_typs) if t.dtype == types.NPDatetime('ns'))
351354
typ_strs = ", ".join(["{}='{}'".format(_sanitize_varname(cname), _get_dtype_str(t))
@@ -384,3 +387,147 @@ def _sanitize_varname(varname):
384387
if not new_name[0].isalpha():
385388
new_name = '_' + new_name
386389
return new_name
390+
391+
392+
# TODO: move to hpat.common
393+
def to_varname(string):
394+
"""Converts string to correct Python variable name.
395+
Replaces unavailable symbols with _ and insert _ if string starts with digit.
396+
"""
397+
import re
398+
return re.sub(r'\W|^(?=\d)','_', string)
399+
400+
401+
def pandas_read_csv(
402+
filepath_or_buffer,
403+
sep=",",
404+
# Column and Index Locations and Names
405+
names=None,
406+
usecols=None,
407+
# General Parsing Configuration
408+
dtype=None,
409+
skiprows=None,
410+
# Datetime Handling
411+
parse_dates=False,
412+
):
413+
"""Implements pandas.read_csv via pyarrow.csv.read_csv.
414+
This function has the same interface as pandas.read_csv.
415+
"""
416+
417+
# Fallback to pandas
418+
need_categorical = isinstance(dtype, pd.CategoricalDtype)
419+
try:
420+
need_categorical |= any(isinstance(v, pd.CategoricalDtype) for v in dtype.values())
421+
except: pass
422+
423+
if need_categorical:
424+
return pd.read_csv(
425+
filepath_or_buffer,
426+
sep=sep,
427+
names=names,
428+
usecols=usecols,
429+
dtype=dtype,
430+
skiprows=skiprows,
431+
parse_dates=parse_dates
432+
)
433+
434+
autogenerate_column_names = bool(names)
435+
436+
include_columns = None
437+
438+
# categories = None
439+
440+
if usecols is not None:
441+
include_columns = [f'f{i}' for i in usecols]
442+
443+
read_options = pyarrow.csv.ReadOptions(
444+
skip_rows=skiprows,
445+
# column_names=column_names,
446+
autogenerate_column_names=autogenerate_column_names,
447+
)
448+
449+
parse_options = pyarrow.csv.ParseOptions(
450+
delimiter=sep,
451+
)
452+
453+
# try:
454+
# keys = [k for k, v in dtype.items() if isinstance(v, pd.CategoricalDtype)]
455+
# if keys:
456+
# for k in keys:
457+
# del dtype[k]
458+
# names_list = list(names)
459+
# categories = [f"f{names_list.index(k)}" for k in keys]
460+
# except: pass
461+
462+
if dtype is not None:
463+
names_list = list(names)
464+
if not hasattr(dtype, 'items'):
465+
dtype = { f"f{names_list.index(k)}": pyarrow.from_numpy_dtype(dtype) for k in names }
466+
else:
467+
dtype = { f"f{names_list.index(k)}": pyarrow.from_numpy_dtype(v) for k, v in dtype.items() }
468+
469+
try:
470+
for column in parse_dates:
471+
name = f"f{column}"
472+
# TODO: Try to help pyarrow infer date type - set DateType.
473+
# dtype[name] = pyarrow.from_numpy_dtype(np.datetime64) # string
474+
del dtype[name]
475+
except: pass
476+
477+
convert_options = pyarrow.csv.ConvertOptions(
478+
column_types=dtype,
479+
include_columns=include_columns,
480+
)
481+
482+
table = pyarrow.csv.read_csv(
483+
filepath_or_buffer,
484+
read_options=read_options,
485+
parse_options=parse_options,
486+
convert_options=convert_options,
487+
)
488+
489+
dataframe = table.to_pandas(
490+
# categories=categories,
491+
)
492+
493+
if names is not None:
494+
dataframe.columns = names
495+
496+
return dataframe
497+
498+
499+
def _gen_csv_reader_py_pyarrow(col_names, col_typs, usecols, sep, typingctx, targetctx, parallel, skiprows):
500+
# TODO: support non-numpy types like strings
501+
date_inds = ", ".join(str(i) for i, t in enumerate(col_typs) if t.dtype == types.NPDatetime('ns'))
502+
typ_strs = ", ".join(["{}='{}'".format(to_varname(cname), _get_dtype_str(t))
503+
for cname, t in zip(col_names, col_typs)])
504+
pd_dtype_strs = ", ".join(["'{}':{}".format(cname, _get_pd_dtype_str(t)) for cname, t in zip(col_names, col_typs)])
505+
506+
func_text = "def csv_reader_py(fname):\n"
507+
func_text += " with objmode({}):\n".format(typ_strs)
508+
func_text += " df = pandas_read_csv(fname, names={},\n".format(col_names)
509+
func_text += " parse_dates=[{}],\n".format(date_inds)
510+
func_text += " dtype={{{}}},\n".format(pd_dtype_strs)
511+
func_text += " skiprows={},\n".format(skiprows)
512+
func_text += " usecols={}, sep='{}')\n".format(usecols, sep)
513+
for cname in col_names:
514+
func_text += " {} = df['{}'].values\n".format(to_varname(cname), cname)
515+
# func_text += " print({})\n".format(cname)
516+
func_text += " return ({},)\n".format(", ".join(to_varname(c) for c in col_names))
517+
518+
# print(func_text)
519+
glbls = globals() # TODO: fix globals after Numba's #3355 is resolved
520+
# {'objmode': objmode, 'csv_file_chunk_reader': csv_file_chunk_reader,
521+
# 'pd': pd, 'np': np}
522+
loc_vars = {}
523+
exec(func_text, glbls, loc_vars)
524+
csv_reader_py = loc_vars['csv_reader_py']
525+
526+
# TODO: no_cpython_wrapper=True crashes for some reason
527+
jit_func = numba.njit(csv_reader_py)
528+
compiled_funcs.append(jit_func)
529+
return jit_func
530+
531+
532+
# _gen_csv_reader_py = _gen_csv_reader_py_pandas
533+
_gen_csv_reader_py = _gen_csv_reader_py_pyarrow

0 commit comments

Comments
 (0)