Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -207,3 +207,19 @@ examples:
pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0},
windows: [{end: 1454293425000, span: 3600000}, {end: -9223372036854410, span: 365}]
}

---

coder:
urn: "beam:coder:double:v1"
examples:
"\0\0\0\0\0\0\0\0": "0"
"\u0080\0\0\0\0\0\0\0": "-0"
"\u003f\u00b9\u0099\u0099\u0099\u0099\u0099\u009a": "0.1"
"\u00bf\u00b9\u0099\u0099\u0099\u0099\u0099\u009a": "-0.1"
"\0\0\0\0\0\0\0\u0001": "4.9e-324"
"\0\u0001\0\0\0\0\0\0": "1.390671161567e-309"
"\u007f\u00ef\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff": "1.7976931348623157e308"
"\u007f\u00f0\0\0\0\0\0\0": "Infinity"
"\u00ff\u00f0\0\0\0\0\0\0": "-Infinity"
"\u007f\u00f8\0\0\0\0\0\0": "NaN"
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Set;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
Expand Down Expand Up @@ -54,6 +55,7 @@ public class ModelCoderRegistrar implements CoderTranslatorRegistrar {
.put(LengthPrefixCoder.class, ModelCoders.LENGTH_PREFIX_CODER_URN)
.put(GlobalWindow.Coder.class, ModelCoders.GLOBAL_WINDOW_CODER_URN)
.put(FullWindowedValueCoder.class, ModelCoders.WINDOWED_VALUE_CODER_URN)
.put(DoubleCoder.class, ModelCoders.DOUBLE_CODER_URN)
.build();

public static final Set<String> WELL_KNOWN_CODER_URNS = BEAM_MODEL_CODER_URNS.values();
Expand All @@ -70,6 +72,7 @@ public class ModelCoderRegistrar implements CoderTranslatorRegistrar {
.put(Timer.Coder.class, CoderTranslators.timer())
.put(LengthPrefixCoder.class, CoderTranslators.lengthPrefix())
.put(FullWindowedValueCoder.class, CoderTranslators.fullWindowedValue())
.put(DoubleCoder.class, CoderTranslators.atomic(DoubleCoder.class))
.build();

static {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ private ModelCoders() {}
// coders?
public static final String INT64_CODER_URN = getUrn(StandardCoders.Enum.VARINT);

public static final String DOUBLE_CODER_URN = getUrn(StandardCoders.Enum.DOUBLE);

public static final String ITERABLE_CODER_URN = getUrn(StandardCoders.Enum.ITERABLE);
public static final String TIMER_CODER_URN = getUrn(StandardCoders.Enum.TIMER);
public static final String KV_CODER_URN = getUrn(StandardCoders.Enum.KV);
Expand All @@ -61,7 +63,8 @@ private ModelCoders() {}
LENGTH_PREFIX_CODER_URN,
GLOBAL_WINDOW_CODER_URN,
INTERVAL_WINDOW_CODER_URN,
WINDOWED_VALUE_CODER_URN);
WINDOWED_VALUE_CODER_URN,
DOUBLE_CODER_URN);

public static Set<String> urns() {
return MODEL_CODER_URNS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
Expand Down Expand Up @@ -70,6 +71,7 @@ public class CoderTranslationTest {
.add(
FullWindowedValueCoder.of(
IterableCoder.of(VarLongCoder.of()), IntervalWindowCoder.of()))
.add(DoubleCoder.of())
.build();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.Context;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
Expand Down Expand Up @@ -90,6 +91,7 @@ public class CommonCoderTest {
.put(getUrn(StandardCoders.Enum.ITERABLE), IterableCoder.class)
.put(getUrn(StandardCoders.Enum.TIMER), Timer.Coder.class)
.put(getUrn(StandardCoders.Enum.GLOBAL_WINDOW), GlobalWindow.Coder.class)
.put(getUrn(StandardCoders.Enum.DOUBLE), DoubleCoder.class)
.put(
getUrn(StandardCoders.Enum.WINDOWED_VALUE),
WindowedValue.FullWindowedValueCoder.class)
Expand Down Expand Up @@ -270,6 +272,8 @@ private static Object convertValue(Object value, CommonCoder coderSpec, Coder co
(int) paneInfoMap.get("index"),
(int) paneInfoMap.get("on_time_index"));
return WindowedValue.of(windowValue, timestamp, windows, paneInfo);
} else if (s.equals(getUrn(StandardCoders.Enum.DOUBLE))) {
return Double.parseDouble((String) value);
} else {
throw new IllegalStateException("Unknown coder URN: " + coderSpec.getUrn());
}
Expand Down Expand Up @@ -298,6 +302,8 @@ private static Coder<?> instantiateCoder(CommonCoder coder) {
} else if (s.equals(getUrn(StandardCoders.Enum.WINDOWED_VALUE))) {
return WindowedValue.FullWindowedValueCoder.of(
components.get(0), (Coder<BoundedWindow>) components.get(1));
} else if (s.equals(getUrn(StandardCoders.Enum.DOUBLE))) {
return DoubleCoder.of();
} else {
throw new IllegalStateException("Unknown coder URN: " + coder.getUrn());
}
Expand Down Expand Up @@ -357,6 +363,9 @@ private void verifyDecodedValue(CommonCoder coder, Object expectedValue, Object
} else if (s.equals(getUrn(StandardCoders.Enum.WINDOWED_VALUE))) {
assertEquals(expectedValue, actualValue);

} else if (s.equals(getUrn(StandardCoders.Enum.DOUBLE))) {

assertEquals(expectedValue, actualValue);
} else {
throw new IllegalStateException("Unknown coder URN: " + coder.getUrn());
}
Expand Down
16 changes: 14 additions & 2 deletions sdks/python/apache_beam/coders/standard_coders_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class StandardCodersTest(unittest.TestCase):
'beam:coder:windowed_value:v1':
lambda v, w: coders.WindowedValueCoder(v, w),
'beam:coder:timer:v1': coders._TimerCoder,
'beam:coder:double:v1': coders.FloatCoder,
}

_urn_to_json_value_parser = {
Expand All @@ -87,6 +88,7 @@ class StandardCodersTest(unittest.TestCase):
lambda x, payload_parser: dict(
payload=payload_parser(x['payload']),
timestamp=Timestamp(micros=x['timestamp'])),
'beam:coder:double:v1': lambda x: float(x),
}

def test_standard_coders(self):
Expand All @@ -95,6 +97,16 @@ def test_standard_coders(self):
self._run_standard_coder(name, spec)

def _run_standard_coder(self, name, spec):
def assert_equal(actual, expected):
"""Handle nan values which self.assertEqual fails on."""
import math
if (isinstance(actual, float)
and isinstance(expected, float)
and math.isnan(actual)
and math.isnan(expected)):
return
self.assertEqual(actual, expected)

coder = self.parse_coder(spec['coder'])
parse_value = self.json_value_parser(spec['coder'])
nested_list = [spec['nested']] if 'nested' in spec else [True, False]
Expand All @@ -108,8 +120,8 @@ def _run_standard_coder(self, name, spec):
self.to_fix[spec['index'], expected_encoded] = actual_encoded
else:
self.assertEqual(expected_encoded, actual_encoded)
self.assertEqual(decode_nested(coder, expected_encoded, nested),
value)
decoded = decode_nested(coder, expected_encoded, nested)
assert_equal(decoded, value)
else:
# Only verify decoding for a non-deterministic coder
self.assertEqual(decode_nested(coder, expected_encoded, nested),
Expand Down
16 changes: 16 additions & 0 deletions sdks/python/apache_beam/testing/data/standard_coders.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,19 @@ examples:
pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0},
windows: [{end: 1454293425000, span: 3600000}, {end: -9223372036854410, span: 365}]
}

---

coder:
urn: "beam:coder:double:v1"
examples:
"\0\0\0\0\0\0\0\0": "0"
"\u0080\0\0\0\0\0\0\0": "-0"
"\u003f\u00b9\u0099\u0099\u0099\u0099\u0099\u009a": "0.1"
"\u00bf\u00b9\u0099\u0099\u0099\u0099\u0099\u009a": "-0.1"
"\0\0\0\0\0\0\0\u0001": "4.9e-324"
"\0\u0001\0\0\0\0\0\0": "1.390671161567e-309"
"\u007f\u00ef\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff": "1.7976931348623157e308"
"\u007f\u00f0\0\0\0\0\0\0": "Infinity"
"\u00ff\u00f0\0\0\0\0\0\0": "-Infinity"
"\u007f\u00f8\0\0\0\0\0\0": "NaN"