Skip to content

Commit 27dfe84

Browse files
Add empty windowing index test and fix TODO
1 parent 6acac54 commit 27dfe84

2 files changed

Lines changed: 40 additions & 1 deletion

File tree

streamz/dataframe/aggregations.py

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -212,7 +212,10 @@ def diff_loc(dfs, new, window=None):
212212
mn = mx - pd.Timedelta(window) + pd.Timedelta('1ns')
213213
while pd.Timestamp(dfs[0].index.min()) < mn:
214214
o = dfs[0].loc[:mn]
215-
old.append(o) # TODO: avoid copy if fully lost
215+
if len(old) > 0:
216+
old.append(o)
217+
else:
218+
old = [o]
216219
dfs[0] = dfs[0].iloc[len(o):]
217220
if not len(dfs[0]):
218221
dfs.popleft()

streamz/dataframe/tests/test_dataframes.py

Lines changed: 36 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -814,6 +814,42 @@ def f(x):
814814
assert_eq(L[-1], f(last))
815815

816816

817+
def test_windowing_value_empty_intermediate_index(stream):
    """Check a 2h windowed sum when the upstream filter yields empty frames.

    Every emitted frame is first reduced to the rows whose ``amount`` equals
    5, so some batches reach the windowed aggregation with no rows at all
    (the first and fourth batches below contain no 5s). The windowed
    ``amount.sum()`` must still produce one result per emitted batch.
    """
    def keep_fives(frame):
        # Boolean-mask selection; may legitimately return an empty frame.
        return frame.loc[frame["amount"] == 5]

    def build_frame(amounts, stamps):
        # One row per timestamp string, indexed by the parsed Timestamp.
        return pd.DataFrame({"amount": amounts},
                            index=[pd.Timestamp(stamp) for stamp in stamps])

    filtered = stream.map(keep_fives)
    sdf = DataFrame(stream=filtered, example=pd.DataFrame({"amount": []}))

    output = sdf.window("2h").amount.sum().stream.gather().sink_to_list()

    # Five consecutive three-hour batches, emitted in chronological order.
    batches = [
        ([1, 2, 3], ["2050-01-01 00:00:00",
                     "2050-01-01 01:00:00",
                     "2050-01-01 02:00:00"]),
        ([5, 5, 5], ["2050-01-01 03:00:00",
                     "2050-01-01 04:00:00",
                     "2050-01-01 05:00:00"]),
        ([4, 5, 6], ["2050-01-01 06:00:00",
                     "2050-01-01 07:00:00",
                     "2050-01-01 08:00:00"]),
        ([1, 2, 3], ["2050-01-01 09:00:00",
                     "2050-01-01 10:00:00",
                     "2050-01-01 11:00:00"]),
        ([5, 5, 5], ["2050-01-01 12:00:00",
                     "2050-01-01 13:00:00",
                     "2050-01-01 14:00:00"]),
    ]
    for amounts, stamps in batches:
        stream.emit(build_frame(amounts, stamps))

    assert_eq(output, [0, 10, 5, 5, 10])
851+
852+
817853
def test_window_full():
818854
df = pd.DataFrame({'x': np.arange(10, dtype=float), 'y': [1.0, 2.0] * 5})
819855

0 commit comments

Comments
 (0)