Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdks/python/apache_beam/testing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import absolute_import
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/testing/pipeline_verifiers.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
`hamcrest.core.base_matcher.BaseMatcher` and override _matches.
"""

from __future__ import absolute_import

import logging
import time

Expand Down
3 changes: 3 additions & 0 deletions sdks/python/apache_beam/testing/pipeline_verifiers_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@

"""Unit tests for the test pipeline verifiers"""

from __future__ import absolute_import

import logging
import tempfile
import unittest
from builtins import range

from hamcrest import assert_that as hc_assert_that
from mock import Mock
Expand Down
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/testing/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

"""Test Pipeline, a wrapper of Pipeline for test purpose"""

from __future__ import absolute_import

import argparse
import shlex

Expand Down
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/testing/test_pipeline_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

"""Unit test for the TestPipeline class"""

from __future__ import absolute_import

import logging
import unittest

Expand Down
51 changes: 38 additions & 13 deletions sdks/python/apache_beam/testing/test_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,14 @@

For internal use only; no backwards-compatibility guarantees.
"""
from __future__ import absolute_import

from abc import ABCMeta
from abc import abstractmethod
from builtins import object
from functools import total_ordering

from future.utils import with_metaclass

from apache_beam import coders
from apache_beam import core
Expand All @@ -41,18 +46,20 @@
]


class Event(object):
@total_ordering
class Event(with_metaclass(ABCMeta, object)):
"""Test stream event to be emitted during execution of a TestStream."""

__metaclass__ = ABCMeta
@abstractmethod
def __eq__(self, other):
raise NotImplementedError

def __cmp__(self, other):
if type(self) is not type(other):
return cmp(type(self), type(other))
return self._typed_cmp(other)
@abstractmethod
def __hash__(self):
raise NotImplementedError

@abstractmethod
def _typed_cmp(self, other):
def __lt__(self, other):
raise NotImplementedError


Expand All @@ -62,8 +69,14 @@ class ElementEvent(Event):
def __init__(self, timestamped_values):
self.timestamped_values = timestamped_values

def _typed_cmp(self, other):
return cmp(self.timestamped_values, other.timestamped_values)
def __eq__(self, other):
return self.timestamped_values == other.timestamped_values

def __hash__(self):
return hash(self.timestamped_values)

def __lt__(self, other):
return self.timestamped_values < other.timestamped_values


class WatermarkEvent(Event):
Expand All @@ -72,8 +85,14 @@ class WatermarkEvent(Event):
def __init__(self, new_watermark):
self.new_watermark = timestamp.Timestamp.of(new_watermark)

def _typed_cmp(self, other):
return cmp(self.new_watermark, other.new_watermark)
def __eq__(self, other):
return self.new_watermark == other.new_watermark

def __hash__(self):
return hash(self.new_watermark)

def __lt__(self, other):
return self.new_watermark < other.new_watermark


class ProcessingTimeEvent(Event):
Expand All @@ -82,8 +101,14 @@ class ProcessingTimeEvent(Event):
def __init__(self, advance_by):
self.advance_by = timestamp.Duration.of(advance_by)

def _typed_cmp(self, other):
return cmp(self.advance_by, other.advance_by)
def __eq__(self, other):
return self.advance_by == other.advance_by

def __hash__(self):
return hash(self.advance_by)

def __lt__(self, other):
return self.advance_by < other.advance_by


class TestStream(PTransform):
Expand Down
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/testing/test_stream_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

"""Unit tests for the test_stream module."""

from __future__ import absolute_import

import unittest

import apache_beam as beam
Expand Down
3 changes: 3 additions & 0 deletions sdks/python/apache_beam/testing/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@
For internal use only; no backwards-compatibility guarantees.
"""

from __future__ import absolute_import

import hashlib
import imp
import logging
import os
import shutil
import tempfile
import time
from builtins import object

from mock import Mock
from mock import patch
Expand Down
4 changes: 3 additions & 1 deletion sdks/python/apache_beam/testing/test_utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

"""Unittest for testing utilities,"""

from __future__ import absolute_import

import logging
import os
import tempfile
Expand Down Expand Up @@ -52,7 +54,7 @@ def test_delete_files_fails_with_io_error(self):
utils.delete_files([path])
self.assertTrue(
error.exception.args[0].startswith('Delete operation failed'))
self.assertEqual(error.exception.exception_details.keys(), [path])
self.assertEqual(list(error.exception.exception_details.keys()), [path])

def test_delete_files_fails_with_invalid_arg(self):
with self.assertRaises(RuntimeError):
Expand Down
4 changes: 4 additions & 0 deletions sdks/python/apache_beam/testing/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import collections
import glob
import tempfile
from builtins import object

from apache_beam import pvalue
from apache_beam.transforms import window
Expand Down Expand Up @@ -69,6 +70,9 @@ def __init__(self, iterable):
def __eq__(self, other):
return self._counter == collections.Counter(other)

def __hash__(self):
return hash(self._counter)

def __repr__(self):
return "InAnyOrder(%s)" % self._counter

Expand Down
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/testing/util_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

"""Unit tests for testing utilities."""

from __future__ import absolute_import

import unittest

from apache_beam import Create
Expand Down
1 change: 1 addition & 0 deletions sdks/python/tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ deps =
flake8==3.5.0
modules =
apache_beam/coders
apache_beam/testing
apache_beam/portability
apache_beam/internal
apache_beam/metrics
Expand Down