From 16c7f01f8fdb6c0b531d45d024e76d2d7d909ec5 Mon Sep 17 00:00:00 2001 From: Sutou Kouhei Date: Wed, 1 Sep 2021 15:57:22 +0900 Subject: [PATCH 1/4] ARROW-13831: [GLib][Ruby] Add support for writing by Arrow Dataset --- .../arrow-dataset-glib/arrow-dataset-glib.h | 2 + .../arrow-dataset-glib/arrow-dataset-glib.hpp | 1 + c_glib/arrow-dataset-glib/dataset-factory.cpp | 68 +++ c_glib/arrow-dataset-glib/dataset.cpp | 403 +++++++++++++++- c_glib/arrow-dataset-glib/dataset.h | 25 + c_glib/arrow-dataset-glib/dataset.hpp | 10 +- c_glib/arrow-dataset-glib/enums.c.template | 52 +++ c_glib/arrow-dataset-glib/enums.h.template | 41 ++ c_glib/arrow-dataset-glib/file-format.cpp | 380 +++++++++++++-- c_glib/arrow-dataset-glib/file-format.h | 59 ++- c_glib/arrow-dataset-glib/file-format.hpp | 18 +- c_glib/arrow-dataset-glib/meson.build | 22 +- c_glib/arrow-dataset-glib/partitioning.cpp | 440 ++++++++++++++++++ c_glib/arrow-dataset-glib/partitioning.h | 110 +++++ c_glib/arrow-dataset-glib/partitioning.hpp | 31 ++ c_glib/arrow-dataset-glib/scanner.cpp | 19 + c_glib/arrow-dataset-glib/scanner.h | 4 + c_glib/arrow-glib/compute.cpp | 4 +- .../arrow-dataset-glib-docs.xml | 8 +- .../test/dataset/test-file-system-dataset.rb | 64 ++- c_glib/test/dataset/test-file-writer.rb | 65 +++ .../test/dataset/test-partitioning-options.rb | 46 ++ c_glib/test/dataset/test-partitioning.rb | 34 ++ c_glib/test/dataset/test-scanner.rb | 7 + c_glib/test/helper/buildable.rb | 7 + c_glib/test/helper/readable.rb | 47 ++ c_glib/test/run-test.rb | 2 + .../lib/arrow-dataset/arrow-table-savable.rb | 63 +++ .../lib/arrow-dataset/file-format.rb | 14 + .../lib/arrow-dataset/loader.rb | 1 + .../test/test-arrow-table.rb | 5 +- ruby/red-arrow/lib/arrow/file-system.rb | 34 ++ ruby/red-arrow/lib/arrow/loader.rb | 1 + ruby/red-arrow/lib/arrow/table-saver.rb | 32 +- 34 files changed, 2038 insertions(+), 81 deletions(-) create mode 100644 c_glib/arrow-dataset-glib/enums.c.template create mode 100644 c_glib/arrow-dataset-glib/enums.h.template create mode 100644 c_glib/arrow-dataset-glib/partitioning.cpp create mode 100644 c_glib/arrow-dataset-glib/partitioning.h create mode 100644 c_glib/arrow-dataset-glib/partitioning.hpp create mode 100644 c_glib/test/dataset/test-file-writer.rb create mode 100644 c_glib/test/dataset/test-partitioning-options.rb create mode 100644 c_glib/test/dataset/test-partitioning.rb create mode 100644 c_glib/test/helper/readable.rb create mode 100644 ruby/red-arrow-dataset/lib/arrow-dataset/arrow-table-savable.rb create mode 100644 ruby/red-arrow/lib/arrow/file-system.rb diff --git a/c_glib/arrow-dataset-glib/arrow-dataset-glib.h b/c_glib/arrow-dataset-glib/arrow-dataset-glib.h index 03e56516112..58f4e216cc7 100644 --- a/c_glib/arrow-dataset-glib/arrow-dataset-glib.h +++ b/c_glib/arrow-dataset-glib/arrow-dataset-glib.h @@ -23,6 +23,8 @@ #include #include +#include #include #include +#include #include diff --git a/c_glib/arrow-dataset-glib/arrow-dataset-glib.hpp b/c_glib/arrow-dataset-glib/arrow-dataset-glib.hpp index 65341b9b77e..8e996506884 100644 --- a/c_glib/arrow-dataset-glib/arrow-dataset-glib.hpp +++ b/c_glib/arrow-dataset-glib/arrow-dataset-glib.hpp @@ -25,4 +25,5 @@ #include #include #include +#include #include diff --git a/c_glib/arrow-dataset-glib/dataset-factory.cpp b/c_glib/arrow-dataset-glib/dataset-factory.cpp index 146db69adfc..433e58b2031 100644 --- a/c_glib/arrow-dataset-glib/dataset-factory.cpp +++ b/c_glib/arrow-dataset-glib/dataset-factory.cpp @@ -23,6 +23,7 @@ #include #include #include +#include G_BEGIN_DECLS @@ -142,6 +143,7 @@ gadataset_dataset_factory_finish(GADatasetDatasetFactory *factory, typedef struct GADatasetFileSystemDatasetFactoryPrivate_ { GADatasetFileFormat *format; GArrowFileSystem *file_system; + GADatasetPartitioning *partitioning; GList *files; arrow::dataset::FileSystemFactoryOptions options; } GADatasetFileSystemDatasetFactoryPrivate; @@ -149,6 +151,8 @@ typedef struct GADatasetFileSystemDatasetFactoryPrivate_ { enum { PROP_FORMAT = 1, PROP_FILE_SYSTEM, + PROP_PARTITIONING, + PROP_PARTITION_BASE_DIR, }; G_DEFINE_TYPE_WITH_PRIVATE(GADatasetFileSystemDatasetFactory, @@ -175,6 +179,11 @@ gadataset_file_system_dataset_factory_dispose(GObject *object) priv->file_system = NULL; } + if (priv->partitioning) { + g_object_unref(priv->partitioning); + priv->partitioning = NULL; + } + if (priv->files) { g_list_free_full(priv->files, g_object_unref); priv->files = NULL; @@ -205,6 +214,29 @@ gadataset_file_system_dataset_factory_set_property(GObject *object, case PROP_FORMAT: priv->format = GADATASET_FILE_FORMAT(g_value_dup_object(value)); break; + case PROP_PARTITIONING: + { + auto partitioning = g_value_get_object(value); + if (partitioning == priv->partitioning) { + break; + } + auto old_partitioning = priv->partitioning; + if (partitioning) { + g_object_ref(partitioning); + priv->partitioning = GADATASET_PARTITIONING(partitioning); + priv->options.partitioning = + gadataset_partitioning_get_raw(priv->partitioning); + } else { + priv->options.partitioning = arrow::dataset::Partitioning::Default(); + } + if (old_partitioning) { + g_object_unref(old_partitioning); + } + } + break; + case PROP_PARTITION_BASE_DIR: + priv->options.partition_base_dir = g_value_get_string(value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); break; @@ -226,6 +258,12 @@ gadataset_file_system_dataset_factory_get_property(GObject *object, case PROP_FILE_SYSTEM: g_value_set_object(value, priv->file_system); break; + case PROP_PARTITIONING: + g_value_set_object(value, priv->partitioning); + break; + case PROP_PARTITION_BASE_DIR: + g_value_set_string(value, priv->options.partition_base_dir.c_str()); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); break; @@ -279,6 +317,35 @@ gadataset_file_system_dataset_factory_class_init( GARROW_TYPE_FILE_SYSTEM, static_cast(G_PARAM_READABLE)); g_object_class_install_property(gobject_class, PROP_FILE_SYSTEM, spec); + + /** + * GADatasetFileSystemDatasetFactory:partitioning: + * + * Partitioning used by #GADatasetFileSystemDataset. + * + * Since: 6.0.0 + */ + spec = g_param_spec_object("partitioning", + "Partitioning", + "Partitioning used by GADatasetFileSystemDataset", + GADATASET_TYPE_PARTITIONING, + static_cast(G_PARAM_READWRITE)); + g_object_class_install_property(gobject_class, PROP_PARTITIONING, spec); + + /** + * GADatasetFileSystemDatasetFactory:partition-base-dir: + * + * Partition base directory used by #GADatasetFileSystemDataset. + * + * Since: 6.0.0 + */ + spec = g_param_spec_string("partition-base-dir", + "Partition base directory", + "Partition base directory " + "used by GADatasetFileSystemDataset", + NULL, + static_cast(G_PARAM_READWRITE)); + g_object_class_install_property(gobject_class, PROP_PARTITION_BASE_DIR, spec); } /** @@ -454,6 +521,7 @@ gadataset_file_system_dataset_factory_finish( "dataset", &arrow_dataset, "file-system", priv->file_system, "format", priv->format, + "partitioning", priv->partitioning, NULL)); } diff --git a/c_glib/arrow-dataset-glib/dataset.cpp b/c_glib/arrow-dataset-glib/dataset.cpp index 3bd62f99ef3..8613bedad42 100644 --- a/c_glib/arrow-dataset-glib/dataset.cpp +++ b/c_glib/arrow-dataset-glib/dataset.cpp @@ -18,11 +18,14 @@ */ #include +#include #include #include #include -#include +#include +#include +#include G_BEGIN_DECLS @@ -36,13 +39,8 @@ G_BEGIN_DECLS * * #GADatasetFileSystemDataset is a class for file system dataset. * - * #GADatasetFileFormat is a base class for file formats. - * - * #GADatasetCSVFileFormat is a class for CSV file format. - * - * #GADatasetIPCFileFormat is a class for IPC file format. - * - * #GADatasetParquetFileFormat is a class for Apache Parquet file format. + * #GADatasetFileSystemDatasetWriteOptions is a class for options to + * write a dataset to file system dataset. * * Since: 5.0.0 */ @@ -190,14 +188,326 @@ gadataset_dataset_get_type_name(GADatasetDataset *dataset) } +typedef struct GADatasetFileSystemDatasetWriteOptionsPrivate_ { + arrow::dataset::FileSystemDatasetWriteOptions options; + GADatasetFileWriteOptions *file_write_options; + GArrowFileSystem *file_system; + GADatasetPartitioning *partitioning; +} GADatasetFileSystemDatasetWriteOptionsPrivate; + +enum { + PROP_FILE_WRITE_OPTIONS = 1, + PROP_FILE_SYSTEM, + PROP_BASE_DIR, + PROP_PARTITIONING, + PROP_MAX_PARTITIONS, + PROP_BASE_NAME_TEMPLATE, +}; + +G_DEFINE_TYPE_WITH_PRIVATE(GADatasetFileSystemDatasetWriteOptions, + gadataset_file_system_dataset_write_options, + G_TYPE_OBJECT) + +#define GADATASET_FILE_SYSTEM_DATASET_WRITE_OPTIONS_GET_PRIVATE(obj) \ + static_cast( \ + gadataset_file_system_dataset_write_options_get_instance_private( \ + GADATASET_FILE_SYSTEM_DATASET_WRITE_OPTIONS(obj))) + +static void +gadataset_file_system_dataset_write_options_finalize(GObject *object) +{ + auto priv = GADATASET_FILE_SYSTEM_DATASET_WRITE_OPTIONS_GET_PRIVATE(object); + priv->options.~FileSystemDatasetWriteOptions(); + G_OBJECT_CLASS(gadataset_file_system_dataset_write_options_parent_class)-> + finalize(object); +} + +static void +gadataset_file_system_dataset_write_options_dispose(GObject *object) +{ + auto priv = GADATASET_FILE_SYSTEM_DATASET_WRITE_OPTIONS_GET_PRIVATE(object); + + if (priv->file_write_options) { + g_object_unref(priv->file_write_options); + priv->file_write_options = NULL; + } + + if (priv->file_system) { + g_object_unref(priv->file_system); + priv->file_system = NULL; + } + + if (priv->partitioning) { + g_object_unref(priv->partitioning); + priv->partitioning = NULL; + } + + G_OBJECT_CLASS(gadataset_file_system_dataset_write_options_parent_class)-> + dispose(object); +} + +static void +gadataset_file_system_dataset_write_options_set_property(GObject *object, + guint prop_id, + const GValue *value, + GParamSpec *pspec) +{ + auto priv = GADATASET_FILE_SYSTEM_DATASET_WRITE_OPTIONS_GET_PRIVATE(object); + + switch (prop_id) { + case PROP_FILE_WRITE_OPTIONS: + { + auto file_write_options = g_value_get_object(value); + if (file_write_options == priv->file_write_options) { + break; + } + auto old_file_write_options = priv->file_write_options; + if (file_write_options) { + g_object_ref(file_write_options); + priv->file_write_options = + GADATASET_FILE_WRITE_OPTIONS(file_write_options); + priv->options.file_write_options = + gadataset_file_write_options_get_raw(priv->file_write_options); + } else { + priv->options.file_write_options = nullptr; + } + if (old_file_write_options) { + g_object_unref(old_file_write_options); + } + } + break; + case PROP_FILE_SYSTEM: + { + auto file_system = g_value_get_object(value); + if (file_system == priv->file_system) { + break; + } + auto old_file_system = priv->file_system; + if (file_system) { + g_object_ref(file_system); + priv->file_system = GARROW_FILE_SYSTEM(file_system); + priv->options.filesystem = garrow_file_system_get_raw(priv->file_system); + } else { + priv->options.filesystem = nullptr; + } + if (old_file_system) { + g_object_unref(old_file_system); + } + } + break; + case PROP_BASE_DIR: + priv->options.base_dir = g_value_get_string(value); + break; + case PROP_PARTITIONING: + { + auto partitioning = g_value_get_object(value); + if (partitioning == priv->partitioning) { + break; + } + auto old_partitioning = priv->partitioning; + if (partitioning) { + g_object_ref(partitioning); + priv->partitioning = GADATASET_PARTITIONING(partitioning); + priv->options.partitioning = + gadataset_partitioning_get_raw(priv->partitioning); + } else { + priv->options.partitioning = arrow::dataset::Partitioning::Default(); + } + if (old_partitioning) { + g_object_unref(old_partitioning); + } + } + break; + case PROP_MAX_PARTITIONS: + priv->options.max_partitions = g_value_get_uint(value); + break; + case PROP_BASE_NAME_TEMPLATE: + priv->options.basename_template = g_value_get_string(value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); + break; + } +} + +static void +gadataset_file_system_dataset_write_options_get_property(GObject *object, + guint prop_id, + GValue *value, + GParamSpec *pspec) +{ + auto priv = GADATASET_FILE_SYSTEM_DATASET_WRITE_OPTIONS_GET_PRIVATE(object); + + switch (prop_id) { + case PROP_FILE_WRITE_OPTIONS: + g_value_set_object(value, priv->file_write_options); + break; + case PROP_FILE_SYSTEM: + g_value_set_object(value, priv->file_system); + break; + case PROP_BASE_DIR: + g_value_set_string(value, priv->options.base_dir.c_str()); + break; + case PROP_PARTITIONING: + g_value_set_object(value, priv->partitioning); + break; + case PROP_MAX_PARTITIONS: + g_value_set_uint(value, priv->options.max_partitions); + break; + case PROP_BASE_NAME_TEMPLATE: + g_value_set_string(value, priv->options.basename_template.c_str()); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); + break; + } +} + +static void +gadataset_file_system_dataset_write_options_init( + GADatasetFileSystemDatasetWriteOptions *object) +{ + auto priv = GADATASET_FILE_SYSTEM_DATASET_WRITE_OPTIONS_GET_PRIVATE(object); + new(&(priv->options)) arrow::dataset::FileSystemDatasetWriteOptions; + priv->options.partitioning = arrow::dataset::Partitioning::Default(); +} + +static void +gadataset_file_system_dataset_write_options_class_init( + GADatasetFileSystemDatasetWriteOptionsClass *klass) +{ + auto gobject_class = G_OBJECT_CLASS(klass); + gobject_class->finalize = + gadataset_file_system_dataset_write_options_finalize; + gobject_class->dispose = + gadataset_file_system_dataset_write_options_dispose; + gobject_class->set_property = + gadataset_file_system_dataset_write_options_set_property; + gobject_class->get_property = + gadataset_file_system_dataset_write_options_get_property; + + arrow::dataset::FileSystemDatasetWriteOptions default_options; + GParamSpec *spec; + /** + * GADatasetFileSystemDatasetWriteOptions:file_write_options: + * + * Options for individual fragment writing. + * + * Since: 6.0.0 + */ + spec = g_param_spec_object("file-write-options", + "File write options", + "Options for individual fragment writing", + GADATASET_TYPE_FILE_WRITE_OPTIONS, + static_cast(G_PARAM_READWRITE)); + g_object_class_install_property(gobject_class, PROP_FILE_WRITE_OPTIONS, spec); + + /** + * GADatasetFileSystemDatasetWriteOptions:file_system: + * + * #GArrowFileSystem into which a dataset will be written. + * + * Since: 6.0.0 + */ + spec = g_param_spec_object("file-system", + "File system", + "GArrowFileSystem into which " + "a dataset will be written", + GARROW_TYPE_FILE_SYSTEM, + static_cast(G_PARAM_READWRITE)); + g_object_class_install_property(gobject_class, PROP_FILE_SYSTEM, spec); + + /** + * GADatasetFileSystemDatasetWriteOptions:base_dir: + * + * Root directory into which the dataset will be written. + * + * Since: 6.0.0 + */ + spec = g_param_spec_string("base-dir", + "Base directory", + "Root directory into which " + "the dataset will be written", + NULL, + static_cast(G_PARAM_READWRITE)); + g_object_class_install_property(gobject_class, PROP_BASE_DIR, spec); + + /** + * GADatasetFileSystemDatasetWriteOptions:partitioning: + * + * #GADatasetPartitioning used to generate fragment paths. + * + * Since: 6.0.0 + */ + spec = g_param_spec_object("partitioning", + "Partitioning", + "GADatasetPartitioning used to " + "generate fragment paths", + GADATASET_TYPE_PARTITIONING, + static_cast(G_PARAM_READWRITE)); + g_object_class_install_property(gobject_class, PROP_PARTITIONING, spec); + + /** + * GADatasetFileSystemDatasetWriteOptions:max-partitions: + * + * Maximum number of partitions any batch may be written into. + * + * Since: 6.0.0 + */ + spec = g_param_spec_uint("max-partitions", + "Max partitions", + "Maximum number of partitions " + "any batch may be written into", + 0, + G_MAXINT, + default_options.max_partitions, + static_cast(G_PARAM_READWRITE)); + g_object_class_install_property(gobject_class, PROP_MAX_PARTITIONS, spec); + + /** + * GADatasetFileSystemDatasetWriteOptions:base-name-template: + * + * Template string used to generate fragment base names. {i} will be + * replaced by an auto incremented integer. + * + * Since: 6.0.0 + */ + spec = g_param_spec_string("base-name-template", + "Base name template", + "Template string used to generate fragment " + "base names. {i} will be replaced by " + "an auto incremented integer", + NULL, + static_cast(G_PARAM_READWRITE)); + g_object_class_install_property(gobject_class, PROP_BASE_NAME_TEMPLATE, spec); +} + +/** + * gadataset_file_system_dataset_write_options_new: + * + * Returns: The newly created #GADatasetFileSystemDatasetWriteOptions. + * + * Since: 6.0.0 + */ +GADatasetFileSystemDatasetWriteOptions * +gadataset_file_system_dataset_write_options_new(void) +{ + return GADATASET_FILE_SYSTEM_DATASET_WRITE_OPTIONS( + g_object_new(GADATASET_TYPE_FILE_SYSTEM_DATASET_WRITE_OPTIONS, + NULL)); +} + + typedef struct GADatasetFileSystemDatasetPrivate_ { GADatasetFileFormat *format; GArrowFileSystem *file_system; + GADatasetPartitioning *partitioning; } GADatasetFileSystemDatasetPrivate; enum { - PROP_FORMAT = 1, - PROP_FILE_SYSTEM, + PROP_FILE_SYSTEM_DATASET_FORMAT = 1, + PROP_FILE_SYSTEM_DATASET_FILE_SYSTEM, + PROP_FILE_SYSTEM_DATASET_PARTITIONING, }; G_DEFINE_TYPE_WITH_PRIVATE(GADatasetFileSystemDataset, @@ -236,12 +546,15 @@ gadataset_file_system_dataset_set_property(GObject *object, auto priv = GADATASET_FILE_SYSTEM_DATASET_GET_PRIVATE(object); switch (prop_id) { - case PROP_FORMAT: + case PROP_FILE_SYSTEM_DATASET_FORMAT: priv->format = GADATASET_FILE_FORMAT(g_value_dup_object(value)); break; - case PROP_FILE_SYSTEM: + case PROP_FILE_SYSTEM_DATASET_FILE_SYSTEM: priv->file_system = GARROW_FILE_SYSTEM(g_value_dup_object(value)); break; + case PROP_FILE_SYSTEM_DATASET_PARTITIONING: + priv->partitioning = GADATASET_PARTITIONING(g_value_dup_object(value)); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); break; @@ -257,12 +570,15 @@ gadataset_file_system_dataset_get_property(GObject *object, auto priv = GADATASET_FILE_SYSTEM_DATASET_GET_PRIVATE(object); switch (prop_id) { - case PROP_FORMAT: + case PROP_FILE_SYSTEM_DATASET_FORMAT: g_value_set_object(value, priv->format); break; - case PROP_FILE_SYSTEM: + case PROP_FILE_SYSTEM_DATASET_FILE_SYSTEM: g_value_set_object(value, priv->file_system); break; + case PROP_FILE_SYSTEM_DATASET_PARTITIONING: + g_value_set_object(value, priv->partitioning); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); break; @@ -296,7 +612,9 @@ gadataset_file_system_dataset_class_init(GADatasetFileSystemDatasetClass *klass) GADATASET_TYPE_FILE_FORMAT, static_cast(G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY)); - g_object_class_install_property(gobject_class, PROP_FORMAT, spec); + g_object_class_install_property(gobject_class, + PROP_FILE_SYSTEM_DATASET_FORMAT, + spec); /** * GADatasetFileSystemDataset:file-system: @@ -311,7 +629,52 @@ gadataset_file_system_dataset_class_init(GADatasetFileSystemDatasetClass *klass) GARROW_TYPE_FILE_SYSTEM, static_cast(G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY)); - g_object_class_install_property(gobject_class, PROP_FILE_SYSTEM, spec); + g_object_class_install_property(gobject_class, + PROP_FILE_SYSTEM_DATASET_FILE_SYSTEM, + spec); + + /** + * GADatasetFileSystemDataset:partitioning: + * + * Partitioning of the dataset. + * + * Since: 6.0.0 + */ + spec = g_param_spec_object("partitioning", + "Partitioning", + "Partitioning of the dataset", + GADATASET_TYPE_PARTITIONING, + static_cast(G_PARAM_READWRITE | + G_PARAM_CONSTRUCT_ONLY)); + g_object_class_install_property(gobject_class, + PROP_FILE_SYSTEM_DATASET_PARTITIONING, + spec); +} + +/** + * gadataset_file_system_dataset_write_scanner: + * @scanner: A #GADatasetScanner that produces data to be written. + * @options: A #GADatasetFileSystemDatasetWriteOptions. + * @error: (nullable): Return location for a #GError or %NULL. + * + * Returns: %TRUE on success, %FALSE on error. + * + * Since: 6.0.0 + */ +gboolean +gadataset_file_system_dataset_write_scanner( + GADatasetScanner *scanner, + GADatasetFileSystemDatasetWriteOptions *options, + GError **error) +{ + auto arrow_scanner = gadataset_scanner_get_raw(scanner); + auto arrow_options = + gadataset_file_system_dataset_write_options_get_raw(options); + auto status = + arrow::dataset::FileSystemDataset::Write(*arrow_options, arrow_scanner); + return garrow::check(error, + status, + "[file-system-dataset][write-scanner]"); } @@ -363,3 +726,11 @@ gadataset_dataset_get_raw(GADatasetDataset *dataset) auto priv = GADATASET_DATASET_GET_PRIVATE(dataset); return priv->dataset; } + +arrow::dataset::FileSystemDatasetWriteOptions * +gadataset_file_system_dataset_write_options_get_raw( + GADatasetFileSystemDatasetWriteOptions *options) +{ + auto priv = GADATASET_FILE_SYSTEM_DATASET_WRITE_OPTIONS_GET_PRIVATE(options); + return &(priv->options); +} diff --git a/c_glib/arrow-dataset-glib/dataset.h b/c_glib/arrow-dataset-glib/dataset.h index 97cf35d74d7..86d077caa98 100644 --- a/c_glib/arrow-dataset-glib/dataset.h +++ b/c_glib/arrow-dataset-glib/dataset.h @@ -24,6 +24,7 @@ G_BEGIN_DECLS typedef struct _GADatasetScannerBuilder GADatasetScannerBuilder; +typedef struct _GADatasetScanner GADatasetScanner; #define GADATASET_TYPE_DATASET (gadataset_dataset_get_type()) G_DECLARE_DERIVABLE_TYPE(GADatasetDataset, @@ -49,6 +50,23 @@ gchar * gadataset_dataset_get_type_name(GADatasetDataset *dataset); +#define GADATASET_TYPE_FILE_SYSTEM_DATASET_WRITE_OPTIONS \ + (gadataset_file_system_dataset_write_options_get_type()) +G_DECLARE_DERIVABLE_TYPE(GADatasetFileSystemDatasetWriteOptions, + gadataset_file_system_dataset_write_options, + GADATASET, + FILE_SYSTEM_DATASET_WRITE_OPTIONS, + GObject) +struct _GADatasetFileSystemDatasetWriteOptionsClass +{ + GObjectClass parent_class; +}; + +GARROW_AVAILABLE_IN_6_0 +GADatasetFileSystemDatasetWriteOptions * +gadataset_file_system_dataset_write_options_new(void); + + #define GADATASET_TYPE_FILE_SYSTEM_DATASET \ (gadataset_file_system_dataset_get_type()) G_DECLARE_DERIVABLE_TYPE(GADatasetFileSystemDataset, @@ -61,5 +79,12 @@ struct _GADatasetFileSystemDatasetClass GADatasetDatasetClass parent_class; }; +GARROW_AVAILABLE_IN_6_0 +gboolean +gadataset_file_system_dataset_write_scanner( + GADatasetScanner *scanner, + GADatasetFileSystemDatasetWriteOptions *options, + GError **error); + G_END_DECLS diff --git a/c_glib/arrow-dataset-glib/dataset.hpp b/c_glib/arrow-dataset-glib/dataset.hpp index 94dddd2eb7a..1dab391e8af 100644 --- a/c_glib/arrow-dataset-glib/dataset.hpp +++ b/c_glib/arrow-dataset-glib/dataset.hpp @@ -23,6 +23,7 @@ #include + GADatasetDataset * gadataset_dataset_new_raw( std::shared_ptr *arrow_dataset); @@ -39,10 +40,7 @@ gadataset_dataset_new_raw_valist( std::shared_ptr gadataset_dataset_get_raw(GADatasetDataset *dataset); -GADatasetFileFormat * -gadataset_file_format_new_raw( - std::shared_ptr *arrow_format); -std::shared_ptr -gadataset_dataset_get_raw(GADatasetDataset *dataset); - +arrow::dataset::FileSystemDatasetWriteOptions * +gadataset_file_system_dataset_write_options_get_raw( + GADatasetFileSystemDatasetWriteOptions *options); diff --git a/c_glib/arrow-dataset-glib/enums.c.template b/c_glib/arrow-dataset-glib/enums.c.template new file mode 100644 index 00000000000..8921ab06252 --- /dev/null +++ b/c_glib/arrow-dataset-glib/enums.c.template @@ -0,0 +1,52 @@ +/*** BEGIN file-header ***/ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +/*** END file-header ***/ + +/*** BEGIN file-production ***/ + +/* enumerations from "@filename@" */ +/*** END file-production ***/ + +/*** BEGIN value-header ***/ +GType +@enum_name@_get_type(void) +{ + static GType etype = 0; + if (G_UNLIKELY(etype == 0)) { + static const G@Type@Value values[] = { +/*** END value-header ***/ + +/*** BEGIN value-production ***/ + {@VALUENAME@, "@VALUENAME@", "@valuenick@"}, +/*** END value-production ***/ + +/*** BEGIN value-tail ***/ + {0, NULL, NULL} + }; + etype = g_@type@_register_static(g_intern_static_string("@EnumName@"), values); + } + return etype; +} +/*** END value-tail ***/ + +/*** BEGIN file-tail ***/ +/*** END file-tail ***/ diff --git a/c_glib/arrow-dataset-glib/enums.h.template b/c_glib/arrow-dataset-glib/enums.h.template new file mode 100644 index 00000000000..d6a0a455f5a --- /dev/null +++ b/c_glib/arrow-dataset-glib/enums.h.template @@ -0,0 +1,41 @@ +/*** BEGIN file-header ***/ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include + +G_BEGIN_DECLS +/*** END file-header ***/ + +/*** BEGIN file-production ***/ + +/* enumerations from "@filename@" */ +/*** END file-production ***/ + +/*** BEGIN value-header ***/ +GType @enum_name@_get_type(void) G_GNUC_CONST; +#define @ENUMPREFIX@_TYPE_@ENUMSHORT@ (@enum_name@_get_type()) +/*** END value-header ***/ + +/*** BEGIN file-tail ***/ + +G_END_DECLS +/*** END file-tail ***/ diff --git a/c_glib/arrow-dataset-glib/file-format.cpp b/c_glib/arrow-dataset-glib/file-format.cpp index 43f6a198f23..c0c92d966f8 100644 --- a/c_glib/arrow-dataset-glib/file-format.cpp +++ b/c_glib/arrow-dataset-glib/file-format.cpp @@ -18,6 +18,11 @@ */ #include +#include +#include +#include +#include +#include #include @@ -29,6 +34,11 @@ G_BEGIN_DECLS * @title: File format classes * @include: arrow-dataset-glib/arrow-dataset-glib.h * + * #GADatasetFileWriteOptions is a class for options to write a file + * of this format. + * + * #GADatasetFileWriter is a class for writing a file of this format. + * * #GADatasetFileFormat is a base class for file format classes. * * #GADatasetCSVFileFormat is a class for CSV file format. @@ -40,12 +50,218 @@ G_BEGIN_DECLS * Since: 3.0.0 */ +typedef struct GADatasetFileWriteOptionsPrivate_ { + std::shared_ptr options; +} GADatasetFileWriteOptionsPrivate; + +enum { + PROP_OPTIONS = 1, +}; + +G_DEFINE_TYPE_WITH_PRIVATE(GADatasetFileWriteOptions, + gadataset_file_write_options, + G_TYPE_OBJECT) + +#define GADATASET_FILE_WRITE_OPTIONS_GET_PRIVATE(obj) \ + static_cast( \ + gadataset_file_write_options_get_instance_private( \ + GADATASET_FILE_WRITE_OPTIONS(obj))) + +static void +gadataset_file_write_options_finalize(GObject *object) +{ + auto priv = GADATASET_FILE_WRITE_OPTIONS_GET_PRIVATE(object); + priv->options.~shared_ptr(); + G_OBJECT_CLASS(gadataset_file_write_options_parent_class)->finalize(object); +} + +static void +gadataset_file_write_options_set_property(GObject *object, + guint prop_id, + const GValue *value, + GParamSpec *pspec) +{ + auto priv = GADATASET_FILE_WRITE_OPTIONS_GET_PRIVATE(object); + + switch (prop_id) { + case PROP_OPTIONS: + priv->options = + *static_cast *>( + g_value_get_pointer(value)); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); + break; + } +} + +static void +gadataset_file_write_options_init(GADatasetFileWriteOptions *object) +{ + auto priv = GADATASET_FILE_WRITE_OPTIONS_GET_PRIVATE(object); + new(&priv->options) std::shared_ptr; +} + +static void +gadataset_file_write_options_class_init(GADatasetFileWriteOptionsClass *klass) +{ + auto gobject_class = G_OBJECT_CLASS(klass); + + gobject_class->finalize = gadataset_file_write_options_finalize; + gobject_class->set_property = gadataset_file_write_options_set_property; + + GParamSpec *spec; + spec = g_param_spec_pointer("options", + "Options", + "The raw " + "std::shared *", + static_cast(G_PARAM_WRITABLE | + G_PARAM_CONSTRUCT_ONLY)); + g_object_class_install_property(gobject_class, PROP_OPTIONS, spec); +} + + +typedef struct GADatasetFileWriterPrivate_ { + std::shared_ptr writer; +} GADatasetFileWriterPrivate; + +enum { + PROP_WRITER = 1, +}; + +G_DEFINE_TYPE_WITH_PRIVATE(GADatasetFileWriter, + gadataset_file_writer, + G_TYPE_OBJECT) + +#define GADATASET_FILE_WRITER_GET_PRIVATE(obj) \ + static_cast( \ + gadataset_file_writer_get_instance_private( \ + GADATASET_FILE_WRITER(obj))) + +static void +gadataset_file_writer_finalize(GObject *object) +{ + auto priv = GADATASET_FILE_WRITER_GET_PRIVATE(object); + priv->writer.~shared_ptr(); + G_OBJECT_CLASS(gadataset_file_writer_parent_class)->finalize(object); +} + +static void +gadataset_file_writer_set_property(GObject *object, + guint prop_id, + const GValue *value, + GParamSpec *pspec) +{ + auto priv = GADATASET_FILE_WRITER_GET_PRIVATE(object); + + switch (prop_id) { + case PROP_WRITER: + priv->writer = + *static_cast *>( + g_value_get_pointer(value)); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); + break; + } +} + +static void +gadataset_file_writer_init(GADatasetFileWriter *object) +{ + auto priv = GADATASET_FILE_WRITER_GET_PRIVATE(object); + new(&(priv->writer)) std::shared_ptr; +} + +static void +gadataset_file_writer_class_init(GADatasetFileWriterClass *klass) +{ + auto gobject_class = G_OBJECT_CLASS(klass); + + gobject_class->finalize = gadataset_file_writer_finalize; + gobject_class->set_property = gadataset_file_writer_set_property; + + GParamSpec *spec; + spec = g_param_spec_pointer("writer", + "Writer", + "The raw " + "std::shared *", + static_cast(G_PARAM_WRITABLE | + G_PARAM_CONSTRUCT_ONLY)); + g_object_class_install_property(gobject_class, PROP_WRITER, spec); +} + +/** + * gadataset_file_writer_write_record_batch: + * @writer: A #GADatasetFileWriter. + * @record_batch: A #GArrowRecordBatch to be written. + * @error: (nullable): Return location for a #GError or %NULL. + * + * Returns: %TRUE on success, %FALSE on error. + * + * Since: 6.0.0 + */ +gboolean +gadataset_file_writer_write_record_batch(GADatasetFileWriter *writer, + GArrowRecordBatch *record_batch, + GError **error) +{ + const auto arrow_writer = gadataset_file_writer_get_raw(writer); + const auto arrow_record_batch = garrow_record_batch_get_raw(record_batch); + auto status = arrow_writer->Write(arrow_record_batch); + return garrow::check(error, status, "[file-writer][write-record-batch]"); +} + +/** + * gadataset_file_writer_write_record_batch_reader: + * @writer: A #GADatasetFileWriter. + * @reader: A #GArrowRecordBatchReader to be written. + * @error: (nullable): Return location for a #GError or %NULL. + * + * Returns: %TRUE on success, %FALSE on error. + * + * Since: 6.0.0 + */ +gboolean +gadataset_file_writer_write_record_batch_reader(GADatasetFileWriter *writer, + GArrowRecordBatchReader *reader, + GError **error) +{ + const auto arrow_writer = gadataset_file_writer_get_raw(writer); + auto arrow_reader = garrow_record_batch_reader_get_raw(reader); + auto status = arrow_writer->Write(arrow_reader.get()); + return garrow::check(error, + status, + "[file-writer][write-record-batch-reader]"); +} + +/** + * gadataset_file_writer_finish: + * @writer: A #GADatasetFileWriter. + * @error: (nullable): Return location for a #GError or %NULL. + * + * Returns: %TRUE on success, %FALSE on error. + * + * Since: 6.0.0 + */ +gboolean +gadataset_file_writer_finish(GADatasetFileWriter *writer, + GError **error) +{ + const auto arrow_writer = gadataset_file_writer_get_raw(writer); + auto status = arrow_writer->Finish(); + return garrow::check(error, + status, + "[file-writer][finish]"); +} + + typedef struct GADatasetFileFormatPrivate_ { - std::shared_ptr file_format; + std::shared_ptr format; } GADatasetFileFormatPrivate; enum { - PROP_FILE_FORMAT = 1, + PROP_FORMAT = 1, }; G_DEFINE_TYPE_WITH_PRIVATE(GADatasetFileFormat, @@ -61,9 +277,7 @@ static void gadataset_file_format_finalize(GObject *object) { auto priv = GADATASET_FILE_FORMAT_GET_PRIVATE(object); - - priv->file_format.~shared_ptr(); - + priv->format.~shared_ptr(); G_OBJECT_CLASS(gadataset_file_format_parent_class)->finalize(object); } @@ -76,8 +290,8 @@ gadataset_file_format_set_property(GObject *object, auto priv = GADATASET_FILE_FORMAT_GET_PRIVATE(object); switch (prop_id) { - case PROP_FILE_FORMAT: - priv->file_format = + case PROP_FORMAT: + priv->format = *static_cast *>( g_value_get_pointer(value)); break; @@ -91,7 +305,7 @@ static void gadataset_file_format_init(GADatasetFileFormat *object) { auto priv = GADATASET_FILE_FORMAT_GET_PRIVATE(object); - new(&priv->file_format) std::shared_ptr; + new(&priv->format) std::shared_ptr; } static void @@ -103,49 +317,106 @@ gadataset_file_format_class_init(GADatasetFileFormatClass *klass) gobject_class->set_property = gadataset_file_format_set_property; GParamSpec *spec; - spec = g_param_spec_pointer("file-format", - "FileFormat", + spec = g_param_spec_pointer("format", + "Format", "The raw std::shared *", static_cast(G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY)); - g_object_class_install_property(gobject_class, PROP_FILE_FORMAT, spec); + g_object_class_install_property(gobject_class, PROP_FORMAT, spec); } /** * gadataset_file_format_get_type_name: - * @file_format: A #GADatasetFileFormat. + * @format: A #GADatasetFileFormat. * - * Returns: The type name of @file_format. + * Returns: The type name of @format. * * It should be freed with g_free() when no longer needed. * * Since: 3.0.0 */ gchar * -gadataset_file_format_get_type_name(GADatasetFileFormat *file_format) +gadataset_file_format_get_type_name(GADatasetFileFormat *format) { - const auto arrow_file_format = gadataset_file_format_get_raw(file_format); - const auto &type_name = arrow_file_format->type_name(); + const auto arrow_format = gadataset_file_format_get_raw(format); + const auto &type_name = arrow_format->type_name(); return g_strndup(type_name.data(), type_name.size()); } +/** + * gadataset_file_format_get_default_write_options: + * @format: A #GADatasetFileFormat. + * + * Returns: (transfer full): The default #GADatasetFileWriteOptions of @format. + * + * Since: 6.0.0 + */ +GADatasetFileWriteOptions * +gadataset_file_format_get_default_write_options(GADatasetFileFormat *format) +{ + const auto arrow_format = gadataset_file_format_get_raw(format); + auto arrow_options = arrow_format->DefaultWriteOptions(); + return gadataset_file_write_options_new_raw(&arrow_options); +} + +/** + * gadataset_file_format_open_writer: + * @format: A #GADatasetFileFormat. + * @destination: A #GArrowOutputStream. + * @file_system: The #GArrowFileSystem of @destination. + * @path: The path of @destination. + * @schema: A #GArrowSchema that is used by written record batches. + * @options: A #GADatasetFileWriteOptions. + * @error: (nullable): Return location for a #GError or %NULL. + * + * Returns: (transfer full): The newly created #GADatasetFileWriter of @format + * on success, %NULL on error. + * + * Since: 6.0.0 + */ +GADatasetFileWriter * +gadataset_file_format_open_writer(GADatasetFileFormat *format, + GArrowOutputStream *destination, + GArrowFileSystem *file_system, + const gchar *path, + GArrowSchema *schema, + GADatasetFileWriteOptions *options, + GError **error) +{ + const auto arrow_format = gadataset_file_format_get_raw(format); + auto arrow_destination = garrow_output_stream_get_raw(destination); + auto arrow_file_system = garrow_file_system_get_raw(file_system); + auto arrow_schema = garrow_schema_get_raw(schema); + auto arrow_options = gadataset_file_write_options_get_raw(options); + auto arrow_writer_result = + arrow_format->MakeWriter(arrow_destination, + arrow_schema, + arrow_options, + {arrow_file_system, path}); + if (garrow::check(error, arrow_writer_result, "[file-format][open-writer]")) { + auto arrow_writer = *arrow_writer_result; + return gadataset_file_writer_new_raw(&arrow_writer); + } else { + return NULL; + } +} + /** * gadataset_file_format_equal: - * @file_format: A #GADatasetFileFormat. - * @other_file_format: A #GADatasetFileFormat to be compared. + * @format: A #GADatasetFileFormat. + * @other_format: A #GADatasetFileFormat to be compared. * * Returns: %TRUE if they are the same content file format, %FALSE otherwise. * * Since: 3.0.0 */ gboolean -gadataset_file_format_equal(GADatasetFileFormat *file_format, - GADatasetFileFormat *other_file_format) +gadataset_file_format_equal(GADatasetFileFormat *format, + GADatasetFileFormat *other_format) { - const auto arrow_file_format = gadataset_file_format_get_raw(file_format); - const auto arrow_other_file_format = - gadataset_file_format_get_raw(other_file_format); - return arrow_file_format->Equals(*arrow_other_file_format); + const auto arrow_format = gadataset_file_format_get_raw(format); + const auto arrow_other_format = gadataset_file_format_get_raw(other_format); + return arrow_format->Equals(*arrow_other_format); } @@ -173,10 +444,9 @@ gadataset_csv_file_format_class_init(GADatasetCSVFileFormatClass *klass) GADatasetCSVFileFormat * gadataset_csv_file_format_new(void) { - std::shared_ptr arrow_file_format = + std::shared_ptr arrow_format = std::make_shared(); - return GADATASET_CSV_FILE_FORMAT( - gadataset_file_format_new_raw(&arrow_file_format)); + return GADATASET_CSV_FILE_FORMAT(gadataset_file_format_new_raw(&arrow_format)); } @@ -204,10 +474,9 @@ gadataset_ipc_file_format_class_init(GADatasetIPCFileFormatClass *klass) GADatasetIPCFileFormat * gadataset_ipc_file_format_new(void) { - std::shared_ptr arrow_file_format = + std::shared_ptr arrow_format = std::make_shared(); - return GADATASET_IPC_FILE_FORMAT( - gadataset_file_format_new_raw(&arrow_file_format)); + return GADATASET_IPC_FILE_FORMAT(gadataset_file_format_new_raw(&arrow_format)); } @@ -235,21 +504,56 @@ gadataset_parquet_file_format_class_init(GADatasetParquetFileFormatClass *klass) GADatasetParquetFileFormat * gadataset_parquet_file_format_new(void) { - std::shared_ptr arrow_file_format = + std::shared_ptr arrow_format = std::make_shared(); return GADATASET_PARQUET_FILE_FORMAT( - gadataset_file_format_new_raw(&arrow_file_format)); + gadataset_file_format_new_raw(&arrow_format)); } G_END_DECLS +GADatasetFileWriteOptions * +gadataset_file_write_options_new_raw( + std::shared_ptr *arrow_options) +{ + return GADATASET_FILE_WRITE_OPTIONS( + g_object_new(GADATASET_TYPE_FILE_WRITE_OPTIONS, + "options", arrow_options, + NULL)); +} + +std::shared_ptr +gadataset_file_write_options_get_raw(GADatasetFileWriteOptions *options) +{ + auto priv = GADATASET_FILE_WRITE_OPTIONS_GET_PRIVATE(options); + return priv->options; +} + + +GADatasetFileWriter * +gadataset_file_writer_new_raw( + std::shared_ptr *arrow_writer) +{ + return GADATASET_FILE_WRITER(g_object_new(GADATASET_TYPE_FILE_WRITER, + "writer", arrow_writer, + NULL)); +} + +std::shared_ptr +gadataset_file_writer_get_raw(GADatasetFileWriter *writer) +{ + auto priv = GADATASET_FILE_WRITER_GET_PRIVATE(writer); + return priv->writer; +} + + GADatasetFileFormat * gadataset_file_format_new_raw( - std::shared_ptr *arrow_file_format) + std::shared_ptr *arrow_format) { GType type = GADATASET_TYPE_FILE_FORMAT; - const auto &type_name = (*arrow_file_format)->type_name(); + const auto &type_name = (*arrow_format)->type_name(); if (type_name == "csv") { type = GADATASET_TYPE_CSV_FILE_FORMAT; } else if (type_name == "ipc") { @@ -258,13 +562,13 @@ gadataset_file_format_new_raw( type = GADATASET_TYPE_PARQUET_FILE_FORMAT; } return GADATASET_FILE_FORMAT(g_object_new(type, - "file-format", arrow_file_format, + "format", arrow_format, NULL)); } std::shared_ptr -gadataset_file_format_get_raw(GADatasetFileFormat *file_format) +gadataset_file_format_get_raw(GADatasetFileFormat *format) { - auto priv = GADATASET_FILE_FORMAT_GET_PRIVATE(file_format); - return priv->file_format; + auto priv = GADATASET_FILE_FORMAT_GET_PRIVATE(format); + return priv->format; } diff --git a/c_glib/arrow-dataset-glib/file-format.h b/c_glib/arrow-dataset-glib/file-format.h index 7a6f46f56e9..16a8340747c 100644 --- a/c_glib/arrow-dataset-glib/file-format.h +++ b/c_glib/arrow-dataset-glib/file-format.h @@ -23,6 +23,47 @@ G_BEGIN_DECLS +#define GADATASET_TYPE_FILE_WRITE_OPTIONS \ + (gadataset_file_write_options_get_type()) +G_DECLARE_DERIVABLE_TYPE(GADatasetFileWriteOptions, + gadataset_file_write_options, + GADATASET, + FILE_WRITE_OPTIONS, + GObject) +struct _GADatasetFileWriteOptionsClass +{ + GObjectClass parent_class; +}; + + +#define GADATASET_TYPE_FILE_WRITER \ + (gadataset_file_writer_get_type()) +G_DECLARE_DERIVABLE_TYPE(GADatasetFileWriter, + gadataset_file_writer, + GADATASET, + FILE_WRITER, + GObject) +struct _GADatasetFileWriterClass +{ + GObjectClass parent_class; +}; + +GARROW_AVAILABLE_IN_6_0 +gboolean +gadataset_file_writer_write_record_batch(GADatasetFileWriter *writer, + GArrowRecordBatch *record_batch, + GError **error); +GARROW_AVAILABLE_IN_6_0 +gboolean +gadataset_file_writer_write_record_batch_reader(GADatasetFileWriter *writer, + GArrowRecordBatchReader *reader, + GError **error); +GARROW_AVAILABLE_IN_6_0 +gboolean +gadataset_file_writer_finish(GADatasetFileWriter *writer, + GError **error); + + #define GADATASET_TYPE_FILE_FORMAT (gadataset_file_format_get_type()) G_DECLARE_DERIVABLE_TYPE(GADatasetFileFormat, gadataset_file_format, @@ -36,12 +77,24 @@ struct _GADatasetFileFormatClass GARROW_AVAILABLE_IN_3_0 gchar * -gadataset_file_format_get_type_name(GADatasetFileFormat *file_format); +gadataset_file_format_get_type_name(GADatasetFileFormat *format); +GARROW_AVAILABLE_IN_6_0 +GADatasetFileWriteOptions * +gadataset_file_format_get_default_write_options(GADatasetFileFormat *format); +GARROW_AVAILABLE_IN_6_0 +GADatasetFileWriter * +gadataset_file_format_open_writer(GADatasetFileFormat *format, + GArrowOutputStream *destination, + GArrowFileSystem *file_system, + const gchar *path, + GArrowSchema *schema, + GADatasetFileWriteOptions *options, + GError **error); GARROW_AVAILABLE_IN_3_0 gboolean -gadataset_file_format_equal(GADatasetFileFormat *file_format, - GADatasetFileFormat *other_file_format); +gadataset_file_format_equal(GADatasetFileFormat *format, + GADatasetFileFormat *other_format); #define GADATASET_TYPE_CSV_FILE_FORMAT (gadataset_csv_file_format_get_type()) diff --git a/c_glib/arrow-dataset-glib/file-format.hpp b/c_glib/arrow-dataset-glib/file-format.hpp index 5dfb20b3caa..636dc5c015b 100644 --- a/c_glib/arrow-dataset-glib/file-format.hpp +++ b/c_glib/arrow-dataset-glib/file-format.hpp @@ -23,8 +23,22 @@ #include +GADatasetFileWriteOptions * +gadataset_file_write_options_new_raw( + std::shared_ptr *arrow_options); +std::shared_ptr +gadataset_file_write_options_get_raw(GADatasetFileWriteOptions *options); + + +GADatasetFileWriter * +gadataset_file_writer_new_raw( + std::shared_ptr *arrow_writer); +std::shared_ptr +gadataset_file_writer_get_raw(GADatasetFileWriter *writer); + + GADatasetFileFormat * gadataset_file_format_new_raw( - std::shared_ptr *arrow_file_format); + std::shared_ptr *arrow_format); std::shared_ptr -gadataset_file_format_get_raw(GADatasetFileFormat *file_format); +gadataset_file_format_get_raw(GADatasetFileFormat *format); diff --git a/c_glib/arrow-dataset-glib/meson.build b/c_glib/arrow-dataset-glib/meson.build index b3f617330cf..0d9b8564ecb 100644 --- a/c_glib/arrow-dataset-glib/meson.build +++ b/c_glib/arrow-dataset-glib/meson.build @@ -22,6 +22,7 @@ sources = files( 'dataset.cpp', 'file-format.cpp', 'fragment.cpp', + 'partitioning.cpp', 'scanner.cpp', ) @@ -31,6 +32,7 @@ c_headers = files( 'dataset.h', 'file-format.h', 'fragment.h', + 'partitioning.h', 'scanner.h', ) @@ -40,9 +42,22 @@ cpp_headers = files( 'dataset.hpp', 'file-format.hpp', 'fragment.hpp', + 'partitioning.hpp', 'scanner.hpp', ) +enums = gnome.mkenums('enums', + sources: c_headers, + identifier_prefix: 'GADataset', + symbol_prefix: 'gadataset', + c_template: 'enums.c.template', + h_template: 'enums.h.template', + install_dir: join_paths(include_dir, meson.project_name()), + install_header: true) +enums_source = enums[0] +enums_header = enums[1] + + headers = c_headers + cpp_headers install_headers(headers, subdir: 'arrow-dataset-glib') @@ -51,7 +66,7 @@ dependencies = [ arrow_glib, ] libarrow_dataset_glib = library('arrow-dataset-glib', - sources: sources, + sources: sources + enums, install: true, dependencies: dependencies, include_directories: base_include_directories, @@ -59,7 +74,8 @@ libarrow_dataset_glib = library('arrow-dataset-glib', version: library_version) arrow_dataset_glib = declare_dependency(link_with: libarrow_dataset_glib, include_directories: base_include_directories, - dependencies: dependencies) + dependencies: dependencies, + sources: enums_header) pkgconfig.generate(libarrow_dataset_glib, filebase: 'arrow-dataset-glib', @@ -71,7 +87,7 @@ pkgconfig.generate(libarrow_dataset_glib, if have_gi gnome.generate_gir(libarrow_dataset_glib, dependencies: declare_dependency(sources: arrow_glib_gir), - sources: sources + c_headers, + sources: sources + c_headers + enums, namespace: 'ArrowDataset', nsversion: api_version, identifier_prefix: 'GADataset', diff --git a/c_glib/arrow-dataset-glib/partitioning.cpp b/c_glib/arrow-dataset-glib/partitioning.cpp new file mode 100644 index 00000000000..bce33671a35 --- /dev/null +++ b/c_glib/arrow-dataset-glib/partitioning.cpp @@ -0,0 +1,440 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include + +#include +#include + +G_BEGIN_DECLS + +/** + * SECTION: partitioning + * @section_id: partitioning + * @title: Partitioning classes + * @include: arrow-dataset-glib/arrow-dataset-glib.h + * + * #GADatasetPartitioningOptions is a class for partitioning options. + * + * #GADatasetPartitioning is a base class for partitioning classes + * such as #GADatasetDirectoryPartitioning. + * + * #GADatasetKeyValuePartitioning is a base class for key-value style + * partitioning classes such as #GADatasetDirectoryPartitioning. + * + * #GADatasetDirectoryPartitioning is a class for partitioning that + * uses directory structure. + * + * Since: 6.0.0 + */ + +typedef struct GADatasetPartitioningOptionsPrivate_ { + gboolean infer_dictionary; + GArrowSchema *schema; + GADatasetSegmentEncoding segment_encoding; +} GADatasetPartitioningOptionsPrivate; + +enum { + PROP_INFER_DICTIONARY = 1, + PROP_SCHEMA, + PROP_SEGMENT_ENCODING, +}; + +G_DEFINE_TYPE_WITH_PRIVATE(GADatasetPartitioningOptions, + gadataset_partitioning_options, + G_TYPE_OBJECT) + +#define GADATASET_PARTITIONING_OPTIONS_GET_PRIVATE(obj) \ + static_cast( \ + gadataset_partitioning_options_get_instance_private( \ + GADATASET_PARTITIONING_OPTIONS(obj))) + +static void +gadataset_partitioning_options_dispose(GObject *object) +{ + auto priv = GADATASET_PARTITIONING_OPTIONS_GET_PRIVATE(object); + + if (priv->schema) { + g_object_unref(priv->schema); + priv->schema = nullptr; + } + + G_OBJECT_CLASS(gadataset_partitioning_options_parent_class)->dispose(object); +} + +static void +gadataset_partitioning_options_set_property(GObject *object, + guint prop_id, + const GValue *value, + GParamSpec *pspec) +{ + auto priv = GADATASET_PARTITIONING_OPTIONS_GET_PRIVATE(object); + + switch (prop_id) { + case PROP_INFER_DICTIONARY: + priv->infer_dictionary = g_value_get_boolean(value); + break; + case PROP_SCHEMA: + { + auto schema = g_value_get_object(value); + if (priv->schema == schema) { + break; + } + auto old_schema = priv->schema; + if (schema) { + g_object_ref(schema); + priv->schema = GARROW_SCHEMA(schema); + } else { + priv->schema = NULL; + } + if (old_schema) { + g_object_unref(old_schema); + } + } + break; + case PROP_SEGMENT_ENCODING: + priv->segment_encoding = + static_cast(g_value_get_enum(value)); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); + break; + } +} + +static void +gadataset_partitioning_options_get_property(GObject *object, + guint prop_id, + GValue *value, + GParamSpec *pspec) +{ + auto priv = GADATASET_PARTITIONING_OPTIONS_GET_PRIVATE(object); + + switch (prop_id) { + case PROP_INFER_DICTIONARY: + g_value_set_boolean(value, priv->infer_dictionary); + break; + case PROP_SCHEMA: + g_value_set_object(value, priv->schema); + break; + case PROP_SEGMENT_ENCODING: + g_value_set_enum(value, priv->segment_encoding); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); + break; + } +} + +static void +gadataset_partitioning_options_init(GADatasetPartitioningOptions *object) +{ +} + +static void +gadataset_partitioning_options_class_init( + GADatasetPartitioningOptionsClass *klass) +{ + auto gobject_class = G_OBJECT_CLASS(klass); + + gobject_class->dispose = gadataset_partitioning_options_dispose; + gobject_class->set_property = gadataset_partitioning_options_set_property; + gobject_class->get_property = gadataset_partitioning_options_get_property; + + arrow::dataset::PartitioningFactoryOptions default_options; + GParamSpec *spec; + /** + * GADatasetPartitioningOptions:infer-dictionary: + * + * When inferring a schema for partition fields, yield dictionary + * encoded types instead of plain. This can be more efficient when + * materializing virtual columns, and Expressions parsed by the + * finished Partitioning will include dictionaries of all unique + * inspected values for each field. + * + * Since: 6.0.0 + */ + spec = g_param_spec_boolean("infer-dictionary", + "Infer dictionary", + "Whether encode partitioned field values as " + "dictionary", + default_options.infer_dictionary, + static_cast(G_PARAM_READWRITE)); + g_object_class_install_property(gobject_class, PROP_INFER_DICTIONARY, spec); + + /** + * GADatasetPartitioningOptions:schema: + * + * Optionally, an expected schema can be provided, in which case + * inference will only check discovered fields against the schema + * and update internal state (such as dictionaries). + * + * Since: 6.0.0 + */ + spec = g_param_spec_object("schema", + "Schema", + "Inference will only check discovered fields " + "against the schema and update internal state", + GARROW_TYPE_SCHEMA, + static_cast(G_PARAM_READWRITE)); + g_object_class_install_property(gobject_class, PROP_SCHEMA, spec); + + /** + * GADatasetPartitioningOptions:segment-encoding: + * + * After splitting a path into components, decode the path + * components before parsing according to this scheme. + * + * Since: 6.0.0 + */ + spec = g_param_spec_enum("segment-encoding", + "Segment encoding", + "After splitting a path into components, " + "decode the path components before " + "parsing according to this scheme", + GADATASET_TYPE_SEGMENT_ENCODING, + static_cast( + default_options.segment_encoding), + static_cast(G_PARAM_READWRITE)); + g_object_class_install_property(gobject_class, PROP_SEGMENT_ENCODING, spec); +} + +/** + * gadataset_partitioning_options_new: + * + * Returns: The newly created #GADatasetPartitioningOptions. + * + * Since: 6.0.0 + */ +GADatasetPartitioningOptions * +gadataset_partitioning_options_new(void) +{ + return GADATASET_PARTITIONING_OPTIONS( + g_object_new(GADATASET_TYPE_PARTITIONING_OPTIONS, + NULL)); +} + + +typedef struct GADatasetPartitioningPrivate_ { + std::shared_ptr partitioning; +} GADatasetPartitioningPrivate; + +enum { + PROP_PARTITIONING = 1, +}; + +G_DEFINE_TYPE_WITH_PRIVATE(GADatasetPartitioning, + gadataset_partitioning, + G_TYPE_OBJECT) + +#define GADATASET_PARTITIONING_GET_PRIVATE(obj) \ + static_cast( \ + gadataset_partitioning_get_instance_private( \ + GADATASET_PARTITIONING(obj))) + +static void +gadataset_partitioning_finalize(GObject *object) +{ + auto priv = GADATASET_PARTITIONING_GET_PRIVATE(object); + priv->partitioning.~shared_ptr(); + G_OBJECT_CLASS(gadataset_partitioning_parent_class)->finalize(object); +} + +static void +gadataset_partitioning_set_property(GObject *object, + guint prop_id, + const GValue *value, + GParamSpec *pspec) +{ + auto priv = GADATASET_PARTITIONING_GET_PRIVATE(object); + + switch (prop_id) { + case PROP_PARTITIONING: + priv->partitioning = + *static_cast *>( + g_value_get_pointer(value)); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); + break; + } +} + +static void +gadataset_partitioning_init(GADatasetPartitioning *object) +{ + auto priv = GADATASET_PARTITIONING_GET_PRIVATE(object); + new(&priv->partitioning) std::shared_ptr; +} + +static void +gadataset_partitioning_class_init(GADatasetPartitioningClass *klass) +{ + auto gobject_class = G_OBJECT_CLASS(klass); + + gobject_class->finalize = gadataset_partitioning_finalize; + gobject_class->set_property = gadataset_partitioning_set_property; + + GParamSpec *spec; + spec = g_param_spec_pointer("partitioning", + "Partitioning", + "The raw " + "std::shared *", + static_cast(G_PARAM_WRITABLE | + G_PARAM_CONSTRUCT_ONLY)); + g_object_class_install_property(gobject_class, PROP_PARTITIONING, spec); +} + +/** + * gadataset_partitioning_new: + * + * Returns: The newly created #GADatasetPartitioning that doesn't + * partition. + * + * Since: 6.0.0 + */ +GADatasetPartitioning * +gadataset_partitioning_new(void) +{ + auto arrow_partitioning = arrow::dataset::Partitioning::Default(); + return GADATASET_PARTITIONING( + g_object_new(GADATASET_TYPE_PARTITIONING, + "partitioning", &arrow_partitioning, + NULL)); +} + +/** + * gadataset_partitioning_get_type_name: + * @partitioning: A #GADatasetPartitioning. + * + * Returns: The type name of @partitioning. + * + * It should be freed with g_free() when no longer needed. + * + * Since: 6.0.0 + */ +gchar * +gadataset_partitioning_get_type_name(GADatasetPartitioning *partitioning) +{ + auto arrow_partitioning = gadataset_partitioning_get_raw(partitioning); + auto arrow_type_name = arrow_partitioning->type_name(); + return g_strndup(arrow_type_name.c_str(), + arrow_type_name.size()); +} + + +G_DEFINE_TYPE(GADatasetKeyValuePartitioning, + gadataset_key_value_partitioning, + GADATASET_TYPE_PARTITIONING) + +static void +gadataset_key_value_partitioning_init(GADatasetKeyValuePartitioning *object) +{ +} + +static void +gadataset_key_value_partitioning_class_init( + GADatasetKeyValuePartitioningClass *klass) +{ +} + + +G_DEFINE_TYPE(GADatasetDirectoryPartitioning, + gadataset_directory_partitioning, + GADATASET_TYPE_KEY_VALUE_PARTITIONING) + +static void +gadataset_directory_partitioning_init(GADatasetDirectoryPartitioning *object) +{ +} + +static void +gadataset_directory_partitioning_class_init( + GADatasetDirectoryPartitioningClass *klass) +{ +} + +/** + * gadataset_directory_partitioning_new: + * @schema: A #GArrowSchema that describes all partitioned segments. + * @dictionaries: (nullable) (element-type GArrowArray): A list of #GArrowArray + * for dictionary data types in @schema. + * @options: (nullable): A #GADatasetPartitioningOptions. + * @error: (nullable): Return location for a #GError or %NULL. + * + * Returns: The newly created #GADatasetDirectoryPartitioning on success, + * %NULL on error. + * + * Since: 6.0.0 + */ +GADatasetDirectoryPartitioning * +gadataset_directory_partitioning_new(GArrowSchema *schema, + GList *dictionaries, + GADatasetPartitioningOptions *options, + GError **error) +{ + auto arrow_schema = garrow_schema_get_raw(schema); + std::vector> arrow_dictionaries; + for (auto node = dictionaries; node; node = node->next) { + auto dictionary = GARROW_ARRAY(node->data); + if (dictionary) { + arrow_dictionaries.push_back(garrow_array_get_raw(dictionary)); + } else { + arrow_dictionaries.push_back(nullptr); + } + } + arrow::dataset::KeyValuePartitioningOptions arrow_options; + if (options) { + arrow_options = + gadataset_partitioning_options_get_raw_key_value_partitioning_options( + options); + } + auto arrow_partitioning = + std::make_shared( + arrow_schema, + arrow_dictionaries, + arrow_options); + return GADATASET_DIRECTORY_PARTITIONING( + g_object_new(GADATASET_TYPE_DIRECTORY_PARTITIONING, + "partitioning", &arrow_partitioning, + NULL)); +} + + +G_END_DECLS + +arrow::dataset::KeyValuePartitioningOptions +gadataset_partitioning_options_get_raw_key_value_partitioning_options( + GADatasetPartitioningOptions *options) +{ + auto priv = GADATASET_PARTITIONING_OPTIONS_GET_PRIVATE(options); + arrow::dataset::KeyValuePartitioningOptions arrow_options; + arrow_options.segment_encoding = + static_cast(priv->segment_encoding); + return arrow_options; +} + +std::shared_ptr +gadataset_partitioning_get_raw(GADatasetPartitioning *partitioning) +{ + auto priv = GADATASET_PARTITIONING_GET_PRIVATE(partitioning); + return priv->partitioning; +} diff --git a/c_glib/arrow-dataset-glib/partitioning.h b/c_glib/arrow-dataset-glib/partitioning.h new file mode 100644 index 00000000000..d408d9bd502 --- /dev/null +++ b/c_glib/arrow-dataset-glib/partitioning.h @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include + +G_BEGIN_DECLS + +/** + * GADatasetSegmentEncoding + * @GADATASET_SEGMENT_ENCODING_NONE: No encoding. + * @GADATASET_SEGMENT_ENCODING_URI: Segment values are URL-encoded. + * + * They are corresponding to `arrow::dataset::SegmentEncoding` values. + * + * Since: 6.0.0 + */ +typedef enum { + GADATASET_SEGMENT_ENCODING_NONE, + GADATASET_SEGMENT_ENCODING_URI, +} GADatasetSegmentEncoding; + + +#define GADATASET_TYPE_PARTITIONING_OPTIONS \ + (gadataset_partitioning_options_get_type()) +G_DECLARE_DERIVABLE_TYPE(GADatasetPartitioningOptions, + gadataset_partitioning_options, + GADATASET, + PARTITIONING_OPTIONS, + GObject) +struct _GADatasetPartitioningOptionsClass +{ + GObjectClass parent_class; +}; + +GARROW_AVAILABLE_IN_6_0 +GADatasetPartitioningOptions * +gadataset_partitioning_options_new(void); + + +#define GADATASET_TYPE_PARTITIONING (gadataset_partitioning_get_type()) +G_DECLARE_DERIVABLE_TYPE(GADatasetPartitioning, + gadataset_partitioning, + GADATASET, + PARTITIONING, + GObject) +struct _GADatasetPartitioningClass +{ + GObjectClass parent_class; +}; + +GARROW_AVAILABLE_IN_6_0 +GADatasetPartitioning * +gadataset_partitioning_new(void); +GARROW_AVAILABLE_IN_6_0 +gchar * +gadataset_partitioning_get_type_name(GADatasetPartitioning *partitioning); + + +#define GADATASET_TYPE_KEY_VALUE_PARTITIONING \ + (gadataset_key_value_partitioning_get_type()) +G_DECLARE_DERIVABLE_TYPE(GADatasetKeyValuePartitioning, + gadataset_key_value_partitioning, + GADATASET, + KEY_VALUE_PARTITIONING, + GADatasetPartitioning) +struct _GADatasetKeyValuePartitioningClass +{ + GADatasetPartitioningClass parent_class; +}; + + +#define GADATASET_TYPE_DIRECTORY_PARTITIONING \ + (gadataset_directory_partitioning_get_type()) +G_DECLARE_DERIVABLE_TYPE(GADatasetDirectoryPartitioning, + gadataset_directory_partitioning, + GADATASET, + DIRECTORY_PARTITIONING, + GADatasetKeyValuePartitioning) +struct _GADatasetDirectoryPartitioningClass +{ + GADatasetKeyValuePartitioningClass parent_class; +}; + +GARROW_AVAILABLE_IN_6_0 +GADatasetDirectoryPartitioning * +gadataset_directory_partitioning_new(GArrowSchema *schema, + GList *dictionaries, + GADatasetPartitioningOptions *options, + GError **error); + + +G_END_DECLS diff --git a/c_glib/arrow-dataset-glib/partitioning.hpp b/c_glib/arrow-dataset-glib/partitioning.hpp new file mode 100644 index 00000000000..2481ecb3340 --- /dev/null +++ b/c_glib/arrow-dataset-glib/partitioning.hpp @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include + +#include + +arrow::dataset::KeyValuePartitioningOptions +gadataset_partitioning_options_get_raw_key_value_partitioning_options( + GADatasetPartitioningOptions *options); + +std::shared_ptr +gadataset_partitioning_get_raw(GADatasetPartitioning *partitioning); diff --git a/c_glib/arrow-dataset-glib/scanner.cpp b/c_glib/arrow-dataset-glib/scanner.cpp index efa2a5c3287..ddd3fd88af7 100644 --- a/c_glib/arrow-dataset-glib/scanner.cpp +++ b/c_glib/arrow-dataset-glib/scanner.cpp @@ -18,6 +18,7 @@ */ #include +#include #include #include @@ -225,6 +226,24 @@ gadataset_scanner_builder_new(GADatasetDataset *dataset, GError **error) } } +/** + * gadataset_scanner_builder_new_record_batch_reader: + * @reader: A #GArrowRecordBatchReader that produces record batches. + * + * Returns: (nullable): A newly created #GADatasetScannerBuilder. + * + * Since: 6.0.0 + */ +GADatasetScannerBuilder * +gadataset_scanner_builder_new_record_batch_reader( + GArrowRecordBatchReader *reader) +{ + auto arrow_reader = garrow_record_batch_reader_get_raw(reader); + auto arrow_scanner_builder = + arrow::dataset::ScannerBuilder::FromRecordBatchReader(arrow_reader); + return gadataset_scanner_builder_new_raw(&arrow_scanner_builder); +} + /** * gadataset_scanner_builder_finish: * @builder: A #GADatasetScannerBuilder. diff --git a/c_glib/arrow-dataset-glib/scanner.h b/c_glib/arrow-dataset-glib/scanner.h index 446815d6db1..ba7f9c6b7c3 100644 --- a/c_glib/arrow-dataset-glib/scanner.h +++ b/c_glib/arrow-dataset-glib/scanner.h @@ -55,6 +55,10 @@ GARROW_AVAILABLE_IN_5_0 GADatasetScannerBuilder * gadataset_scanner_builder_new(GADatasetDataset *dataset, GError **error); +GARROW_AVAILABLE_IN_6_0 +GADatasetScannerBuilder * +gadataset_scanner_builder_new_record_batch_reader( + GArrowRecordBatchReader *reader); GARROW_AVAILABLE_IN_5_0 GADatasetScanner * gadataset_scanner_builder_finish(GADatasetScannerBuilder *builder, diff --git a/c_glib/arrow-glib/compute.cpp b/c_glib/arrow-glib/compute.cpp index 8783510728a..825d296dd26 100644 --- a/c_glib/arrow-glib/compute.cpp +++ b/c_glib/arrow-glib/compute.cpp @@ -1271,7 +1271,7 @@ garrow_execute_plan_build_source_node(GArrowExecutePlan *plan, /** * garrow_execute_plan_build_aggregate_node: * @plan: A #GArrowExecutePlan. - * @input: A #GarrowExecuteNode. + * @input: A #GArrowExecuteNode. * @options: A #GArrowAggregateNodeOptions. * @error: (nullable): Return location for a #GError or %NULL. * @@ -1304,7 +1304,7 @@ garrow_execute_plan_build_aggregate_node(GArrowExecutePlan *plan, /** * garrow_execute_plan_build_sink_node: * @plan: A #GArrowExecutePlan. - * @input: A #GarrowExecuteNode. + * @input: A #GArrowExecuteNode. * @options: A #GArrowSinkNodeOptions. * @error: (nullable): Return location for a #GError or %NULL. * diff --git a/c_glib/doc/arrow-dataset-glib/arrow-dataset-glib-docs.xml b/c_glib/doc/arrow-dataset-glib/arrow-dataset-glib-docs.xml index 3e8da5bd9d1..b13195b0703 100644 --- a/c_glib/doc/arrow-dataset-glib/arrow-dataset-glib-docs.xml +++ b/c_glib/doc/arrow-dataset-glib/arrow-dataset-glib-docs.xml @@ -39,6 +39,8 @@ Data + Partitioning + Dataset Dataset factory @@ -66,8 +68,12 @@ Index of deprecated API + + Index of new symbols in 6.0.0 + + - Index of new symbols in 4.0.0 + Index of new symbols in 5.0.0 diff --git a/c_glib/test/dataset/test-file-system-dataset.rb b/c_glib/test/dataset/test-file-system-dataset.rb index 6d6ec3b18c6..0e856b678f8 100644 --- a/c_glib/test/dataset/test-file-system-dataset.rb +++ b/c_glib/test/dataset/test-file-system-dataset.rb @@ -16,19 +16,73 @@ # under the License. class TestDatasetFileSystemDataset < Test::Unit::TestCase + include Helper::Buildable + include Helper::Readable + def setup omit("Arrow Dataset is required") unless defined?(ArrowDataset) Dir.mktmpdir do |tmpdir| @dir = tmpdir - format = ArrowDataset::IPCFileFormat.new - factory = ArrowDataset::FileSystemDatasetFactory.new(format) - factory.file_system = Arrow::LocalFileSystem.new - @dataset = factory.finish + @format = ArrowDataset::IPCFileFormat.new + @factory = ArrowDataset::FileSystemDatasetFactory.new(@format) + @file_system = Arrow::LocalFileSystem.new + @factory.file_system = @file_system + partitioning_schema = build_schema(label: Arrow::StringDataType.new) + @partitioning = + ArrowDataset::DirectoryPartitioning.new(partitioning_schema) + @factory.partitioning = @partitioning yield end end def test_type_name - assert_equal("filesystem", @dataset.type_name) + dataset = @factory.finish + assert_equal("filesystem", dataset.type_name) + end + + def test_format + dataset = @factory.finish + assert_equal(@format, dataset.format) + end + + def test_file_system + dataset = @factory.finish + assert_equal(@file_system, dataset.file_system) + end + + def test_partitioning + dataset = @factory.finish + assert_equal(@partitioning, dataset.partitioning) + end + + def test_read_write + table = build_table(label: build_string_array(["a", "a", "b", "c"]), + count: build_int32_array([1, 10, 2, 3])) + table_reader = Arrow::TableBatchReader.new(table) + scanner_builder = ArrowDataset::ScannerBuilder.new(table_reader) + scanner = scanner_builder.finish + options = ArrowDataset::FileSystemDatasetWriteOptions.new + options.file_write_options = @format.default_write_options + options.file_system = Arrow::LocalFileSystem.new + options.base_dir = @dir + options.base_name_template = "{i}.arrow" + options.partitioning = @partitioning + ArrowDataset::FileSystemDataset.write_scanner(scanner, options) + Find.find(@dir) do |path| + @factory.add_path(path) if File.file?(path) + end + @factory.partition_base_dir = @dir + dataset = @factory.finish + assert_equal(build_table(count: [ + build_int32_array([1, 10]), + build_int32_array([2]), + build_int32_array([3]), + ], + label: [ + build_string_array(["a", "a"]), + build_string_array(["b"]), + build_string_array(["c"]), + ]), + dataset.to_table) end end diff --git a/c_glib/test/dataset/test-file-writer.rb b/c_glib/test/dataset/test-file-writer.rb new file mode 100644 index 00000000000..5b25d6044d6 --- /dev/null +++ b/c_glib/test/dataset/test-file-writer.rb @@ -0,0 +1,65 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +class TestDatasetFileWriter < Test::Unit::TestCase + include Helper::Buildable + include Helper::Readable + + def setup + omit("Arrow Dataset is required") unless defined?(ArrowDataset) + Dir.mktmpdir do |tmpdir| + @dir = tmpdir + @format = ArrowDataset::IPCFileFormat.new + @file_system = Arrow::LocalFileSystem.new + @path = File.join(@dir, "data.arrow") + @output = @file_system.open_output_stream(@path) + @schema = build_schema(visible: Arrow::BooleanDataType.new, + point: Arrow::UInt8DataType.new) + @writer = @format.open_writer(@output, + @file_system, + @path, + @schema, + @format.default_write_options) + yield + end + end + + def test_write_record_batch + record_batch = build_record_batch( + visible: build_boolean_array([true, false, true]), + point: build_uint8_array([1, 2, 3])) + @writer.write_record_batch(record_batch) + @writer.finish + @output.close + read_table(@path) do |written_table| + assert_equal(Arrow::Table.new(record_batch.schema, + [record_batch]), + written_table) + end + end + + def test_write_record_batch_reader + table = build_table(visible: build_boolean_array([true, false, true]), + point: build_uint8_array([1, 2, 3])) + @writer.write_record_batch_reader(Arrow::TableBatchReader.new(table)) + @writer.finish + @output.close + read_table(@path) do |written_table| + assert_equal(table, written_table) + end + end +end diff --git a/c_glib/test/dataset/test-partitioning-options.rb b/c_glib/test/dataset/test-partitioning-options.rb new file mode 100644 index 00000000000..9ff585aa7cf --- /dev/null +++ b/c_glib/test/dataset/test-partitioning-options.rb @@ -0,0 +1,46 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +class TestDatasetPartitioningOptions < Test::Unit::TestCase + include Helper::Buildable + + def setup + omit("Arrow Dataset is required") unless defined?(ArrowDataset) + @options = ArrowDataset::PartitioningOptions.new + end + + def test_infer_dictionary + assert_false(@options.infer_dictionary?) + @options.infer_dictionary = true + assert_true(@options.infer_dictionary?) + end + + def test_schema + assert_nil(@options.schema) + schema = build_schema(year: Arrow::UInt16DataType.new) + @options.schema = schema + assert_equal(schema, @options.schema) + end + + def test_segment_encoding + assert_equal(ArrowDataset::SegmentEncoding::NONE, + @options.segment_encoding) + @options.segment_encoding = :uri + assert_equal(ArrowDataset::SegmentEncoding::URI, + @options.segment_encoding) + end +end diff --git a/c_glib/test/dataset/test-partitioning.rb b/c_glib/test/dataset/test-partitioning.rb new file mode 100644 index 00000000000..d98e51f3c59 --- /dev/null +++ b/c_glib/test/dataset/test-partitioning.rb @@ -0,0 +1,34 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +class TestDatasetPartitioning < Test::Unit::TestCase + include Helper::Buildable + + def setup + omit("Arrow Dataset is required") unless defined?(ArrowDataset) + end + + def test_default + assert_equal("default", ArrowDataset::Partitioning.new.type_name) + end + + def test_directory + schema = build_schema(year: Arrow::UInt16DataType.new) + partitioning = ArrowDataset::DirectoryPartitioning.new(schema) + assert_equal("schema", partitioning.type_name) + end +end diff --git a/c_glib/test/dataset/test-scanner.rb b/c_glib/test/dataset/test-scanner.rb index f7702d4905f..ed6a706c6f2 100644 --- a/c_glib/test/dataset/test-scanner.rb +++ b/c_glib/test/dataset/test-scanner.rb @@ -45,4 +45,11 @@ def setup def test_to_table assert_equal(@table, @scanner.to_table) end + + def test_new_record_batch_reader + reader = Arrow::TableBatchReader.new(@table) + builder = ArrowDataset::ScannerBuilder.new(reader) + scanner = builder.finish + assert_equal(@table, scanner.to_table) + end end diff --git a/c_glib/test/helper/buildable.rb b/c_glib/test/helper/buildable.rb index 356fa651c6a..3a1240cfa1f 100644 --- a/c_glib/test/helper/buildable.rb +++ b/c_glib/test/helper/buildable.rb @@ -17,6 +17,13 @@ module Helper module Buildable + def build_schema(fields) + fields = fields.collect do |name, data_type| + Arrow::Field.new(name, data_type) + end + Arrow::Schema.new(fields) + end + def build_null_array(values) build_array(Arrow::NullArrayBuilder.new, values) end diff --git a/c_glib/test/helper/readable.rb b/c_glib/test/helper/readable.rb new file mode 100644 index 00000000000..b4ec38a6554 --- /dev/null +++ b/c_glib/test/helper/readable.rb @@ -0,0 +1,47 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +module Helper + module Readable + def read_table(input, type: :file) + if input.is_a?(Arrow::Buffer) + input_stream = Arrow::BufferIntputStream.new(input) + else + input_stream = Arrow::MemoryMappedInputStream.new(input) + end + begin + if type == :file + reader = Arrow::RecordBatchFileReader.new(input_stream) + record_batches = [] + reader.n_record_batches.times do |i| + record_batches << reader.read_record_batch(i) + end + yield(Arrow::Table.new(record_batches[0].schema, record_batches)) + else + reader = Arrow::RecordBatchStreamReader.new(input_stream) + begin + yield(reader.read_all) + ensure + reader.close + end + end + ensure + input_stream.close + end + end + end +end diff --git a/c_glib/test/run-test.rb b/c_glib/test/run-test.rb index abae4e722c5..621c78c3986 100755 --- a/c_glib/test/run-test.rb +++ b/c_glib/test/run-test.rb @@ -84,6 +84,7 @@ def should_unlock_gvl?(info, klass) end require "fileutils" +require "find" require "rbconfig" require "stringio" require "tempfile" @@ -97,6 +98,7 @@ def should_unlock_gvl?(info, klass) end require_relative "helper/omittable" require_relative "helper/plasma-store" +require_relative "helper/readable" require_relative "helper/writable" exit(Test::Unit::AutoRunner.run(true, test_dir.to_s)) diff --git a/ruby/red-arrow-dataset/lib/arrow-dataset/arrow-table-savable.rb b/ruby/red-arrow-dataset/lib/arrow-dataset/arrow-table-savable.rb new file mode 100644 index 00000000000..218860cbc98 --- /dev/null +++ b/ruby/red-arrow-dataset/lib/arrow-dataset/arrow-table-savable.rb @@ -0,0 +1,63 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +module ArrowDataset + module ArrowTableSavable + private + def save_to_uri + format = FileFormat.resolve(@options[:format]) + options = FileSystemDatasetWriteOptions.new + options.file_write_options = format.default_write_options + if @output.scheme.nil? + options.file_system = Arrow::LocalFileSystem.new + else + options.file_system = Arrow::FileSystem.create(@output.to_s) + end + partitioning = @options[:partitioning] + if partitioning + # TODO + options.base_dir = File.dirname(@output.path) + options.base_name_template = File.basename(@output.path) + options.partitioning = Partitioning.resolve(@options[:partitioning]) + scanner_builder = ScannerBuilder.new(@table) + scanner = scanner_builder.finish + FileSystemDataset.write_scanner(scanner, options) + else + dir = File.dirname(@output.path) + unless File.exist?(dir) + options.file_system.create_dir(dir, true) + end + options.file_system.open_output_stream(@output.path) do |output_stream| + format.open_writer(output_stream, + options.file_system, + @output.path, + @table.schema, + format.default_write_options) do |writer| + reader = Arrow::TableBatchReader.new(@table) + writer.write_record_batch_reader(reader) + end + end + end + end + end +end + +module Arrow + class TableSaver + include ArrowDataset::ArrowTableSavable + end +end diff --git a/ruby/red-arrow-dataset/lib/arrow-dataset/file-format.rb b/ruby/red-arrow-dataset/lib/arrow-dataset/file-format.rb index 266ea49e3f5..83e61c4b24a 100644 --- a/ruby/red-arrow-dataset/lib/arrow-dataset/file-format.rb +++ b/ruby/red-arrow-dataset/lib/arrow-dataset/file-format.rb @@ -41,5 +41,19 @@ def resolve(format) end end end + + alias_method :open_writer_raw, :open_writer + def open_writer(destination, file_system, path, schema, options) + writer = open_writer_raw(destination, file_system, path, schema, options) + if block_given? + begin + yield(writer) + ensure + writer.finish + end + else + writer + end + end end end diff --git a/ruby/red-arrow-dataset/lib/arrow-dataset/loader.rb b/ruby/red-arrow-dataset/lib/arrow-dataset/loader.rb index 599c055e84c..b1be000f7c3 100644 --- a/ruby/red-arrow-dataset/lib/arrow-dataset/loader.rb +++ b/ruby/red-arrow-dataset/lib/arrow-dataset/loader.rb @@ -30,6 +30,7 @@ def post_load(repository, namespace) def require_libraries require "arrow-dataset/arrow-table-loadable" + require "arrow-dataset/arrow-table-savable" require "arrow-dataset/dataset" require "arrow-dataset/file-format" require "arrow-dataset/file-system-dataset-factory" diff --git a/ruby/red-arrow-dataset/test/test-arrow-table.rb b/ruby/red-arrow-dataset/test/test-arrow-table.rb index 6ae9a905d2b..a9ab40337b6 100644 --- a/ruby/red-arrow-dataset/test/test-arrow-table.rb +++ b/ruby/red-arrow-dataset/test/test-arrow-table.rb @@ -19,10 +19,9 @@ class TestArrowTable < Test::Unit::TestCase def setup Dir.mktmpdir do |tmpdir| @dir = tmpdir - @path = File.join(@dir, "table.arrow") + @path = File.join(@dir, "data", "table.arrow") @table = Arrow::Table.new(visible: [true, false, true], point: [1, 2, 3]) - @table.save(@path) yield end end @@ -40,12 +39,14 @@ def build_file_uri(path) def test_no_scheme Dir.chdir(@dir) do uri = URI(File.basename(@path)) + @table.save(uri) assert_equal(@table, Arrow::Table.load(uri)) end end def test_file uri = build_file_uri(@path) + @table.save(uri) assert_equal(@table, Arrow::Table.load(uri)) end end diff --git a/ruby/red-arrow/lib/arrow/file-system.rb b/ruby/red-arrow/lib/arrow/file-system.rb new file mode 100644 index 00000000000..7d105b42a3e --- /dev/null +++ b/ruby/red-arrow/lib/arrow/file-system.rb @@ -0,0 +1,34 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +module Arrow + class FileSystem + alias_method :open_output_stream_raw, :open_output_stream + def open_output_stream(path) + stream = open_output_stream_raw(path) + if block_given? + begin + yield(stream) + ensure + stream.close + end + else + stream + end + end + end +end diff --git a/ruby/red-arrow/lib/arrow/loader.rb b/ruby/red-arrow/lib/arrow/loader.rb index 5ede8eeea1d..9ca7e2619ef 100644 --- a/ruby/red-arrow/lib/arrow/loader.rb +++ b/ruby/red-arrow/lib/arrow/loader.rb @@ -72,6 +72,7 @@ def require_libraries require "arrow/equal-options" require "arrow/field" require "arrow/file-output-stream" + require "arrow/file-system" require "arrow/fixed-size-binary-array" require "arrow/fixed-size-binary-array-builder" require "arrow/group" diff --git a/ruby/red-arrow/lib/arrow/table-saver.rb b/ruby/red-arrow/lib/arrow/table-saver.rb index bc2296a0a07..207a10a8217 100644 --- a/ruby/red-arrow/lib/arrow/table-saver.rb +++ b/ruby/red-arrow/lib/arrow/table-saver.rb @@ -32,6 +32,29 @@ def initialize(table, output, options={}) end def save + if @output.is_a?(URI) + custom_save_method = "save_to_uri" + else + custom_save_method = "save_to_file" + end + unless respond_to?(custom_save_method, true) + available_schemes = [] + (methods(true) | private_methods(true)).each do |name| + match_data = /\Asave_to_/.match(name.to_s) + if match_data + available_schemes << match_data.post_match + end + end + message = "Arrow::Table save source must be one of [" + message << available_schemes.join(", ") + message << "]: #{@output.scheme.inspect}" + raise ArgumentError, message + end + __send__(custom_save_method) + end + + private + def save_to_file format = @options[:format] custom_save_method = "save_as_#{format}" unless respond_to?(custom_save_method, true) @@ -57,21 +80,24 @@ def save end end - private def fill_options if @options[:format] and @options.key?(:compression) return end - if @output.is_a?(Buffer) + case @output + when Buffer info = {} + when URI + extension = PathExtension.new(@output.path) + info = extension.extract else extension = PathExtension.new(@output) info = extension.extract end format = info[:format] @options = @options.dup - if format and respond_to?("save_as_#{format}", true) + if format @options[:format] ||= format.to_sym else @options[:format] ||= :arrow From c0f02a8b198d9051e9ebbe902fb2f53c23f33aff Mon Sep 17 00:00:00 2001 From: Sutou Kouhei Date: Thu, 2 Sep 2021 06:03:12 +0900 Subject: [PATCH 2/4] Use Arrow::FileInputStream for easy to complete close --- c_glib/arrow-glib/input-stream.cpp | 124 +++++++++++++++++++++++++---- c_glib/arrow-glib/input-stream.h | 68 +++++++--------- c_glib/arrow-glib/input-stream.hpp | 11 ++- c_glib/test/helper/readable.rb | 2 +- 4 files changed, 146 insertions(+), 59 deletions(-) diff --git a/c_glib/arrow-glib/input-stream.cpp b/c_glib/arrow-glib/input-stream.cpp index 64f366a6282..57a13e65a1f 100644 --- a/c_glib/arrow-glib/input-stream.cpp +++ b/c_glib/arrow-glib/input-stream.cpp @@ -50,6 +50,8 @@ G_BEGIN_DECLS * * #GArrowBufferInputStream is a class to read data on buffer. * + * #GArrowFileInputStream is a class to read data in file. + * * #GArrowMemoryMappedInputStream is a class to read data in file by * mapping the file on memory. It supports zero copy. * @@ -631,6 +633,86 @@ garrow_buffer_input_stream_get_buffer(GArrowBufferInputStream *input_stream) } +G_DEFINE_TYPE(GArrowFileInputStream, + garrow_file_input_stream, + GARROW_TYPE_SEEKABLE_INPUT_STREAM); + +static void +garrow_file_input_stream_init(GArrowFileInputStream *object) +{ +} + +static void +garrow_file_input_stream_class_init(GArrowFileInputStreamClass *klass) +{ +} + +/** + * garrow_file_input_stream_new: + * @path: The path of the file to be opened. + * @error: (nullable): Return location for a #GError or %NULL. + * + * Returns: (nullable): A newly created #GArrowFileInputStream + * or %NULL on error. + * + * Since: 6.0.0 + */ +GArrowFileInputStream * +garrow_file_input_stream_new(const gchar *path, + GError **error) +{ + auto arrow_stream_result = arrow::io::ReadableFile::Open(path); + if (garrow::check(error, arrow_stream_result, "[file-input-stream][new]")) { + auto arrow_stream = *arrow_stream_result; + return garrow_file_input_stream_new_raw(&arrow_stream); + } else { + return NULL; + } +} + +/** + * garrow_file_input_stream_new_file_descriptor: + * @file_descriptor: The file descriptor of this input stream. + * @error: (nullable): Return location for a #GError or %NULL. + * + * Returns: (nullable): A newly created #GArrowFileInputStream + * or %NULL on error. + * + * Since: 6.0.0 + */ +GArrowFileInputStream * +garrow_file_input_stream_new_file_descriptor(gint fd, + GError **error) +{ + auto arrow_stream_result = arrow::io::ReadableFile::Open(fd); + if (garrow::check(error, + arrow_stream_result, + "[file-input-stream][new-file-descriptor]")) { + auto arrow_stream = *arrow_stream_result; + return garrow_file_input_stream_new_raw(&arrow_stream); + } else { + return NULL; + } +} + +/** + * garrow_file_input_stream_get_file_descriptor: + * @stream: A #GArrowFileInuptStream. + * + * Returns: The file descriptor of @stream. + * + * Since: 6.0.0 + */ +gint +garrow_file_input_stream_get_file_descriptor(GArrowFileInputStream *stream) +{ + auto arrow_stream = + std::static_pointer_cast( + garrow_input_stream_get_raw(GARROW_INPUT_STREAM(stream))); + return arrow_stream->file_descriptor(); +} + + G_DEFINE_TYPE(GArrowMemoryMappedInputStream, garrow_memory_mapped_input_stream, GARROW_TYPE_SEEKABLE_INPUT_STREAM); @@ -657,18 +739,14 @@ GArrowMemoryMappedInputStream * garrow_memory_mapped_input_stream_new(const gchar *path, GError **error) { - auto arrow_memory_mapped_file_result = - arrow::io::MemoryMappedFile::Open(std::string(path), - arrow::io::FileMode::READ); - if (arrow_memory_mapped_file_result.ok()) { - auto arrow_memory_mapped_file = - arrow_memory_mapped_file_result.ValueOrDie(); - return garrow_memory_mapped_input_stream_new_raw(&(arrow_memory_mapped_file)); + auto arrow_stream_result = + arrow::io::MemoryMappedFile::Open(path, arrow::io::FileMode::READ); + if (garrow::check(error, + arrow_stream_result, + "[memory-mapped-input-stream][new]")) { + auto arrow_stream = *arrow_stream_result; + return garrow_memory_mapped_input_stream_new_raw(&arrow_stream); } else { - std::string context("[memory-mapped-input-stream][open]: <"); - context += path; - context += ">"; - garrow::check(error, arrow_memory_mapped_file_result, context.c_str()); return NULL; } } @@ -1203,16 +1281,28 @@ garrow_buffer_input_stream_get_raw(GArrowBufferInputStream *buffer_input_stream) return arrow_buffer_reader; } + +GArrowFileInputStream * +garrow_file_input_stream_new_raw( + std::shared_ptr *arrow_stream) +{ + return GARROW_FILE_INPUT_STREAM(g_object_new(GARROW_TYPE_FILE_INPUT_STREAM, + "input-stream", arrow_stream, + NULL)); +} + + GArrowMemoryMappedInputStream * -garrow_memory_mapped_input_stream_new_raw(std::shared_ptr *arrow_memory_mapped_file) +garrow_memory_mapped_input_stream_new_raw( + std::shared_ptr *arrow_stream) { - auto object = g_object_new(GARROW_TYPE_MEMORY_MAPPED_INPUT_STREAM, - "input-stream", arrow_memory_mapped_file, - NULL); - auto memory_mapped_input_stream = GARROW_MEMORY_MAPPED_INPUT_STREAM(object); - return memory_mapped_input_stream; + return GARROW_MEMORY_MAPPED_INPUT_STREAM( + g_object_new(GARROW_TYPE_MEMORY_MAPPED_INPUT_STREAM, + "input-stream", arrow_stream, + NULL)); } + GArrowCompressedInputStream * garrow_compressed_input_stream_new_raw(std::shared_ptr *arrow_raw, GArrowCodec *codec, diff --git a/c_glib/arrow-glib/input-stream.h b/c_glib/arrow-glib/input-stream.h index 4b4c51eb3e7..5ead66b8389 100644 --- a/c_glib/arrow-glib/input-stream.h +++ b/c_glib/arrow-glib/input-stream.h @@ -104,54 +104,42 @@ GArrowBufferInputStream *garrow_buffer_input_stream_new(GArrowBuffer *buffer); GArrowBuffer *garrow_buffer_input_stream_get_buffer(GArrowBufferInputStream *input_stream); -#define GARROW_TYPE_MEMORY_MAPPED_INPUT_STREAM \ - (garrow_memory_mapped_input_stream_get_type()) -#define GARROW_MEMORY_MAPPED_INPUT_STREAM(obj) \ - (G_TYPE_CHECK_INSTANCE_CAST((obj), \ - GARROW_TYPE_MEMORY_MAPPED_INPUT_STREAM, \ - GArrowMemoryMappedInputStream)) -#define GARROW_MEMORY_MAPPED_INPUT_STREAM_CLASS(klass) \ - (G_TYPE_CHECK_CLASS_CAST((klass), \ - GARROW_TYPE_MEMORY_MAPPED_INPUT_STREAM, \ - GArrowMemoryMappedInputStreamClass)) -#define GARROW_IS_MEMORY_MAPPED_INPUT_STREAM(obj) \ - (G_TYPE_CHECK_INSTANCE_TYPE((obj), \ - GARROW_TYPE_MEMORY_MAPPED_INPUT_STREAM)) -#define GARROW_IS_MEMORY_MAPPED_INPUT_STREAM_CLASS(klass) \ - (G_TYPE_CHECK_CLASS_TYPE((klass), \ - GARROW_TYPE_MEMORY_MAPPED_INPUT_STREAM)) -#define GARROW_MEMORY_MAPPED_INPUT_STREAM_GET_CLASS(obj) \ - (G_TYPE_INSTANCE_GET_CLASS((obj), \ - GARROW_TYPE_MEMORY_MAPPED_INPUT_STREAM, \ - GArrowMemoryMappedInputStreamClass)) - -typedef struct _GArrowMemoryMappedInputStream GArrowMemoryMappedInputStream; -#ifndef __GTK_DOC_IGNORE__ -typedef struct _GArrowMemoryMappedInputStreamClass GArrowMemoryMappedInputStreamClass; -#endif - -/** - * GArrowMemoryMappedInputStream: - * - * It wraps `arrow::io::MemoryMappedFile`. - */ -struct _GArrowMemoryMappedInputStream +#define GARROW_TYPE_FILE_INPUT_STREAM (garrow_file_input_stream_get_type()) +G_DECLARE_DERIVABLE_TYPE(GArrowFileInputStream, + garrow_file_input_stream, + GARROW, + FILE_INPUT_STREAM, + GArrowSeekableInputStream) +struct _GArrowFileInputStreamClass { - /*< private >*/ - GArrowSeekableInputStream parent_instance; + GArrowSeekableInputStreamClass parent_class; }; -#ifndef __GTK_DOC_IGNORE__ +GArrowFileInputStream * +garrow_file_input_stream_new(const gchar *path, + GError **error); +GArrowFileInputStream * +garrow_file_input_stream_new_file_descriptor(gint fd, + GError **error); +gint +garrow_file_input_stream_get_file_descriptor(GArrowFileInputStream *stream); + + +#define GARROW_TYPE_MEMORY_MAPPED_INPUT_STREAM \ + (garrow_memory_mapped_input_stream_get_type()) +G_DECLARE_DERIVABLE_TYPE(GArrowMemoryMappedInputStream, + garrow_memory_mapped_input_stream, + GARROW, + MEMORY_MAPPED_INPUT_STREAM, + GArrowSeekableInputStream) struct _GArrowMemoryMappedInputStreamClass { GArrowSeekableInputStreamClass parent_class; }; -#endif -GType garrow_memory_mapped_input_stream_get_type(void) G_GNUC_CONST; - -GArrowMemoryMappedInputStream *garrow_memory_mapped_input_stream_new(const gchar *path, - GError **error); +GArrowMemoryMappedInputStream * +garrow_memory_mapped_input_stream_new(const gchar *path, + GError **error); #define GARROW_TYPE_GIO_INPUT_STREAM \ diff --git a/c_glib/arrow-glib/input-stream.hpp b/c_glib/arrow-glib/input-stream.hpp index 88fbb8f64c1..2a0a3d3ddcc 100644 --- a/c_glib/arrow-glib/input-stream.hpp +++ b/c_glib/arrow-glib/input-stream.hpp @@ -40,7 +40,16 @@ garrow_buffer_input_stream_new_raw(std::shared_ptr *arr GArrowBuffer *buffer); std::shared_ptr garrow_buffer_input_stream_get_raw(GArrowBufferInputStream *input_stream); -GArrowMemoryMappedInputStream *garrow_memory_mapped_input_stream_new_raw(std::shared_ptr *arrow_memory_mapped_file); + +GArrowFileInputStream * +garrow_file_input_stream_new_raw( + std::shared_ptr *arrow_stream); + + +GArrowMemoryMappedInputStream * +garrow_memory_mapped_input_stream_new_raw( + std::shared_ptr *arrow_stream); + GArrowCompressedInputStream * garrow_compressed_input_stream_new_raw(std::shared_ptr *arrow_raw, diff --git a/c_glib/test/helper/readable.rb b/c_glib/test/helper/readable.rb index b4ec38a6554..81bf0795c6b 100644 --- a/c_glib/test/helper/readable.rb +++ b/c_glib/test/helper/readable.rb @@ -21,7 +21,7 @@ def read_table(input, type: :file) if input.is_a?(Arrow::Buffer) input_stream = Arrow::BufferIntputStream.new(input) else - input_stream = Arrow::MemoryMappedInputStream.new(input) + input_stream = Arrow::FileInputStream.new(input) end begin if type == :file From 37bbc0ec197c1b1ef8ea1cb4fd00d8badde59506 Mon Sep 17 00:00:00 2001 From: Sutou Kouhei Date: Thu, 2 Sep 2021 08:43:34 +0900 Subject: [PATCH 3/4] Add support for file URI on Windows --- .../lib/arrow-dataset/arrow-table-savable.rb | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/ruby/red-arrow-dataset/lib/arrow-dataset/arrow-table-savable.rb b/ruby/red-arrow-dataset/lib/arrow-dataset/arrow-table-savable.rb index 218860cbc98..f3896f04b92 100644 --- a/ruby/red-arrow-dataset/lib/arrow-dataset/arrow-table-savable.rb +++ b/ruby/red-arrow-dataset/lib/arrow-dataset/arrow-table-savable.rb @@ -22,29 +22,34 @@ def save_to_uri format = FileFormat.resolve(@options[:format]) options = FileSystemDatasetWriteOptions.new options.file_write_options = format.default_write_options + path = @output.path if @output.scheme.nil? options.file_system = Arrow::LocalFileSystem.new else options.file_system = Arrow::FileSystem.create(@output.to_s) + # /C:/... -> C:/... + unless File.expand_path(".").start_with?("/") + path = path.gsub(/\A\//, "") + end end partitioning = @options[:partitioning] if partitioning # TODO - options.base_dir = File.dirname(@output.path) - options.base_name_template = File.basename(@output.path) + options.base_dir = File.dirname(path) + options.base_name_template = File.basename(path) options.partitioning = Partitioning.resolve(@options[:partitioning]) scanner_builder = ScannerBuilder.new(@table) scanner = scanner_builder.finish FileSystemDataset.write_scanner(scanner, options) else - dir = File.dirname(@output.path) + dir = File.dirname(path) unless File.exist?(dir) options.file_system.create_dir(dir, true) end - options.file_system.open_output_stream(@output.path) do |output_stream| + options.file_system.open_output_stream(path) do |output_stream| format.open_writer(output_stream, options.file_system, - @output.path, + path, @table.schema, format.default_write_options) do |writer| reader = Arrow::TableBatchReader.new(@table) From f274ab5c73b5f6d0415e9a1be3eb6737fb287a87 Mon Sep 17 00:00:00 2001 From: Sutou Kouhei Date: Thu, 2 Sep 2021 12:33:47 +0900 Subject: [PATCH 4/4] Add test --- c_glib/test/test-file-input-stream.rb | 102 ++++++++++++++++++++++++++ 1 file changed, 102 insertions(+) create mode 100644 c_glib/test/test-file-input-stream.rb diff --git a/c_glib/test/test-file-input-stream.rb b/c_glib/test/test-file-input-stream.rb new file mode 100644 index 00000000000..2b43f97f5dd --- /dev/null +++ b/c_glib/test/test-file-input-stream.rb @@ -0,0 +1,102 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +class TestFileInputStream < Test::Unit::TestCase + def setup + @data = "Hello World" + @tempfile = Tempfile.open("arrow-file-input-stream") + @tempfile.write(@data) + @tempfile.close + end + + def test_new + input = Arrow::FileInputStream.new(@tempfile.path) + begin + buffer = input.read(5) + assert_equal("Hello", buffer.data.to_s) + ensure + input.close + end + end + + def test_close + input = Arrow::FileInputStream.new(@tempfile.path) + assert do + not input.closed? + end + input.close + assert do + input.closed? + end + end + + def test_size + input = Arrow::FileInputStream.new(@tempfile.path) + begin + assert_equal(@data.bytesize, input.size) + ensure + input.close + end + end + + def test_read + input = Arrow::FileInputStream.new(@tempfile.path) + begin + buffer = input.read(5) + assert_equal("Hello", buffer.data.to_s) + ensure + input.close + end + end + + def test_read_at + input = Arrow::FileInputStream.new(@tempfile.path) + begin + buffer = input.read_at(6, 5) + assert_equal("World", buffer.data.to_s) + ensure + input.close + end + end + + def test_mode + input = Arrow::FileInputStream.new(@tempfile.path) + begin + assert_equal(Arrow::FileMode::READ, input.mode) + ensure + input.close + end + end + + def test_file_descriptor + @tempfile.open + begin + fd = @tempfile.fileno + input = Arrow::FileInputStream.new(fd) + begin + assert_equal(fd, input.file_descriptor) + ensure + input.close + end + ensure + begin + @tempfile.close + rescue + end + end + end +end