Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 32 additions & 40 deletions sdks/python/apache_beam/transforms/periodicsequence.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,28 +33,27 @@
from apache_beam.transforms.window import TimestampedValue
from apache_beam.utils import timestamp
from apache_beam.utils.timestamp import MAX_TIMESTAMP
from apache_beam.utils.timestamp import Duration
from apache_beam.utils.timestamp import Timestamp
from apache_beam.utils.timestamp import TimestampTypes


class ImpulseSeqGenRestrictionProvider(core.RestrictionProvider):
def initial_restriction(self, element):
  """Builds the initial offset range for one ``(start, end, interval)`` element.

  Args:
    element: a ``(start, end, interval)`` triple. ``start`` and ``end`` may
      be ``Timestamp`` objects or any value accepted by ``Timestamp.of``;
      ``interval`` is converted with ``Duration(interval)``.

  Returns:
    ``OffsetRange(0, n)``, where ``n`` is the span ``end - start`` divided by
    the interval (both taken in microseconds), rounded up.

  Raises:
    AssertionError: if ``start`` is after ``end`` or ``interval`` is not
      positive.
  """
  start, end, interval = element

  # Normalize both bounds to Timestamp so the arithmetic below is done on
  # Timestamp/Duration objects rather than on raw floats.
  if not isinstance(start, Timestamp):
    start = Timestamp.of(start)

  if not isinstance(end, Timestamp):
    end = Timestamp.of(end)

  interval_duration = Duration(interval)

  assert start <= end
  assert interval > 0

  total_duration = end - start
  # Round up so that a trailing partial interval still gets its own output.
  total_outputs = math.ceil(total_duration.micros / interval_duration.micros)

  return OffsetRange(0, total_outputs)

def create_tracker(self, restriction):
Expand Down Expand Up @@ -230,38 +229,31 @@ def _validate_and_adjust_duration(self):
assert self.data

# The total time we need to impulse all the data.
data_duration = (len(self.data) - 1) * self.interval
data_duration = (len(self.data) - 1) * Duration(self.interval)

is_pre_timestamped = isinstance(self.data[0], tuple) and \
isinstance(self.data[0][0], timestamp.Timestamp)

if isinstance(self.start_ts, Timestamp):
start = self.start_ts.micros / 1000000
else:
start = self.start_ts

if isinstance(self.stop_ts, Timestamp):
if self.stop_ts == MAX_TIMESTAMP:
# When the stop timestamp is unbounded (MAX_TIMESTAMP), set it to the
# data's actual end time plus an extra fire interval, because the
# impulse duration's upper bound is exclusive.
end = start + data_duration + self.interval
self.stop_ts = Timestamp(micros=end * 1000000)
else:
end = self.stop_ts.micros / 1000000
else:
end = self.stop_ts
start_ts = Timestamp.of(self.start_ts)
stop_ts = Timestamp.of(self.stop_ts)

if stop_ts == MAX_TIMESTAMP:
# When the stop timestamp is unbounded (MAX_TIMESTAMP), set it to the
# data's actual end time plus an extra fire interval, because the
# impulse duration's upper bound is exclusive.
self.stop_ts = start_ts + data_duration + Duration(self.interval)
stop_ts = self.stop_ts

# The total time for the impulse signal which occurs in [start, end).
impulse_duration = end - start
if round(data_duration + self.interval, 6) < round(impulse_duration, 6):
impulse_duration = stop_ts - start_ts
if data_duration + Duration(self.interval) < impulse_duration:
# We don't have enough data for the impulse.
# If we can fit at least one more data point in the impulse duration,
# then we will be in the repeat mode.
message = 'The number of elements in the provided pre-timestamped ' \
'data sequence is not enough to span the full impulse duration. ' \
f'Expected duration: {impulse_duration:.6f}, ' \
f'actual data duration: {data_duration:.6f}.'
f'Expected duration: {impulse_duration}, ' \
f'actual data duration: {data_duration}.'

if is_pre_timestamped:
raise ValueError(
Expand All @@ -274,8 +266,8 @@ def _validate_and_adjust_duration(self):

def __init__(
self,
start_timestamp: Timestamp = Timestamp.now(),
stop_timestamp: Timestamp = MAX_TIMESTAMP,
start_timestamp: TimestampTypes = Timestamp.now(),
stop_timestamp: TimestampTypes = MAX_TIMESTAMP,
fire_interval: float = 360.0,
apply_windowing: bool = False,
data: Optional[Sequence[Any]] = None):
Expand Down Expand Up @@ -327,11 +319,11 @@ def expand(self, pbegin):
| 'GenSequence' >> beam.ParDo(ImpulseSeqGenDoFn(self.data)))

if not self.data:
# This step is only to ensure the current PTransform expansion is
# compatible with the previous Beam versions.
result = (
result
| 'MapToTimestamped' >> beam.Map(lambda tt: TimestampedValue(tt, tt)))
# This step is effectively an identity transform, because the timestamped
# values have already been generated in `ImpulseSeqGenDoFn`.
# We keep this step so that the current PeriodicImpulse does not break
# compatibility.
result = (result | 'MapToTimestamped' >> beam.Map(lambda tt: tt))

if self.apply_windowing:
result = result | 'ApplyWindowing' >> beam.WindowInto(
Expand Down
74 changes: 65 additions & 9 deletions sdks/python/apache_beam/transforms/periodicsequence_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,22 +261,78 @@ def test_not_enough_timestamped_value(self):
data=data,
fire_interval=0.5))

def test_fuzzy_length_and_interval(self):
  # Fuzz test: run PeriodicImpulse over randomly sized data with a random
  # fire interval between 1 and 1000 microseconds, and check that every
  # element is emitted.
  times = 30
  for _ in range(times):
    # A fresh seed per iteration; it is logged on failure so that the data
    # length and interval of the failing case can be regenerated.
    seed = int(time.time() * 1000)
    random.seed(seed)
    n = random.randint(1, 100)
    data = list(range(n))
    m = random.randint(1, 1000)
    interval = m / 1e6
    now = Timestamp.now()
    try:
      with TestPipeline() as p:
        ret = (
            p | PeriodicImpulse(
                start_timestamp=now, data=data, fire_interval=interval))
        assert_that(ret, equal_to(data))
    except Exception:  # pylint: disable=broad-except
      logging.error("Error occurred at random seed=%d", seed)
      # Bare raise keeps the original traceback intact.
      raise

def test_fuzzy_length_at_minimal_interval(self):
  # Fuzz test: run PeriodicImpulse over randomly sized data at a fire
  # interval of one microsecond (1e-6 s), and check that every element is
  # emitted.
  times = 30
  for _ in range(times):
    # A fresh seed per iteration, so each run exercises a different data
    # length. A previously hard-coded seed made all iterations identical.
    seed = int(time.time() * 1000)
    random.seed(seed)
    n = random.randint(1, 100)
    data = list(range(n))
    interval = 1e-6
    now = Timestamp.now()
    try:
      with TestPipeline() as p:
        ret = (
            p | PeriodicImpulse(
                start_timestamp=now, data=data, fire_interval=interval))
        assert_that(ret, equal_to(data))
    except Exception:  # pylint: disable=broad-except
      # Log the seed so the failing data length can be regenerated.
      logging.error("Error occurred at random seed=%d", seed)
      # Bare raise keeps the original traceback intact.
      raise

def test_int_type_input(self):
  # Integer timestamps and an integer interval must produce integer
  # outputs, not floats.
  # This is necessary for the following test to pass:
  # apache_beam.examples.snippets.snippets_test.SlowlyChangingSideInputsTest
  def same_type_and_value(left, right):
    # Plain `==` would accept 1 == 1.0, so the types are compared as well.
    return type(left) is type(right) and left == right

  with TestPipeline() as pipeline:
    impulses = pipeline | PeriodicImpulse(
        start_timestamp=1, stop_timestamp=5, fire_interval=1)
    assert_that(impulses, equal_to([1, 2, 3, 4], same_type_and_value))

def test_float_type_input(self):
  # Float start/stop timestamps are expected to produce float outputs.
  def same_type_and_value(left, right):
    # Plain `==` would accept 1 == 1.0, so the types are compared as well.
    return type(left) is type(right) and left == right

  with TestPipeline() as pipeline:
    impulses = pipeline | PeriodicImpulse(
        start_timestamp=1.0, stop_timestamp=5.0, fire_interval=1)
    assert_that(
        impulses, equal_to([1.0, 2.0, 3.0, 4.0], same_type_and_value))

def test_timestamp_type_input(self):
  # Timestamp start/stop bounds are expected to produce float outputs.
  def same_type_and_value(left, right):
    # Plain `==` would accept 1 == 1.0, so the types are compared as well.
    return type(left) is type(right) and left == right

  first = Timestamp.of(1)
  last = Timestamp.of(5)
  with TestPipeline() as pipeline:
    impulses = pipeline | PeriodicImpulse(
        start_timestamp=first, stop_timestamp=last, fire_interval=1)
    assert_that(
        impulses, equal_to([1.0, 2.0, 3.0, 4.0], same_type_and_value))


if __name__ == '__main__':
Expand Down
Loading