Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 26 additions & 33 deletions sdks/python/apache_beam/transforms/periodicsequence.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,28 +33,26 @@
from apache_beam.transforms.window import TimestampedValue
from apache_beam.utils import timestamp
from apache_beam.utils.timestamp import MAX_TIMESTAMP
from apache_beam.utils.timestamp import Duration
from apache_beam.utils.timestamp import Timestamp


class ImpulseSeqGenRestrictionProvider(core.RestrictionProvider):
def initial_restriction(self, element):
  """Return the offset range covering every output of one impulse sequence.

  Args:
    element: a ``(start, end, interval)`` triple. ``start`` and ``end`` may
      be ``Timestamp`` objects or any value accepted by ``Timestamp.of``;
      ``interval`` is passed to ``Duration`` and must be positive.

  Returns:
    ``OffsetRange(0, n)`` where ``n`` is ``(end - start) / interval`` rounded
    up, i.e. the number of firings in the half-open span ``[start, end)``.

  Raises:
    AssertionError: if ``start > end`` or ``interval <= 0``.
  """
  start, end, interval = element
  # Normalize both bounds to Timestamp so the arithmetic below has a single
  # code path regardless of how the caller expressed them.
  if not isinstance(start, Timestamp):
    start = Timestamp.of(start)
  if not isinstance(end, Timestamp):
    end = Timestamp.of(end)

  interval_duration = Duration(interval)

  assert start <= end
  assert interval > 0
  total_duration = end - start
  # Ceiling division done with floor division on the microsecond counts.
  # Unlike math.ceil(a / b), this never goes through a float, so the count
  # stays exact even for very large spans (assumes ``.micros`` are ints).
  total_outputs = -(-total_duration.micros // interval_duration.micros)

  return OffsetRange(0, total_outputs)

def create_tracker(self, restriction):
Expand All @@ -75,6 +73,9 @@ def _sequence_backlog_bytes(element, now, offset_range):
start, _, interval = element
if isinstance(start, Timestamp):
start = start.micros / 1000000
elif isinstance(start, int):
start = start / 1000000

assert interval > 0

now_index = math.floor((now - start) / interval)
Expand Down Expand Up @@ -157,6 +158,8 @@ def process(

if isinstance(start, Timestamp):
start = start.micros / 1000000
elif isinstance(start, int):
start = start / 1000000

assert isinstance(restriction_tracker, sdf_utils.RestrictionTrackerView)

Expand Down Expand Up @@ -230,38 +233,28 @@ def _validate_and_adjust_duration(self):
assert self.data

# The total time we need to impulse all the data.
data_duration = (len(self.data) - 1) * self.interval
data_duration = (len(self.data) - 1) * Duration(self.interval)

is_pre_timestamped = isinstance(self.data[0], tuple) and \
isinstance(self.data[0][0], timestamp.Timestamp)

if isinstance(self.start_ts, Timestamp):
start = self.start_ts.micros / 1000000
else:
start = self.start_ts

if isinstance(self.stop_ts, Timestamp):
if self.stop_ts == MAX_TIMESTAMP:
# When the stop timestamp is unbounded (MAX_TIMESTAMP), set it to the
# data's actual end time plus an extra fire interval, because the
# impulse duration's upper bound is exclusive.
end = start + data_duration + self.interval
self.stop_ts = Timestamp(micros=end * 1000000)
else:
end = self.stop_ts.micros / 1000000
else:
end = self.stop_ts
if self.stop_ts == MAX_TIMESTAMP:
# When the stop timestamp is unbounded (MAX_TIMESTAMP), set it to the
# data's actual end time plus an extra fire interval, because the
# impulse duration's upper bound is exclusive.
self.stop_ts = self.start_ts + data_duration + Duration(self.interval)

# The total time for the impulse signal which occurs in [start, end).
impulse_duration = end - start
if round(data_duration + self.interval, 6) < round(impulse_duration, 6):
impulse_duration = self.stop_ts - self.start_ts
# Compare as Durations; this replaces the earlier float comparison that
# rounded both sides to 6 decimal places.
if data_duration + Duration(self.interval) < impulse_duration:
# We don't have enough data for the impulse.
# If we can fit at least one more data point in the impulse duration,
# then we will be in the repeat mode.
message = 'The number of elements in the provided pre-timestamped ' \
'data sequence is not enough to span the full impulse duration. ' \
f'Expected duration: {impulse_duration:.6f}, ' \
f'actual data duration: {data_duration:.6f}.'
f'Expected duration: {impulse_duration}, ' \
f'actual data duration: {data_duration}.'

if is_pre_timestamped:
raise ValueError(
Expand Down
40 changes: 31 additions & 9 deletions sdks/python/apache_beam/transforms/periodicsequence_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,22 +261,44 @@ def test_not_enough_timestamped_value(self):
data=data,
fire_interval=0.5))

def test_fuzzy_length_and_interval(self):
  """Fuzz PeriodicImpulse over random data lengths and fire intervals.

  Runs 30 rounds. Each round reseeds ``random`` from the wall clock, draws a
  data length in [1, 100] and an interval of ``m / 1e6`` for ``m`` in
  [1, 1000], and asserts that the pipeline outputs exactly the input data.
  The seed is logged only when a round fails, so that round can be replayed.
  """
  times = 30
  for _ in range(times):
    # Seed per round (not once up front) so the seed reported on failure
    # identifies the exact inputs of the failing round.
    seed = int(time.time() * 1000)
    random.seed(seed)
    n = random.randint(1, 100)
    data = list(range(n))
    m = random.randint(1, 1000)
    interval = m / 1e6
    now = Timestamp.now()
    try:
      with TestPipeline() as p:
        ret = (
            p | PeriodicImpulse(
                start_timestamp=now, data=data, fire_interval=interval))
        assert_that(ret, equal_to(data))
    except Exception:  # pylint: disable=broad-except
      logging.error("Error occurred at random seed=%d", seed)
      # Bare raise re-raises the active exception with its traceback intact.
      raise

def test_fuzzy_length_at_minimal_interval(self):
  """Fuzz PeriodicImpulse data lengths at a fixed fire interval of 1e-6.

  Runs 30 rounds. Each round reseeds ``random`` from the wall clock, draws a
  data length in [1, 100], and asserts that the pipeline outputs exactly the
  input data. The seed is logged only when a round fails, so that round can
  be replayed.
  """
  times = 30
  for _ in range(times):
    # Seed per round so the seed reported on failure identifies the exact
    # inputs of the failing round.
    seed = int(time.time() * 1000)
    random.seed(seed)
    n = random.randint(1, 100)
    data = list(range(n))
    interval = 1e-6
    now = Timestamp.now()
    try:
      with TestPipeline() as p:
        ret = (
            p | PeriodicImpulse(
                start_timestamp=now, data=data, fire_interval=interval))
        assert_that(ret, equal_to(data))
    except Exception:  # pylint: disable=broad-except
      logging.error("Error occurred at random seed=%d", seed)
      # Bare raise re-raises the active exception with its traceback intact.
      raise


if __name__ == '__main__':
Expand Down
Loading