Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ci/travis_script_integration.sh
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@ conda install -y -q python=3.6 six numpy
# ARROW-4008: Create a directory to write temporary files since /tmp can be
# unstable in Travis CI
INTEGRATION_TEMPDIR=$TRAVIS_BUILD_DIR/integration_temp
GOLD_14_1=$ARROW_TEST_DATA/arrow-ipc/integration/0.14.1
mkdir -p $INTEGRATION_TEMPDIR

python integration_test.py --debug --tempdir=$INTEGRATION_TEMPDIR --run_flight
python integration_test.py --debug --tempdir=$INTEGRATION_TEMPDIR --gold_dirs=$GOLD_14_1 --run_flight

popd

Expand Down
170 changes: 106 additions & 64 deletions integration/integration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import binascii
import contextlib
import glob
import gzip
import itertools
import json
import os
Expand Down Expand Up @@ -1018,7 +1019,9 @@ def generate_decimal_case():
possible_batch_sizes = 7, 10
batch_sizes = [possible_batch_sizes[i % 2] for i in range(len(fields))]

return _generate_file('decimal', fields, batch_sizes)
skip = set()
skip.add('Go') # TODO(ARROW-3676)
return _generate_file('decimal', fields, batch_sizes, skip=skip)


def generate_datetime_case():
Expand Down Expand Up @@ -1055,7 +1058,9 @@ def generate_interval_case():
]

batch_sizes = [7, 10]
return _generate_file("interval", fields, batch_sizes)
skip = set()
skip.add('JS') # TODO(ARROW-5239)
return _generate_file("interval", fields, batch_sizes, skip=skip)


def generate_map_case():
Expand All @@ -1067,7 +1072,10 @@ def generate_map_case():
]

batch_sizes = [7, 10]
return _generate_file("map", fields, batch_sizes)
skip = set()
skip.add('JS') # TODO(ARROW-1279)
skip.add('Go') # TODO(ARROW-3679)
return _generate_file("map", fields, batch_sizes, skip=skip)


def generate_nested_case():
Expand Down Expand Up @@ -1103,9 +1111,12 @@ def generate_dictionary_case():
DictionaryType('dict1', get_field('', 'int32'), dict1),
DictionaryType('dict2', get_field('', 'int16'), dict2)
]
skip = set()
skip.add('Go') # TODO(ARROW-3039)
batch_sizes = [7, 10]
return _generate_file("dictionary", fields, batch_sizes,
dictionaries=[dict0, dict1, dict2])
dictionaries=[dict0, dict1, dict2],
skip=skip)


def generate_nested_dictionary_case():
Expand Down Expand Up @@ -1183,14 +1194,32 @@ def __init__(self, json_files, testers, args):
self.temp_dir = args.tempdir or tempfile.mkdtemp()
self.debug = args.debug
self.stop_on_error = args.stop_on_error
self.gold_dirs = args.gold_dirs

def run(self):
failures = []

for producer, consumer in itertools.product(
filter(lambda t: t.PRODUCER, self.testers),
filter(lambda t: t.CONSUMER, self.testers)):
for failure in self._compare_implementations(producer, consumer):
for failure in self._compare_implementations(
producer, consumer, self._produce_consume, self.json_files):
failures.append(failure)
if self.gold_dirs:
for gold_dir, consumer in itertools.product(
self.gold_dirs,
filter(lambda t: t.CONSUMER, self.testers)):
print('\n\n\n\n')
print('******************************************************')
print('Tests against golden files in {}'.format(gold_dir))
print('******************************************************')

def run_gold(producer, consumer, test_case):
self._run_gold(gold_dir, producer, consumer, test_case)
for failure in self._compare_implementations(
consumer, consumer, run_gold, self._gold_tests(gold_dir)):
failures.append(failure)

return failures

def run_flight(self):
Expand All @@ -1204,63 +1233,39 @@ def run_flight(self):
failures.append(failure)
return failures

def _compare_implementations(self, producer, consumer):
def _gold_tests(self, gold_dir):
prefix = os.path.basename(os.path.normpath(gold_dir))
SUFFIX = ".json.gz"
golds = [jf for jf in os.listdir(gold_dir) if jf.endswith(SUFFIX)]
for json_path in golds:
name = json_path[json_path.index('_')+1: -len(SUFFIX)]
base_name = prefix + "_" + name + ".gold.json"
out_path = os.path.join(self.temp_dir, base_name)
with gzip.open(os.path.join(gold_dir, json_path)) as i:
with open(out_path, "wb") as out:
out.write(i.read())

try:
skip = next(f for f in self.json_files
if f.name == name).skip
except StopIteration:
skip = set()
yield JsonFile(name, None, None, skip=skip, path=out_path)

def _compare_implementations(
self, producer, consumer, run_binaries, test_cases):
print('##########################################################')
print(
'{0} producing, {1} consuming'.format(producer.name, consumer.name)
)
print('##########################################################')

for test_case in self.json_files:
for test_case in test_cases:
json_path = test_case.path
print('==========================================================')
print('Testing file {0}'.format(json_path))
print('==========================================================')

name = os.path.splitext(os.path.basename(json_path))[0]

file_id = guid()[:8]

if ('JS' in (producer.name, consumer.name) and
"map" in test_case.name):
print('TODO(ARROW-1279): Enable map tests ' +
' for JS once they are unbroken')
continue

if ('JS' in (producer.name, consumer.name) and
"interval" in test_case.name):
print('TODO(ARROW-5239): Enable interval tests ' +
' for JS once JS supports them')
continue

if ('Go' in (producer.name, consumer.name) and
"decimal" in test_case.name):
print('TODO(ARROW-3676): Enable decimal tests ' +
' for Go')
continue

if ('Go' in (producer.name, consumer.name) and
"map" in test_case.name):
print('TODO(ARROW-3679): Enable map tests ' +
' for Go')
continue

if ('Go' in (producer.name, consumer.name) and
"dictionary" in test_case.name):
print('TODO(ARROW-3039): Enable dictionary tests ' +
' for Go')
continue

# Make the random access file
producer_file_path = os.path.join(self.temp_dir, file_id + '_' +
name + '.json_as_file')
producer_stream_path = os.path.join(self.temp_dir, file_id + '_' +
name +
'.producer_file_as_stream')
consumer_file_path = os.path.join(self.temp_dir, file_id + '_' +
name +
'.consumer_stream_as_file')

if producer.name in test_case.skip:
print('-- Skipping test because producer {0} does '
'not support'.format(producer.name))
Expand All @@ -1276,19 +1281,7 @@ def _compare_implementations(self, producer, consumer):
continue

try:
print('-- Creating binary inputs')
producer.json_to_file(json_path, producer_file_path)

# Validate the file
print('-- Validating file')
consumer.validate(json_path, producer_file_path)

print('-- Validating stream')
producer.file_to_stream(producer_file_path,
producer_stream_path)
consumer.stream_to_file(producer_stream_path,
consumer_file_path)
consumer.validate(json_path, consumer_file_path)
run_binaries(producer, consumer, test_case)
except Exception:
traceback.print_exc()
yield (test_case, producer, consumer, sys.exc_info())
Expand All @@ -1297,6 +1290,52 @@ def _compare_implementations(self, producer, consumer):
else:
continue

def _produce_consume(self, producer, consumer, test_case):
# Make the random access file
json_path = test_case.path
file_id = guid()[:8]
name = os.path.splitext(os.path.basename(json_path))[0]

producer_file_path = os.path.join(self.temp_dir, file_id + '_' +
name + '.json_as_file')
producer_stream_path = os.path.join(self.temp_dir, file_id + '_' +
name + '.producer_file_as_stream')
consumer_file_path = os.path.join(self.temp_dir, file_id + '_' +
name + '.consumer_stream_as_file')

print('-- Creating binary inputs')
producer.json_to_file(json_path, producer_file_path)

# Validate the file
print('-- Validating file')
consumer.validate(json_path, producer_file_path)

print('-- Validating stream')
producer.file_to_stream(producer_file_path, producer_stream_path)
consumer.stream_to_file(producer_stream_path, consumer_file_path)
consumer.validate(json_path, consumer_file_path)

def _run_gold(self, gold_dir, producer, consumer, test_case):
json_path = test_case.path

# Validate the file
print('-- Validating file')
producer_file_path = os.path.join(
gold_dir, "generated_" + test_case.name + ".arrow_file")
consumer.validate(json_path, producer_file_path)

print('-- Validating stream')
consumer_stream_path = os.path.join(
gold_dir, "generated_" + test_case.name + ".stream")
file_id = guid()[:8]
name = os.path.splitext(os.path.basename(json_path))[0]

consumer_file_path = os.path.join(self.temp_dir, file_id + '_' +
name + '.consumer_stream_as_file')

consumer.stream_to_file(consumer_stream_path, consumer_file_path)
consumer.validate(json_path, consumer_file_path)

def _compare_flight_implementations(self, producer, consumer):
print('##########################################################')
print(
Expand Down Expand Up @@ -1741,6 +1780,9 @@ def write_js_test_json(directory):
action='store_true', default=False,
help='Stop on first error')

parser.add_argument('--gold_dirs', action='append',
help="gold integration test file paths")

args = parser.parse_args()
if args.generated_json_path:
try:
Expand Down
2 changes: 1 addition & 1 deletion testing
Submodule testing updated 29 files
+ data/arrow-ipc/crash-5e88bae6ac5250714e8c8bc73b9d67b949fadbb4
+ data/arrow-ipc/crash-bd7e00178af2d236fdf041fcc1fb30975bf8fbca
+ data/arrow-ipc/integration/0.14.1/generated_datetime.arrow_file
+ data/arrow-ipc/integration/0.14.1/generated_datetime.json.gz
+ data/arrow-ipc/integration/0.14.1/generated_datetime.stream
+ data/arrow-ipc/integration/0.14.1/generated_decimal.arrow_file
+ data/arrow-ipc/integration/0.14.1/generated_decimal.json.gz
+ data/arrow-ipc/integration/0.14.1/generated_decimal.stream
+ data/arrow-ipc/integration/0.14.1/generated_dictionary.arrow_file
+ data/arrow-ipc/integration/0.14.1/generated_dictionary.json.gz
+ data/arrow-ipc/integration/0.14.1/generated_dictionary.stream
+ data/arrow-ipc/integration/0.14.1/generated_interval.arrow_file
+ data/arrow-ipc/integration/0.14.1/generated_interval.json.gz
+ data/arrow-ipc/integration/0.14.1/generated_interval.stream
+ data/arrow-ipc/integration/0.14.1/generated_map.arrow_file
+ data/arrow-ipc/integration/0.14.1/generated_map.json.gz
+ data/arrow-ipc/integration/0.14.1/generated_map.stream
+ data/arrow-ipc/integration/0.14.1/generated_nested.arrow_file
+ data/arrow-ipc/integration/0.14.1/generated_nested.json.gz
+ data/arrow-ipc/integration/0.14.1/generated_nested.stream
+ data/arrow-ipc/integration/0.14.1/generated_primitive.arrow_file
+ data/arrow-ipc/integration/0.14.1/generated_primitive.json.gz
+ data/arrow-ipc/integration/0.14.1/generated_primitive.stream
+ data/arrow-ipc/integration/0.14.1/generated_primitive_no_batches.arrow_file
+ data/arrow-ipc/integration/0.14.1/generated_primitive_no_batches.json.gz
+ data/arrow-ipc/integration/0.14.1/generated_primitive_no_batches.stream
+ data/arrow-ipc/integration/0.14.1/generated_primitive_zerolength.arrow_file
+ data/arrow-ipc/integration/0.14.1/generated_primitive_zerolength.json.gz
+ data/arrow-ipc/integration/0.14.1/generated_primitive_zerolength.stream