Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ set(ARROW_SRCS
io/interfaces.cc
io/memory.cc
io/slow.cc
io/stdio.cc
io/transform.cc
util/basic_decimal.cc
util/bit_block_counter.cc
Expand Down
95 changes: 95 additions & 0 deletions cpp/src/arrow/io/stdio.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/io/stdio.h"

#include <iostream>

#include "arrow/buffer.h"
#include "arrow/result.h"

namespace arrow {
namespace io {

//
// StdoutStream implementation
//

StdoutStream::StdoutStream() : pos_(0) { set_mode(FileMode::WRITE); }

Status StdoutStream::Close() { return Status::OK(); }

bool StdoutStream::closed() const { return false; }

Result<int64_t> StdoutStream::Tell() const { return pos_; }

Status StdoutStream::Write(const void* data, int64_t nbytes) {
pos_ += nbytes;
std::cout.write(reinterpret_cast<const char*>(data), nbytes);
return Status::OK();
}

//
// StderrStream implementation
//

StderrStream::StderrStream() : pos_(0) { set_mode(FileMode::WRITE); }

Status StderrStream::Close() { return Status::OK(); }

bool StderrStream::closed() const { return false; }

Result<int64_t> StderrStream::Tell() const { return pos_; }

Status StderrStream::Write(const void* data, int64_t nbytes) {
pos_ += nbytes;
std::cerr.write(reinterpret_cast<const char*>(data), nbytes);
return Status::OK();
}

//
// StdinStream implementation
//

StdinStream::StdinStream() : pos_(0) { set_mode(FileMode::READ); }

Status StdinStream::Close() { return Status::OK(); }

bool StdinStream::closed() const { return false; }

Result<int64_t> StdinStream::Tell() const { return pos_; }

Result<int64_t> StdinStream::Read(int64_t nbytes, void* out) {
std::cin.read(reinterpret_cast<char*>(out), nbytes);
if (std::cin) {
pos_ += nbytes;
return nbytes;
} else {
return 0;
}
}

Result<std::shared_ptr<Buffer>> StdinStream::Read(int64_t nbytes) {
ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(nbytes));
ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, Read(nbytes, buffer->mutable_data()));
ARROW_RETURN_NOT_OK(buffer->Resize(bytes_read, false));
buffer->ZeroPadding();
return std::move(buffer);
}

} // namespace io
} // namespace arrow
82 changes: 82 additions & 0 deletions cpp/src/arrow/io/stdio.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <cstdint>

#include "arrow/io/interfaces.h"
#include "arrow/util/visibility.h"

namespace arrow {
namespace io {

// Output stream that just writes to stdout.
class ARROW_EXPORT StdoutStream : public OutputStream {
public:
StdoutStream();
~StdoutStream() override {}

Status Close() override;
bool closed() const override;

Result<int64_t> Tell() const override;

Status Write(const void* data, int64_t nbytes) override;

private:
int64_t pos_;
};

// Output stream that just writes to stderr.
class ARROW_EXPORT StderrStream : public OutputStream {
public:
StderrStream();
~StderrStream() override {}

Status Close() override;
bool closed() const override;

Result<int64_t> Tell() const override;

Status Write(const void* data, int64_t nbytes) override;

private:
int64_t pos_;
};

// Input stream that just reads from stdin.
class ARROW_EXPORT StdinStream : public InputStream {
public:
StdinStream();
~StdinStream() override {}

Status Close() override;
bool closed() const override;

Result<int64_t> Tell() const override;

Result<int64_t> Read(int64_t nbytes, void* out) override;

Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override;

private:
int64_t pos_;
};

} // namespace io
} // namespace arrow
3 changes: 1 addition & 2 deletions cpp/src/arrow/ipc/file_to_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@
#include <string>

#include "arrow/io/file.h"
#include "arrow/io/stdio.h"
#include "arrow/ipc/reader.h"
#include "arrow/ipc/writer.h"
#include "arrow/result.h"
#include "arrow/status.h"

#include "arrow/util/io_util.h"

namespace arrow {

class RecordBatch;
Expand Down
3 changes: 1 addition & 2 deletions cpp/src/arrow/ipc/stream_to_file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@
#include <memory>
#include <string>

#include "arrow/io/stdio.h"
#include "arrow/ipc/reader.h"
#include "arrow/ipc/writer.h"
#include "arrow/record_batch.h"
#include "arrow/status.h"

#include "arrow/util/io_util.h"

namespace arrow {
namespace ipc {

Expand Down
37 changes: 10 additions & 27 deletions cpp/src/arrow/util/async_generator.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@
#include <cassert>
#include <cstring>
#include <deque>
#include <limits>
#include <queue>
#include <thread>

#include "arrow/util/functional.h"
#include "arrow/util/future.h"
#include "arrow/util/io_util.h"
#include "arrow/util/iterator.h"
#include "arrow/util/mutex.h"
#include "arrow/util/optional.h"
Expand Down Expand Up @@ -1247,6 +1248,8 @@ class BackgroundGenerator {
}

protected:
static constexpr uint64_t kUnlikelyThreadId{std::numeric_limits<uint64_t>::max()};
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't see any reason this wouldn't work but why not just use the uint64_t version of std::thread::id()? Is it to allow this to be constexpr?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I hadn't thought of that. But it seems simpler to use a hardcoded number like this.


struct State {
State(internal::Executor* io_executor, Iterator<T> it, int max_q, int q_restart)
: io_executor(io_executor),
Expand All @@ -1255,9 +1258,7 @@ class BackgroundGenerator {
it(std::move(it)),
reading(false),
finished(false),
should_shutdown(false) {
SetWorkerThreadId({}); // default-initialized thread id
}
should_shutdown(false) {}

void ClearQueue() {
while (!queue.empty()) {
Expand Down Expand Up @@ -1316,28 +1317,11 @@ class BackgroundGenerator {
return next;
}

void SetWorkerThreadId(const std::thread::id tid) {
uint64_t equiv{0};
// std::thread::id is trivially copyable as per C++ spec,
// so type punning as a uint64_t should work
static_assert(sizeof(std::thread::id) <= sizeof(uint64_t),
"std::thread::id can't fit into uint64_t");
memcpy(&equiv, reinterpret_cast<const void*>(&tid), sizeof(tid));
worker_thread_id.store(equiv);
}

std::thread::id GetWorkerThreadId() {
const auto equiv = worker_thread_id.load();
std::thread::id tid;
memcpy(reinterpret_cast<void*>(&tid), &equiv, sizeof(tid));
return tid;
}

internal::Executor* io_executor;
const int max_q;
const int q_restart;
Iterator<T> it;
std::atomic<uint64_t> worker_thread_id;
std::atomic<uint64_t> worker_thread_id{kUnlikelyThreadId};

// If true, the task is actively pumping items from the queue and does not need a
// restart
Expand All @@ -1346,8 +1330,7 @@ class BackgroundGenerator {
bool finished;
// Signal to the background task to end early because consumers have given up on it
bool should_shutdown;
// If the queue is empty then the consumer will create a waiting future and wait for
// it
// If the queue is empty, the consumer will create a waiting future and wait for it
std::queue<Result<T>> queue;
util::optional<Future<T>> waiting_future;
// Every background task is given a future to complete when it is entirely finished
Expand All @@ -1365,7 +1348,7 @@ class BackgroundGenerator {
///
/// It's a deadlock if we enter cleanup from
/// the worker thread but it can happen if the consumer doesn't transfer away
assert(state->GetWorkerThreadId() != std::this_thread::get_id());
assert(state->worker_thread_id.load() != ::arrow::internal::GetThreadId());
Future<> finish_fut;
{
auto lock = state->mutex.Lock();
Expand All @@ -1384,9 +1367,9 @@ class BackgroundGenerator {
};

static void WorkerTask(std::shared_ptr<State> state) {
state->worker_thread_id.store(::arrow::internal::GetThreadId());
// We need to capture the state to read while outside the mutex
bool reading = true;
state->SetWorkerThreadId(std::this_thread::get_id());
while (reading) {
auto next = state->it.Next();
// Need to capture state->waiting_future inside the mutex to mark finished outside
Expand Down Expand Up @@ -1438,7 +1421,7 @@ class BackgroundGenerator {
// reference it. We can safely transition to idle now.
task_finished = state->task_finished;
state->task_finished = Future<>();
state->SetWorkerThreadId({}); // default-initialized thread id
state->worker_thread_id.store(kUnlikelyThreadId);
}
task_finished.MarkFinished();
}
Expand Down
Loading