Skip to content

Commit 5f63274

Browse files
committed
Replaced system time with Pandas Timestamp
1 parent a29c811 commit 5f63274

2 files changed

Lines changed: 19 additions & 18 deletions

File tree

streamz/dataframe/core.py

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import operator
44
from collections import OrderedDict
5-
from time import time
65
import numpy as np
76
import pandas as pd
87
import toolz
@@ -802,10 +801,10 @@ def _accumulate(self, Agg, **kwargs):
802801
return Streaming(outstream, example, stream_type=stream_type)
803802

804803

805-
def random_datapoint(**kwargs):
806-
"""Example of querying a single current value. Ignores kwargs"""
804+
def random_datapoint(now, **kwargs):
805+
"""Example of querying a single current value"""
807806
return pd.DataFrame(
808-
{'a': np.random.random(1)}, index=[pd.Timestamp.now()])
807+
{'a': np.random.random(1)}, index=[now])
809808

810809

811810
def random_datablock(last, now, **kwargs):
@@ -814,11 +813,11 @@ def random_datablock(last, now, **kwargs):
814813
815814
Parameters
816815
----------
817-
last: time
816+
last: pd.Timestamp
818817
Time of previous call to this function.
819-
now: time
818+
now: pd.Timestamp
820819
Current time.
821-
freq: timedelta, optional
820+
freq: pd.Timedelta, optional
822821
The time interval between individual records to be returned.
823822
For good throughput, should be much smaller than the
824823
interval at which this function is called.
@@ -829,9 +828,8 @@ def random_datablock(last, now, **kwargs):
829828
The y column is Poisson distributed.
830829
The z column is normally distributed.
831830
"""
832-
freq = pd.Timedelta(kwargs.get("freq", "100ms"))
833-
index = pd.date_range(start=(last + freq.total_seconds()) * 1e9,
834-
end=now * 1e9, freq=freq)
831+
freq = kwargs.get("freq", pd.Timedelta("100ms"))
832+
index = pd.date_range(start=last + freq, end=now, freq=freq)
835833

836834
df = pd.DataFrame({'x': np.random.random(len(index)),
837835
'y': np.random.poisson(size=len(index)),
@@ -879,7 +877,7 @@ def __init__(self, datafn=random_datablock, interval='500ms', dask=False, **kwar
879877
self.kwargs = kwargs
880878

881879
stream = self.source.map(lambda x: datafn(**x, **kwargs))
882-
example = datafn(last=time(), now=time(), **kwargs)
880+
example = datafn(last=pd.Timestamp.now(), now=pd.Timestamp.now(), **kwargs)
883881

884882
super(PeriodicDataFrame, self).__init__(stream, example)
885883

@@ -895,18 +893,21 @@ def stop(self):
895893
@staticmethod
896894
@gen.coroutine
897895
def _cb(interval, source, continue_):
898-
last = time()
896+
last = pd.Timestamp.now()
899897
while continue_[0]:
900898
yield gen.sleep(interval)
901-
now = time()
899+
now = pd.Timestamp.now()
902900
yield source._emit(dict(last=last, now=now))
903901
last = now
904902

905903

906904
class Random(PeriodicDataFrame):
907905
"""PeriodicDataFrame providing random values by default
908906
909-
Accepts same parameters as PeriodicDataFrame.
907+
Accepts same parameters as PeriodicDataFrame, plus
908+
`freq`, a string that will be converted to a pd.Timedelta
909+
and passed to the 'datafn'.
910+
910911
Useful mainly for examples and docs.
911912
912913
Example
@@ -916,7 +917,7 @@ class Random(PeriodicDataFrame):
916917

917918
def __init__(self, freq='100ms', interval='500ms', dask=False,
918919
datafn=random_datablock):
919-
super(Random, self).__init__(datafn, interval, dask, freq=freq)
920+
super(Random, self).__init__(datafn, interval, dask, freq=pd.Timedelta(freq))
920921

921922

922923
_stream_types['streaming'].append((is_dataframe_like, DataFrame))

streamz/tests/test_batch.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,11 @@ def test_periodic_dataframes():
4141
pd = pytest.importorskip('pandas')
4242
from streamz.dataframe import PeriodicDataFrame
4343
from streamz.dataframe.core import random_datapoint
44-
df = random_datapoint()
44+
df = random_datapoint(now=pd.Timestamp.now())
4545
assert len(df) == 1
4646

47-
def callback(**kwargs):
48-
return pd.DataFrame(dict(x=50, index=[pd.Timestamp.now()]))
47+
def callback(now, **kwargs):
48+
return pd.DataFrame(dict(x=50, index=[now]))
4949

5050
df = PeriodicDataFrame(callback, interval='20ms')
5151
assert df.tail(0).x == 50

0 commit comments

Comments
 (0)