Skip to content

Commit 56f3305

Browse files
authored
Merge pull request #360 from chinmaychandak/windowing_fix
Fix windowing over time bugs
2 parents 239b807 + 27dfe84 commit 56f3305

2 files changed

Lines changed: 67 additions & 24 deletions

File tree

streamz/dataframe/aggregations.py

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -167,18 +167,20 @@ def diff_iloc(dfs, new, window=None):
167167
List of dataframes to decay
168168
"""
169169
dfs = deque(dfs)
170-
dfs.append(new)
170+
if len(new) > 0:
171+
dfs.append(new)
171172
old = []
172-
n = sum(map(len, dfs)) - window
173-
while n > 0:
174-
if len(dfs[0]) <= n:
175-
df = dfs.popleft()
176-
old.append(df)
177-
n -= len(df)
178-
else:
179-
old.append(dfs[0].iloc[:n])
180-
dfs[0] = dfs[0].iloc[n:]
181-
n = 0
173+
if len(dfs) > 0:
174+
n = sum(map(len, dfs)) - window
175+
while n > 0:
176+
if len(dfs[0]) <= n:
177+
df = dfs.popleft()
178+
old.append(df)
179+
n -= len(df)
180+
else:
181+
old.append(dfs[0].iloc[:n])
182+
dfs[0] = dfs[0].iloc[n:]
183+
n = 0
182184

183185
return dfs, old
184186

@@ -202,16 +204,21 @@ def diff_loc(dfs, new, window=None):
202204
List of dataframes to decay
203205
"""
204206
dfs = deque(dfs)
205-
dfs.append(new)
206-
mx = max(df.index.max() for df in dfs)
207-
mn = mx - pd.Timedelta(window)
207+
if len(new) > 0:
208+
dfs.append(new)
208209
old = []
209-
while pd.Timestamp(dfs[0].index.min()) < mn:
210-
o = dfs[0].loc[:mn]
211-
old.append(o) # TODO: avoid copy if fully lost
212-
dfs[0] = dfs[0].iloc[len(o):]
213-
if not len(dfs[0]):
214-
dfs.popleft()
210+
if len(dfs) > 0:
211+
mx = max(df.index.max() for df in dfs)
212+
mn = mx - pd.Timedelta(window) + pd.Timedelta('1ns')
213+
while pd.Timestamp(dfs[0].index.min()) < mn:
214+
o = dfs[0].loc[:mn]
215+
if len(old) > 0:
216+
old.append(o)
217+
else:
218+
old = [o]
219+
dfs[0] = dfs[0].iloc[len(o):]
220+
if not len(dfs[0]):
221+
dfs.popleft()
215222

216223
return dfs, old
217224

@@ -337,7 +344,8 @@ def windowed_groupby_accumulator(acc, new, diff=None, window=None, agg=None, gro
337344

338345
if 'groupers' in acc:
339346
groupers = deque(acc['groupers'])
340-
groupers.append(grouper)
347+
if len(grouper) > 0:
348+
groupers.append(grouper)
341349
old_groupers, groupers = diff_align(dfs, groupers)
342350
else:
343351
old_groupers = [grouper] * len(old)

streamz/dataframe/tests/test_dataframes.py

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -755,12 +755,11 @@ def f(x):
755755
assert len(L) == 5
756756

757757
first = df.iloc[:diff]
758-
lost = first[first.index.min() + value:]
759-
first = first.iloc[len(lost):]
758+
first = first[first.index.max() - value + pd.Timedelta('1ns'):]
760759

761760
assert_eq(L[0], f(first))
762761

763-
last = df.loc[index.max() - value + pd.Timedelta('1s'):]
762+
last = df.loc[index.max() - value + pd.Timedelta('1ns'):]
764763

765764
assert_eq(L[-1], f(last))
766765

@@ -815,6 +814,42 @@ def f(x):
815814
assert_eq(L[-1], f(last))
816815

817816

817+
def test_windowing_value_empty_intermediate_index(stream):
818+
def preprocess(df):
819+
mask = df["amount"] == 5
820+
df = df.loc[mask]
821+
return df
822+
823+
source = stream.map(preprocess)
824+
825+
example = pd.DataFrame({"amount":[]})
826+
sdf = DataFrame(stream=source, example=example)
827+
828+
output = sdf.window("2h").amount.sum().stream.gather().sink_to_list()
829+
830+
stream.emit(pd.DataFrame({"amount": [1, 2, 3]}, index=[pd.Timestamp("2050-01-01 00:00:00"),
831+
pd.Timestamp("2050-01-01 01:00:00"),
832+
pd.Timestamp("2050-01-01 02:00:00")]))
833+
834+
stream.emit(pd.DataFrame({"amount": [5, 5, 5]}, index=[pd.Timestamp("2050-01-01 03:00:00"),
835+
pd.Timestamp("2050-01-01 04:00:00"),
836+
pd.Timestamp("2050-01-01 05:00:00")]))
837+
838+
stream.emit(pd.DataFrame({"amount": [4, 5, 6]}, index=[pd.Timestamp("2050-01-01 06:00:00"),
839+
pd.Timestamp("2050-01-01 07:00:00"),
840+
pd.Timestamp("2050-01-01 08:00:00")]))
841+
842+
stream.emit(pd.DataFrame({"amount": [1, 2, 3]}, index=[pd.Timestamp("2050-01-01 09:00:00"),
843+
pd.Timestamp("2050-01-01 10:00:00"),
844+
pd.Timestamp("2050-01-01 11:00:00")]))
845+
846+
stream.emit(pd.DataFrame({"amount": [5, 5, 5]}, index=[pd.Timestamp("2050-01-01 12:00:00"),
847+
pd.Timestamp("2050-01-01 13:00:00"),
848+
pd.Timestamp("2050-01-01 14:00:00")]))
849+
850+
assert_eq(output, [0, 10, 5, 5, 10])
851+
852+
818853
def test_window_full():
819854
df = pd.DataFrame({'x': np.arange(10, dtype=float), 'y': [1.0, 2.0] * 5})
820855

0 commit comments

Comments
 (0)