From 4094422d58077aa95129a7ec9fddf75c2e3af7a7 Mon Sep 17 00:00:00 2001 From: cclauss Date: Fri, 17 Aug 2018 01:06:59 +0200 Subject: [PATCH] Add test_slice() to streaming BasicOperations As suggested in https://github.com/apache/spark/pull/20838#pullrequestreview-139118618 --- python/pyspark/streaming/tests.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 09af47a597bed..cf83498737f3a 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -15,6 +15,7 @@ # limitations under the License. # +import datetime as dt import glob import os import sys @@ -206,6 +207,21 @@ def func(dstream): expected = [[len(x)] for x in input] self._test_func(input, func, expected) + def test_slice(self): + """Basic operation test for DStream.slice.""" + eol_python2 = dt.datetime(2020, 1, 1) + five_secs = dt.timedelta(seconds=5) + input = [eol_python2 - five_secs, eol_python2] + def func(dstream): + return dstream.slice() + expected = [[dt.datetime(2019, 12, 31, 23, 55)], + [dt.datetime(2019, 12, 31, 23, 56)], + [dt.datetime(2019, 12, 31, 23, 57)], + [dt.datetime(2019, 12, 31, 23, 58)], + [dt.datetime(2019, 12, 31, 23, 59)], + [dt.datetime(2020, 1, 1)]] + self._test_func(input, func, expected) + def test_reduce(self): """Basic operation test for DStream.reduce.""" input = [range(1, 5), range(5, 9), range(9, 13)]