47 changes: 21 additions & 26 deletions cpp/src/arrow/io/hdfs.cc
@@ -37,28 +37,16 @@ using std::size_t;
namespace arrow {
namespace io {

#define CHECK_FAILURE(RETURN_VALUE, WHAT) \
do { \
if (RETURN_VALUE == -1) { \
std::stringstream ss; \
ss << "HDFS: " << WHAT << " failed"; \
return Status::IOError(ss.str()); \
} \
#define CHECK_FAILURE(RETURN_VALUE, WHAT) \
do { \
if (RETURN_VALUE == -1) { \
std::stringstream ss; \
ss << "HDFS " << WHAT << " failed, errno: " << errno << " (" << strerror(errno) \
<< ")"; \
return Status::IOError(ss.str()); \
} \
} while (0)

static Status CheckReadResult(int ret) {
// Check for error on -1 (possibly errno set)

// ret == 0 at end of file, which is OK
if (ret == -1) {
// EOF
std::stringstream ss;
ss << "HDFS read failed, errno: " << errno;
return Status::IOError(ss.str());
}
return Status::OK();
}

static constexpr int kDefaultHdfsBufferSize = 1 << 16;

// ----------------------------------------------------------------------
@@ -129,7 +117,7 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl {
RETURN_NOT_OK(Seek(position));
return Read(nbytes, bytes_read, buffer);
}
RETURN_NOT_OK(CheckReadResult(ret));
CHECK_FAILURE(ret, "read");
*bytes_read = ret;
return Status::OK();
}
@@ -156,7 +144,7 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl {
tSize ret = driver_->Read(
fs_, file_, reinterpret_cast<uint8_t*>(buffer) + total_bytes,
static_cast<tSize>(std::min<int64_t>(buffer_size_, nbytes - total_bytes)));
RETURN_NOT_OK(CheckReadResult(ret));
CHECK_FAILURE(ret, "read");
total_bytes += ret;
if (ret == 0) {
break;
@@ -428,22 +416,29 @@ class HadoopFileSystem::HadoopFileSystemImpl {

Status ListDirectory(const std::string& path, std::vector<HdfsPathInfo>* listing) {
int num_entries = 0;
errno = 0;
hdfsFileInfo* entries = driver_->ListDirectory(fs_, path.c_str(), &num_entries);

if (entries == nullptr) {
// If the directory is empty, entries is NULL but errno is 0. Non-zero
// errno indicates error
//
// Note: errno is thread-locala
if (errno == 0) {
// Note: errno is thread-local
//
// XXX(wesm): ARROW-2300; we found with Hadoop 2.6 that libhdfs would set
// errno 2/ENOENT for empty directories. To be more robust to this we
// double check this case
if ((errno == 0) || (errno == ENOENT && Exists(path))) {
num_entries = 0;
} else {
return Status::IOError("HDFS: list directory failed");
std::stringstream ss;
ss << "HDFS list directory failed, errno: " << errno << " (" << strerror(errno)
<< ")";
return Status::IOError(ss.str());
}
}

// Allocate additional space for elements

int vec_offset = static_cast<int>(listing->size());
listing->resize(vec_offset + num_entries);

9 changes: 8 additions & 1 deletion dev/README.md
@@ -120,4 +120,11 @@ For JavaScript-specific releases, use a different verification script:

```shell
bash dev/release/js-verify-release-candidate.sh 0.7.0 0
```
```
# Integration testing

## HDFS C++ / Python support

```shell
run_docker_compose.sh hdfs_integration
```
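As a rough illustration of what the `hdfs_integration` service ends up exercising, here is a minimal, hypothetical pyarrow round-trip against the containerized cluster. The host, port, and user values mirror the defaults set in `dev/docker-compose.yml` below; the path is made up for the example.

```python
# Hypothetical smoke test, not part of this PR: a write/read round-trip
# against the docker-compose HDFS cluster using pyarrow's HDFS client.
import pyarrow as pa

# Connection values assume the defaults from dev/docker-compose.yml.
fs = pa.hdfs.connect(host='hdfs-namenode', port=9000, user='root')

path = '/tmp/hdfs-integration-smoke.bin'
with fs.open(path, 'wb') as f:
    f.write(b'hello hdfs')

with fs.open(path, 'rb') as f:
    assert f.read() == b'hello hdfs'

fs.delete(path)
```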
58 changes: 47 additions & 11 deletions dev/docker-compose.yml
@@ -16,25 +16,61 @@

version: '3'
services:
gen_apidocs:

hdfs-namenode:
image: gelog/hadoop
shm_size: 2G
ports:
- "9000:9000"
- "50070:50070"
command: hdfs namenode
hostname: hdfs-namenode

hdfs-datanode:
image: gelog/hadoop
command: hdfs datanode
ports:
# The host port is randomly assigned by Docker, to allow scaling
# to multiple DataNodes on the same host
- "50075"
links:
- hdfs-namenode:hdfs-namenode

hdfs_integration:
links:
- hdfs-namenode:hdfs-namenode
- hdfs-datanode:hdfs-datanode
environment:
- ARROW_HDFS_TEST_HOST=hdfs-namenode
- ARROW_HDFS_TEST_PORT=9000
- ARROW_HDFS_TEST_USER=root
build:
context: gen_apidocs
context: hdfs_integration
volumes:
- ../..:/apache-arrow
run_site:

spark_integration:
build:
context: run_site
ports:
- "4000:4000"
context: spark_integration
volumes:
- ../..:/apache-arrow
- ../..:/apache-arrow

dask_integration:
build:
context: dask_integration
volumes:
- ../..:/apache-arrow
spark_integration:
build:
context: spark_integration
- ../..:/apache-arrow

gen_apidocs:
build:
context: gen_apidocs
volumes:
- ../..:/apache-arrow

run_site:
build:
context: run_site
ports:
- "4000:4000"
volumes:
- ../..:/apache-arrow
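The `ARROW_HDFS_TEST_*` variables exported for the `hdfs_integration` service tell the test suites where to find the NameNode. Below is a hedged sketch of how those settings translate into a connection; the actual helper names used by pyarrow's tests may differ.

```python
# Hypothetical helper showing how the environment variables set in
# docker-compose.yml can be turned into a pyarrow HDFS connection.
import os

import pyarrow as pa


def connect_from_env():
    host = os.environ.get('ARROW_HDFS_TEST_HOST', 'default')
    port = int(os.environ.get('ARROW_HDFS_TEST_PORT', '0'))
    user = os.environ.get('ARROW_HDFS_TEST_USER')
    return pa.hdfs.connect(host=host, port=port, user=user)
```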
75 changes: 75 additions & 0 deletions dev/hdfs_integration/Dockerfile
@@ -0,0 +1,75 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

FROM gelog/hadoop

ENV CC=gcc \
CXX=g++ \
PATH=/opt/conda/bin:$PATH

RUN apt-get update -y \
&& apt-get install -y \
gcc \
g++ \
git \
wget \
pkg-config \
ninja-build

# Miniconda - Python 3.6, 64-bit, x86, latest
RUN wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh -O conda.sh \
&& /bin/bash conda.sh -b -p /opt/conda \
&& rm conda.sh

# Create conda env with the required dependencies
RUN conda create -y -q -c conda-forge -n pyarrow-dev \
python=3.6 \
nomkl \
numpy \
six \
setuptools \
cython \
pandas \
pytest \
cmake \
flatbuffers \
rapidjson \
boost-cpp \
thrift-cpp \
snappy \
zlib \
gflags \
brotli \
jemalloc \
lz4-c \
zstd \
&& conda clean --all

# If hdfs3/libhdfs3 are installed as part of the previous step, boost=1.60 and
# boost-cpp=1.67 both get installed; cmake finds 1.60 and parquet fails to
# compile. Installed in a separate step, boost=1.60 and boost-cpp=1.64 get
# installed and cmake finds 1.64.
# libhdfs3 needs to be pinned; see ARROW-1465 and ARROW-1445
RUN conda install -y -q -n pyarrow-dev -c conda-forge \
hdfs3 \
libhdfs3=2.2.31 \
&& conda clean --all

ADD . /apache-arrow
WORKDIR /apache-arrow

CMD arrow/dev/hdfs_integration/hdfs_integration.sh
101 changes: 101 additions & 0 deletions dev/hdfs_integration/hdfs_integration.sh
@@ -0,0 +1,101 @@
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Exit on any error
set -e

# cwd is mounted from the host machine and contains both arrow and parquet-cpp

# Activate conda environment
source activate pyarrow-dev

# Arrow build variables
export ARROW_BUILD_TYPE=debug
export ARROW_BUILD_TOOLCHAIN=$CONDA_PREFIX
export PARQUET_BUILD_TOOLCHAIN=$CONDA_PREFIX
export ARROW_HOME=$CONDA_PREFIX
export PARQUET_HOME=$CONDA_PREFIX

# Hadoop variables
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$HADOOP_HOME/lib/native/
export CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath --glob`

# For newer GCC per https://arrow.apache.org/docs/python/development.html#known-issues
export CXXFLAGS="-D_GLIBCXX_USE_CXX11_ABI=0"
export PYARROW_CXXFLAGS=$CXXFLAGS
export PYARROW_CMAKE_GENERATOR=Ninja

# Install arrow-cpp
mkdir -p arrow/cpp/hdfs-integration-build
pushd arrow/cpp/hdfs-integration-build

cmake -GNinja \
-DCMAKE_BUILD_TYPE=$ARROW_BUILD_TYPE \
-DCMAKE_INSTALL_PREFIX=$ARROW_HOME \
-DARROW_PYTHON=ON \
-DARROW_PLASMA=ON \
-DARROW_HDFS=ON \
-DARROW_BUILD_TESTS=ON \
-DCMAKE_CXX_FLAGS=$CXXFLAGS \
..
ninja
ninja install

popd

# Install parquet-cpp
mkdir -p parquet-cpp/hdfs-integration-build
pushd parquet-cpp/hdfs-integration-build

cmake -GNinja \
-DCMAKE_BUILD_TYPE=$ARROW_BUILD_TYPE \
-DCMAKE_INSTALL_PREFIX=$PARQUET_HOME \
-DPARQUET_BUILD_BENCHMARKS=OFF \
-DPARQUET_BUILD_EXECUTABLES=OFF \
-DPARQUET_BUILD_TESTS=ON \
-DCMAKE_CXX_FLAGS=$CXXFLAGS \
..
ninja
ninja install

popd

# Install pyarrow
pushd arrow/python

# Clear the build directory so we are guaranteed a fresh set of extensions
rm -rf build/

python setup.py build_ext \
--build-type=$ARROW_BUILD_TYPE \
--with-parquet \
--with-plasma \
--inplace

popd

# Run tests
export LIBHDFS3_CONF=arrow/dev/hdfs_integration/libhdfs3-client-config.xml

# Python
python -m pytest -vv -r sxX -s arrow/python/pyarrow \
--only-parquet --only-hdfs

# C++
arrow/cpp/hdfs-integration-build/debug/io-hdfs-test
3 changes: 1 addition & 2 deletions dev/spark_integration/spark_integration.sh
@@ -87,6 +87,5 @@ build/mvn -Dtest=none -DwildcardSuites="$SPARK_SCALA_TESTS" test
# Run pyarrow related Python tests only
SPARK_PYTHON_TESTS="ArrowTests PandasUDFTests ScalarPandasUDFTests GroupedMapPandasUDFTests GroupedAggPandasUDFTests"
echo "Testing PySpark: $SPARK_PYTHON_TESTS"
SPARK_TESTING=1 bin/pyspark pyspark.sql.tests $SPARK_PYTHON_TESTS
SPARK_TESTING=1 bin/pyspark pyspark.sql.tests $SPARK_PYTHON_TESTS
popd

7 changes: 3 additions & 4 deletions python/pyarrow/parquet.py
@@ -944,7 +944,7 @@ def _make_manifest(path_or_paths, fs, pathsep='/'):
common_metadata_path = None
metadata_path = None

if len(path_or_paths) == 1:
if isinstance(path_or_paths, list) and len(path_or_paths) == 1:
# Dask passes a directory as a list of length 1
path_or_paths = path_or_paths[0]

@@ -1004,9 +1004,8 @@ def read_table(source, columns=None, nthreads=1, metadata=None,
use_pandas_metadata=False):
if is_path(source):
fs = _get_fs_from_path(source)

if fs.isdir(source):
return fs.read_parquet(source, columns=columns, metadata=metadata)
return fs.read_parquet(source, columns=columns, metadata=metadata,
use_pandas_metadata=use_pandas_metadata)

pf = ParquetFile(source, metadata=metadata)
return pf.read(columns=columns, nthreads=nthreads,
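The two fixes above matter for directory reads: `_make_manifest` now only unwraps a genuine single-element list (the form Dask passes), and `read_table` forwards `use_pandas_metadata` when it dispatches to a filesystem's `read_parquet`. A hedged usage sketch of the call paths this affects, with made-up paths:

```python
# Hypothetical usage touched by this change: reading a Parquet directory
# (e.g. on HDFS) while asking for the stored pandas metadata.
import pyarrow.parquet as pq

# With the fix, use_pandas_metadata is no longer dropped when `source`
# is a directory and the read is delegated to the filesystem.
table = pq.read_table('/data/dataset_dir', columns=['a', 'b'],
                      use_pandas_metadata=True)
df = table.to_pandas()

# Dask-style call: a single-element list pointing at a directory is
# unwrapped by _make_manifest before the manifest is built.
dataset = pq.ParquetDataset(['/data/dataset_dir'])
```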
6 changes: 0 additions & 6 deletions python/testing/README.md
@@ -19,12 +19,6 @@

# Testing tools for odds and ends

## Testing HDFS file interface

```shell
./test_hdfs.sh
```

## Testing Dask integration

Initial integration testing with Dask has been Dockerized.