From 769ce9b316ad3c198ba0fd7930d2f666986a730e Mon Sep 17 00:00:00 2001 From: Muhammad Taha Naveed Date: Thu, 15 Jan 2026 18:27:18 +0500 Subject: [PATCH] Replace libcsv with pg COPY for csv loading - Commit also adds permission checks - Resolves a critical memory spike issue on loading large file - Use pg's COPY infrastructure (BeginCopyFrom, NextCopyFromRawFields) for 64KB buffered CSV parsing instead of libcsv - Add byte based flush threshold (64KB) matching COPY behavior for memory safety - Use heap_multi_insert with BulkInsertState for optimized batch inserts - Add per batch memory context to prevent memory growth during large loads - Remove libcsv dependency (libcsv.c, csv.h) - Improves loading performance by 15-20% - No previous regression tests were impacted - Added regression tests for permissions/rls Assisted-by AI --- Makefile | 1 - regress/expected/age_load.out | 189 ++++++++ regress/expected/index.out | 12 +- regress/sql/age_load.sql | 125 ++++++ regress/sql/index.sql | 2 +- src/backend/utils/load/ag_load_edges.c | 388 +++++++++-------- src/backend/utils/load/ag_load_labels.c | 381 ++++++++-------- src/backend/utils/load/age_load.c | 248 ++++++++++- src/backend/utils/load/libcsv.c | 549 ------------------------ src/include/utils/load/ag_load_edges.h | 52 +-- src/include/utils/load/ag_load_labels.h | 50 +-- src/include/utils/load/age_load.h | 27 +- src/include/utils/load/csv.h | 108 ----- 13 files changed, 1000 insertions(+), 1132 deletions(-) delete mode 100644 src/backend/utils/load/libcsv.c delete mode 100644 src/include/utils/load/csv.h diff --git a/Makefile b/Makefile index ffad7d6af..a8faa2bb8 100644 --- a/Makefile +++ b/Makefile @@ -69,7 +69,6 @@ OBJS = src/backend/age.o \ src/backend/utils/load/ag_load_labels.o \ src/backend/utils/load/ag_load_edges.o \ src/backend/utils/load/age_load.o \ - src/backend/utils/load/libcsv.o \ src/backend/utils/name_validation.o \ src/backend/utils/ag_guc.o diff --git a/regress/expected/age_load.out b/regress/expected/age_load.out index 55d1ff1d6..1f76c31ce 100644 --- a/regress/expected/age_load.out +++ b/regress/expected/age_load.out @@ -454,6 +454,195 @@ NOTICE: graph "agload_conversion" has been dropped (1 row) +-- +-- Test security and permissions +-- +SELECT create_graph('agload_security'); +NOTICE: graph "agload_security" has been created + create_graph +-------------- + +(1 row) + +SELECT create_vlabel('agload_security', 'Person1'); +NOTICE: VLabel "Person1" has been created + create_vlabel +--------------- + +(1 row) + +SELECT create_vlabel('agload_security', 'Person2'); +NOTICE: VLabel "Person2" has been created + create_vlabel +--------------- + +(1 row) + +SELECT create_elabel('agload_security', 'SecEdge'); +NOTICE: ELabel "SecEdge" has been created + create_elabel +--------------- + +(1 row) + +-- +-- Test 1: File read permission (pg_read_server_files role) +-- +-- Create a user without pg_read_server_files role +CREATE USER load_test_user; +GRANT USAGE ON SCHEMA ag_catalog TO load_test_user; +-- This should fail because load_test_user doesn't have pg_read_server_files +SET ROLE load_test_user; +SELECT load_labels_from_file('agload_security', 'Person1', 'age_load/conversion_vertices.csv', true); +ERROR: permission denied to LOAD from a file +DETAIL: Only roles with privileges of the "pg_read_server_files" role may LOAD from a file. +SELECT load_edges_from_file('agload_security', 'SecEdge', 'age_load/conversion_edges.csv'); +ERROR: permission denied to LOAD from a file +DETAIL: Only roles with privileges of the "pg_read_server_files" role may LOAD from a file. +RESET ROLE; +-- Grant pg_read_server_files and try again - should fail on table permission now +GRANT pg_read_server_files TO load_test_user; +-- +-- Test 2: Table INSERT permission (ACL_INSERT) +-- +-- User has file read permission but no INSERT on the label table +SET ROLE load_test_user; +SELECT load_labels_from_file('agload_security', 'Person1', 'age_load/conversion_vertices.csv', true); +ERROR: permission denied for table Person1 +SELECT load_edges_from_file('agload_security', 'SecEdge', 'age_load/conversion_edges.csv'); +ERROR: permission denied for table SecEdge +RESET ROLE; +-- Grant INSERT permission and try again - should succeed +GRANT USAGE ON SCHEMA agload_security TO load_test_user; +GRANT INSERT ON agload_security."Person1" TO load_test_user; +GRANT INSERT ON agload_security."SecEdge" TO load_test_user; +GRANT UPDATE ON SEQUENCE agload_security."Person1_id_seq" TO load_test_user; +GRANT UPDATE ON SEQUENCE agload_security."SecEdge_id_seq" TO load_test_user; +GRANT SELECT ON ag_catalog.ag_label TO load_test_user; +GRANT SELECT ON ag_catalog.ag_graph TO load_test_user; +SET ROLE load_test_user; +SELECT load_labels_from_file('agload_security', 'Person1', 'age_load/conversion_vertices.csv', true); + load_labels_from_file +----------------------- + +(1 row) + +SELECT load_edges_from_file('agload_security', 'SecEdge', 'age_load/conversion_edges.csv'); + load_edges_from_file +---------------------- + +(1 row) + +RESET ROLE; +-- Verify data was loaded +SELECT COUNT(*) FROM agload_security."Person1"; + count +------- + 6 +(1 row) + +SELECT COUNT(*) FROM agload_security."SecEdge"; + count +------- + 6 +(1 row) + +-- cleanup +DELETE FROM agload_security."Person1"; +DELETE FROM agload_security."SecEdge"; +-- +-- Test 3: Row-Level Security (RLS) +-- +-- Enable RLS on the label tables +ALTER TABLE agload_security."Person1" ENABLE ROW LEVEL SECURITY; +ALTER TABLE agload_security."SecEdge" ENABLE ROW LEVEL SECURITY; +-- Switch to load_test_user +SET ROLE load_test_user; +-- Loading should fail when RLS is enabled +SELECT load_labels_from_file('agload_security', 'Person1', 'age_load/conversion_vertices.csv', true); +ERROR: LOAD from file is not supported with row-level security +HINT: Use Cypher CREATE clause instead. +SELECT load_edges_from_file('agload_security', 'SecEdge', 'age_load/conversion_edges.csv'); +ERROR: LOAD from file is not supported with row-level security +HINT: Use Cypher CREATE clause instead. +RESET ROLE; +-- Disable RLS and try again - should succeed +ALTER TABLE agload_security."Person1" DISABLE ROW LEVEL SECURITY; +ALTER TABLE agload_security."SecEdge" DISABLE ROW LEVEL SECURITY; +SELECT load_labels_from_file('agload_security', 'Person1', 'age_load/conversion_vertices.csv', true); + load_labels_from_file +----------------------- + +(1 row) + +SELECT load_edges_from_file('agload_security', 'SecEdge', 'age_load/conversion_edges.csv'); + load_edges_from_file +---------------------- + +(1 row) + +-- Verify data was loaded +SELECT COUNT(*) FROM agload_security."Person1"; + count +------- + 6 +(1 row) + +SELECT COUNT(*) FROM agload_security."SecEdge"; + count +------- + 6 +(1 row) + +-- cleanup +DELETE FROM agload_security."Person1"; +DELETE FROM agload_security."SecEdge"; +-- +-- Test 4: Constraint checking (CHECK constraint) +-- +-- Add constraint on vertex properties - fail if bool property is false +ALTER TABLE agload_security."Person1" ADD CONSTRAINT check_bool_true + CHECK ((properties->>'"bool"')::boolean = true); +-- This should fail - constraint violation +SELECT load_labels_from_file('agload_security', 'Person1', 'age_load/conversion_vertices.csv', true); +ERROR: new row for relation "Person1" violates check constraint "check_bool_true" +DETAIL: Failing row contains (844424930131970, {"id": "2", "bool": "false", "__id__": 2, "string": "John", "num...). +-- Add constraint on edge properties - fail if bool property is false +ALTER TABLE agload_security."SecEdge" ADD CONSTRAINT check_bool_true + CHECK ((properties->>'"bool"')::boolean = true); +-- This should fail - some edges have bool = false +SELECT load_edges_from_file('agload_security', 'SecEdge', 'age_load/conversion_edges.csv'); +ERROR: new row for relation "SecEdge" violates check constraint "check_bool_true" +DETAIL: Failing row contains (1407374883553294, 844424930131969, 1125899906842625, {"bool": "false", "string": "John", "numeric": "-2"}). +-- cleanup +ALTER TABLE agload_security."Person1" DROP CONSTRAINT check_bool_true; +ALTER TABLE agload_security."SecEdge" DROP CONSTRAINT check_bool_true; +-- +-- Cleanup +-- +REVOKE ALL ON agload_security."Person1" FROM load_test_user; +REVOKE ALL ON agload_security."SecEdge" FROM load_test_user; +REVOKE ALL ON SEQUENCE agload_security."Person1_id_seq" FROM load_test_user; +REVOKE ALL ON SEQUENCE agload_security."SecEdge_id_seq" FROM load_test_user; +REVOKE ALL ON ag_catalog.ag_label FROM load_test_user; +REVOKE ALL ON ag_catalog.ag_graph FROM load_test_user; +REVOKE ALL ON SCHEMA agload_security FROM load_test_user; +REVOKE ALL ON SCHEMA ag_catalog FROM load_test_user; +REVOKE pg_read_server_files FROM load_test_user; +DROP USER load_test_user; +SELECT drop_graph('agload_security', true); +NOTICE: drop cascades to 5 other objects +DETAIL: drop cascades to table agload_security._ag_label_vertex +drop cascades to table agload_security._ag_label_edge +drop cascades to table agload_security."Person1" +drop cascades to table agload_security."Person2" +drop cascades to table agload_security."SecEdge" +NOTICE: graph "agload_security" has been dropped + drop_graph +------------ + +(1 row) + -- -- End -- diff --git a/regress/expected/index.out b/regress/expected/index.out index 745cab269..ec62bf57d 100644 --- a/regress/expected/index.out +++ b/regress/expected/index.out @@ -264,19 +264,19 @@ $$) as (n agtype); (0 rows) -- Verify that the incices are created on id columns -SELECT indexname, indexdef FROM pg_indexes WHERE schemaname= 'cypher_index'; +SELECT indexname, indexdef FROM pg_indexes WHERE schemaname= 'cypher_index' ORDER BY 1; indexname | indexdef -----------------------------+------------------------------------------------------------------------------------------------ + City_pkey | CREATE UNIQUE INDEX "City_pkey" ON cypher_index."City" USING btree (id) + Country_pkey | CREATE UNIQUE INDEX "Country_pkey" ON cypher_index."Country" USING btree (id) + _ag_label_edge_end_id_idx | CREATE INDEX _ag_label_edge_end_id_idx ON cypher_index._ag_label_edge USING btree (end_id) _ag_label_edge_pkey | CREATE UNIQUE INDEX _ag_label_edge_pkey ON cypher_index._ag_label_edge USING btree (id) _ag_label_edge_start_id_idx | CREATE INDEX _ag_label_edge_start_id_idx ON cypher_index._ag_label_edge USING btree (start_id) - _ag_label_edge_end_id_idx | CREATE INDEX _ag_label_edge_end_id_idx ON cypher_index._ag_label_edge USING btree (end_id) _ag_label_vertex_pkey | CREATE UNIQUE INDEX _ag_label_vertex_pkey ON cypher_index._ag_label_vertex USING btree (id) - idx_pkey | CREATE UNIQUE INDEX idx_pkey ON cypher_index.idx USING btree (id) cypher_index_idx_props_uq | CREATE UNIQUE INDEX cypher_index_idx_props_uq ON cypher_index.idx USING btree (properties) - Country_pkey | CREATE UNIQUE INDEX "Country_pkey" ON cypher_index."Country" USING btree (id) - has_city_start_id_idx | CREATE INDEX has_city_start_id_idx ON cypher_index.has_city USING btree (start_id) has_city_end_id_idx | CREATE INDEX has_city_end_id_idx ON cypher_index.has_city USING btree (end_id) - City_pkey | CREATE UNIQUE INDEX "City_pkey" ON cypher_index."City" USING btree (id) + has_city_start_id_idx | CREATE INDEX has_city_start_id_idx ON cypher_index.has_city USING btree (start_id) + idx_pkey | CREATE UNIQUE INDEX idx_pkey ON cypher_index.idx USING btree (id) (10 rows) SET enable_mergejoin = ON; diff --git a/regress/sql/age_load.sql b/regress/sql/age_load.sql index cefcfb4ca..976f050af 100644 --- a/regress/sql/age_load.sql +++ b/regress/sql/age_load.sql @@ -194,6 +194,131 @@ SELECT load_edges_from_file('agload_conversion', 'Edges1', '../../etc/passwd', t -- SELECT drop_graph('agload_conversion', true); +-- +-- Test security and permissions +-- + +SELECT create_graph('agload_security'); +SELECT create_vlabel('agload_security', 'Person1'); +SELECT create_vlabel('agload_security', 'Person2'); +SELECT create_elabel('agload_security', 'SecEdge'); + +-- +-- Test 1: File read permission (pg_read_server_files role) +-- +-- Create a user without pg_read_server_files role +CREATE USER load_test_user; +GRANT USAGE ON SCHEMA ag_catalog TO load_test_user; + +-- This should fail because load_test_user doesn't have pg_read_server_files +SET ROLE load_test_user; +SELECT load_labels_from_file('agload_security', 'Person1', 'age_load/conversion_vertices.csv', true); +SELECT load_edges_from_file('agload_security', 'SecEdge', 'age_load/conversion_edges.csv'); +RESET ROLE; + +-- Grant pg_read_server_files and try again - should fail on table permission now +GRANT pg_read_server_files TO load_test_user; + +-- +-- Test 2: Table INSERT permission (ACL_INSERT) +-- +-- User has file read permission but no INSERT on the label table +SET ROLE load_test_user; +SELECT load_labels_from_file('agload_security', 'Person1', 'age_load/conversion_vertices.csv', true); +SELECT load_edges_from_file('agload_security', 'SecEdge', 'age_load/conversion_edges.csv'); +RESET ROLE; + +-- Grant INSERT permission and try again - should succeed +GRANT USAGE ON SCHEMA agload_security TO load_test_user; +GRANT INSERT ON agload_security."Person1" TO load_test_user; +GRANT INSERT ON agload_security."SecEdge" TO load_test_user; +GRANT UPDATE ON SEQUENCE agload_security."Person1_id_seq" TO load_test_user; +GRANT UPDATE ON SEQUENCE agload_security."SecEdge_id_seq" TO load_test_user; +GRANT SELECT ON ag_catalog.ag_label TO load_test_user; +GRANT SELECT ON ag_catalog.ag_graph TO load_test_user; + +SET ROLE load_test_user; +SELECT load_labels_from_file('agload_security', 'Person1', 'age_load/conversion_vertices.csv', true); +SELECT load_edges_from_file('agload_security', 'SecEdge', 'age_load/conversion_edges.csv'); +RESET ROLE; + +-- Verify data was loaded +SELECT COUNT(*) FROM agload_security."Person1"; +SELECT COUNT(*) FROM agload_security."SecEdge"; + +-- cleanup +DELETE FROM agload_security."Person1"; +DELETE FROM agload_security."SecEdge"; + +-- +-- Test 3: Row-Level Security (RLS) +-- + +-- Enable RLS on the label tables +ALTER TABLE agload_security."Person1" ENABLE ROW LEVEL SECURITY; +ALTER TABLE agload_security."SecEdge" ENABLE ROW LEVEL SECURITY; + +-- Switch to load_test_user +SET ROLE load_test_user; + +-- Loading should fail when RLS is enabled +SELECT load_labels_from_file('agload_security', 'Person1', 'age_load/conversion_vertices.csv', true); +SELECT load_edges_from_file('agload_security', 'SecEdge', 'age_load/conversion_edges.csv'); + +RESET ROLE; + +-- Disable RLS and try again - should succeed +ALTER TABLE agload_security."Person1" DISABLE ROW LEVEL SECURITY; +ALTER TABLE agload_security."SecEdge" DISABLE ROW LEVEL SECURITY; + +SELECT load_labels_from_file('agload_security', 'Person1', 'age_load/conversion_vertices.csv', true); +SELECT load_edges_from_file('agload_security', 'SecEdge', 'age_load/conversion_edges.csv'); + +-- Verify data was loaded +SELECT COUNT(*) FROM agload_security."Person1"; +SELECT COUNT(*) FROM agload_security."SecEdge"; + +-- cleanup +DELETE FROM agload_security."Person1"; +DELETE FROM agload_security."SecEdge"; + +-- +-- Test 4: Constraint checking (CHECK constraint) +-- + +-- Add constraint on vertex properties - fail if bool property is false +ALTER TABLE agload_security."Person1" ADD CONSTRAINT check_bool_true + CHECK ((properties->>'"bool"')::boolean = true); + +-- This should fail - constraint violation +SELECT load_labels_from_file('agload_security', 'Person1', 'age_load/conversion_vertices.csv', true); + +-- Add constraint on edge properties - fail if bool property is false +ALTER TABLE agload_security."SecEdge" ADD CONSTRAINT check_bool_true + CHECK ((properties->>'"bool"')::boolean = true); + +-- This should fail - some edges have bool = false +SELECT load_edges_from_file('agload_security', 'SecEdge', 'age_load/conversion_edges.csv'); + +-- cleanup +ALTER TABLE agload_security."Person1" DROP CONSTRAINT check_bool_true; +ALTER TABLE agload_security."SecEdge" DROP CONSTRAINT check_bool_true; + +-- +-- Cleanup +-- +REVOKE ALL ON agload_security."Person1" FROM load_test_user; +REVOKE ALL ON agload_security."SecEdge" FROM load_test_user; +REVOKE ALL ON SEQUENCE agload_security."Person1_id_seq" FROM load_test_user; +REVOKE ALL ON SEQUENCE agload_security."SecEdge_id_seq" FROM load_test_user; +REVOKE ALL ON ag_catalog.ag_label FROM load_test_user; +REVOKE ALL ON ag_catalog.ag_graph FROM load_test_user; +REVOKE ALL ON SCHEMA agload_security FROM load_test_user; +REVOKE ALL ON SCHEMA ag_catalog FROM load_test_user; +REVOKE pg_read_server_files FROM load_test_user; +DROP USER load_test_user; +SELECT drop_graph('agload_security', true); + -- -- End -- diff --git a/regress/sql/index.sql b/regress/sql/index.sql index a6e075c70..d4a4b24a4 100644 --- a/regress/sql/index.sql +++ b/regress/sql/index.sql @@ -165,7 +165,7 @@ SELECT * FROM cypher('cypher_index', $$ $$) as (n agtype); -- Verify that the incices are created on id columns -SELECT indexname, indexdef FROM pg_indexes WHERE schemaname= 'cypher_index'; +SELECT indexname, indexdef FROM pg_indexes WHERE schemaname= 'cypher_index' ORDER BY 1; SET enable_mergejoin = ON; SET enable_hashjoin = OFF; diff --git a/src/backend/utils/load/ag_load_edges.c b/src/backend/utils/load/ag_load_edges.c index 931c6e0dc..c05bf3352 100644 --- a/src/backend/utils/load/ag_load_edges.c +++ b/src/backend/utils/load/ag_load_edges.c @@ -16,50 +16,30 @@ * specific language governing permissions and limitations * under the License. */ - #include "postgres.h" -#include "utils/load/ag_load_edges.h" -#include "utils/load/csv.h" +#include "access/heapam.h" +#include "access/table.h" +#include "catalog/namespace.h" +#include "commands/copy.h" +#include "executor/executor.h" +#include "nodes/makefuncs.h" +#include "parser/parse_node.h" +#include "utils/memutils.h" +#include "utils/rel.h" -void edge_field_cb(void *field, size_t field_len, void *data) -{ - - csv_edge_reader *cr = (csv_edge_reader*)data; - if (cr->error) - { - cr->error = 1; - ereport(NOTICE,(errmsg("There is some unknown error"))); - } - - /* check for space to store this field */ - if (cr->cur_field == cr->alloc) - { - cr->alloc *= 2; - cr->fields = repalloc_check(cr->fields, sizeof(char *) * cr->alloc); - cr->fields_len = repalloc_check(cr->header, sizeof(size_t *) * cr->alloc); - if (cr->fields == NULL) - { - cr->error = 1; - ereport(ERROR, - (errmsg("field_cb: failed to reallocate %zu bytes\n", - sizeof(char *) * cr->alloc))); - } - } - cr->fields_len[cr->cur_field] = field_len; - cr->curr_row_length += field_len; - cr->fields[cr->cur_field] = pnstrdup((char*)field, field_len); - cr->cur_field += 1; -} +#include "utils/load/ag_load_edges.h" -/* Parser calls this function when it detects end of a row */ -void edge_row_cb(int delim __attribute__((unused)), void *data) +/* + * Process a single edge row from COPY's raw fields. + * Edge CSV format: start_id, start_vertex_type, end_id, end_vertex_type, [properties...] + */ +static void process_edge_row(char **fields, int nfields, + char **header, int header_count, + int label_id, Oid label_seq_relid, + Oid graph_oid, bool load_as_agtype, + batch_insert_state *batch_state) { - - csv_edge_reader *cr = (csv_edge_reader*)data; - batch_insert_state *batch_state = cr->batch_state; - - size_t i, n_fields; int64 start_id_int; graphid start_vertex_graph_id; int start_vertex_type_id; @@ -72,104 +52,92 @@ void edge_row_cb(int delim __attribute__((unused)), void *data) int64 entry_id; TupleTableSlot *slot; - n_fields = cr->cur_field; + char *start_vertex_type; + char *end_vertex_type; + agtype *edge_properties; - if (cr->row == 0) - { - cr->header_num = cr->cur_field; - cr->header_row_length = cr->curr_row_length; - cr->header_len = (size_t* )palloc(sizeof(size_t *) * cr->cur_field); - cr->header = palloc((sizeof (char*) * cr->cur_field)); + /* Generate edge ID */ + entry_id = nextval_internal(label_seq_relid, true); + edge_id = make_graphid(label_id, entry_id); - for (i = 0; icur_field; i++) - { - cr->header_len[i] = cr->fields_len[i]; - cr->header[i] = pnstrdup(cr->fields[i], cr->header_len[i]); - } - } - else - { - entry_id = nextval_internal(cr->label_seq_relid, true); - edge_id = make_graphid(cr->label_id, entry_id); - - start_id_int = strtol(cr->fields[0], NULL, 10); - start_vertex_type_id = get_label_id(cr->fields[1], cr->graph_oid); - end_id_int = strtol(cr->fields[2], NULL, 10); - end_vertex_type_id = get_label_id(cr->fields[3], cr->graph_oid); - - start_vertex_graph_id = make_graphid(start_vertex_type_id, start_id_int); - end_vertex_graph_id = make_graphid(end_vertex_type_id, end_id_int); - - /* Get the appropriate slot from the batch state */ - slot = batch_state->slots[batch_state->num_tuples]; - - /* Clear the slots contents */ - ExecClearTuple(slot); - - /* Fill the values in the slot */ - slot->tts_values[0] = GRAPHID_GET_DATUM(edge_id); - slot->tts_values[1] = GRAPHID_GET_DATUM(start_vertex_graph_id); - slot->tts_values[2] = GRAPHID_GET_DATUM(end_vertex_graph_id); - slot->tts_values[3] = AGTYPE_P_GET_DATUM( - create_agtype_from_list_i( - cr->header, cr->fields, - n_fields, 4, cr->load_as_agtype)); - slot->tts_isnull[0] = false; - slot->tts_isnull[1] = false; - slot->tts_isnull[2] = false; - slot->tts_isnull[3] = false; - - /* Make the slot as containing virtual tuple */ - ExecStoreVirtualTuple(slot); - batch_state->num_tuples++; - - if (batch_state->num_tuples >= batch_state->max_tuples) - { - /* Insert the batch when it is full (i.e. BATCH_SIZE) */ - insert_batch(batch_state); - batch_state->num_tuples = 0; - } - } + /* Trim whitespace from vertex type names */ + start_vertex_type = trim_whitespace(fields[1]); + end_vertex_type = trim_whitespace(fields[3]); - for (i = 0; i < n_fields; ++i) - { - pfree_if_not_null(cr->fields[i]); - } + /* Parse start vertex info */ + start_id_int = strtol(fields[0], NULL, 10); + start_vertex_type_id = get_label_id(start_vertex_type, graph_oid); - if (cr->error) - { - ereport(NOTICE,(errmsg("THere is some error"))); - } + /* Parse end vertex info */ + end_id_int = strtol(fields[2], NULL, 10); + end_vertex_type_id = get_label_id(end_vertex_type, graph_oid); - cr->cur_field = 0; - cr->curr_row_length = 0; - cr->row += 1; -} + /* Create graphids for start and end vertices */ + start_vertex_graph_id = make_graphid(start_vertex_type_id, start_id_int); + end_vertex_graph_id = make_graphid(end_vertex_type_id, end_id_int); -static int is_space(unsigned char c) -{ - if (c == CSV_SPACE || c == CSV_TAB) - { - return 1; - } - else + /* Get the appropriate slot from the batch state */ + slot = batch_state->slots[batch_state->num_tuples]; + + /* Clear the slots contents */ + ExecClearTuple(slot); + + /* Build the agtype properties */ + edge_properties = create_agtype_from_list_i(header, fields, + nfields, 4, load_as_agtype); + + /* Fill the values in the slot */ + slot->tts_values[0] = GRAPHID_GET_DATUM(edge_id); + slot->tts_values[1] = GRAPHID_GET_DATUM(start_vertex_graph_id); + slot->tts_values[2] = GRAPHID_GET_DATUM(end_vertex_graph_id); + slot->tts_values[3] = AGTYPE_P_GET_DATUM(edge_properties); + slot->tts_isnull[0] = false; + slot->tts_isnull[1] = false; + slot->tts_isnull[2] = false; + slot->tts_isnull[3] = false; + + /* Make the slot as containing virtual tuple */ + ExecStoreVirtualTuple(slot); + + batch_state->buffered_bytes += VARSIZE(edge_properties); + batch_state->num_tuples++; + + /* Insert the batch when tuple count OR byte threshold is reached */ + if (batch_state->num_tuples >= BATCH_SIZE || + batch_state->buffered_bytes >= MAX_BUFFERED_BYTES) { - return 0; + insert_batch(batch_state); + batch_state->num_tuples = 0; + batch_state->buffered_bytes = 0; } } -static int is_term(unsigned char c) +/* + * Create COPY options for CSV parsing. + * Returns a List of DefElem nodes. + */ +static List *create_copy_options(void) { - if (c == CSV_CR || c == CSV_LF) - { - return 1; - } - else - { - return 0; - } + List *options = NIL; + + /* FORMAT csv */ + options = lappend(options, + makeDefElem("format", + (Node *) makeString("csv"), + -1)); + + /* HEADER false - we'll read the header ourselves */ + options = lappend(options, + makeDefElem("header", + (Node *) makeBoolean(false), + -1)); + + return options; } +/* + * Load edges from CSV file using pg's COPY infrastructure. + */ int create_edges_from_csv_file(char *file_path, char *graph_name, Oid graph_oid, @@ -177,79 +145,133 @@ int create_edges_from_csv_file(char *file_path, int label_id, bool load_as_agtype) { + Relation label_rel; + Oid label_relid; + CopyFromState cstate; + List *copy_options; + ParseState *pstate; + char **fields; + int nfields; + char **header = NULL; + int header_count = 0; + bool is_first_row = true; + char *label_seq_name; + Oid label_seq_relid; + batch_insert_state *batch_state = NULL; + MemoryContext batch_context; + MemoryContext old_context; + + /* Create a memory context for batch processing - reset after each batch */ + batch_context = AllocSetContextCreate(CurrentMemoryContext, + "AGE CSV Edge Load Batch Context", + ALLOCSET_DEFAULT_SIZES); + + /* Get the label relation */ + label_relid = get_label_relation(label_name, graph_oid); + label_rel = table_open(label_relid, RowExclusiveLock); + + /* Get sequence info */ + label_seq_name = get_label_seq_relation_name(label_name); + label_seq_relid = get_relname_relid(label_seq_name, graph_oid); + + /* Initialize the batch insert state */ + init_batch_insert(&batch_state, label_name, graph_oid); + + /* Create COPY options for CSV parsing */ + copy_options = create_copy_options(); + + /* Create a minimal ParseState for BeginCopyFrom */ + pstate = make_parsestate(NULL); - FILE *fp; - struct csv_parser p; - char buf[1024]; - size_t bytes_read; - unsigned char options = 0; - csv_edge_reader cr; - char *label_seq_name; - - if (csv_init(&p, options) != 0) + PG_TRY(); { - ereport(ERROR, - (errmsg("Failed to initialize csv parser\n"))); - } - - p.malloc_func = palloc; - p.realloc_func = repalloc_check; - p.free_func = pfree_if_not_null; + /* + * Initialize COPY FROM state. + * We pass the label relation but will only use NextCopyFromRawFields + * which returns raw parsed strings without type conversion. + */ + cstate = BeginCopyFrom(pstate, + label_rel, + NULL, /* whereClause */ + file_path, + false, /* is_program */ + NULL, /* data_source_cb */ + NIL, /* attnamelist */ + copy_options); + + /* + * Process rows using COPY's csv parsing. + * NextCopyFromRawFields uses 64KB buffers internally. + */ + while (NextCopyFromRawFields(cstate, &fields, &nfields)) + { + if (is_first_row) + { + int i; - csv_set_space_func(&p, is_space); - csv_set_term_func(&p, is_term); + /* First row is the header - save column names (in main context) */ + header_count = nfields; + header = (char **) palloc(sizeof(char *) * nfields); - fp = fopen(file_path, "rb"); - if (!fp) - { - ereport(ERROR, - (errmsg("Failed to open %s\n", file_path))); - } + for (i = 0; i < nfields; i++) + { + /* Trim whitespace from header fields */ + header[i] = trim_whitespace(fields[i]); + } - PG_TRY(); - { - label_seq_name = get_label_seq_relation_name(label_name); - - memset((void*)&cr, 0, sizeof(csv_edge_reader)); - cr.alloc = 128; - cr.fields = palloc(sizeof(char *) * cr.alloc); - cr.fields_len = palloc(sizeof(size_t *) * cr.alloc); - cr.header_row_length = 0; - cr.curr_row_length = 0; - cr.graph_name = graph_name; - cr.graph_oid = graph_oid; - cr.label_name = label_name; - cr.label_id = label_id; - cr.label_seq_relid = get_relname_relid(label_seq_name, graph_oid); - cr.load_as_agtype = load_as_agtype; - - /* Initialize the batch insert state */ - init_batch_insert(&cr.batch_state, label_name, graph_oid); - - while ((bytes_read=fread(buf, 1, 1024, fp)) > 0) - { - if (csv_parse(&p, buf, bytes_read, edge_field_cb, - edge_row_cb, &cr) != bytes_read) + is_first_row = false; + } + else { - ereport(ERROR, (errmsg("Error while parsing file: %s\n", - csv_strerror(csv_error(&p))))); + /* Switch to batch context for row processing */ + old_context = MemoryContextSwitchTo(batch_context); + + /* Data row - process it */ + process_edge_row(fields, nfields, + header, header_count, + label_id, label_seq_relid, + graph_oid, load_as_agtype, + batch_state); + + /* Switch back to main context */ + MemoryContextSwitchTo(old_context); + + /* Reset batch context after each batch to free memory */ + if (batch_state->num_tuples == 0) + { + MemoryContextReset(batch_context); + } } } - csv_fini(&p, edge_field_cb, edge_row_cb, &cr); - /* Finish any remaining batch inserts */ - finish_batch_insert(&cr.batch_state); + finish_batch_insert(&batch_state); + MemoryContextReset(batch_context); - if (ferror(fp)) - { - ereport(ERROR, (errmsg("Error while reading file %s\n", file_path))); - } + /* Clean up COPY state */ + EndCopyFrom(cstate); } PG_FINALLY(); { - fclose(fp); - csv_free(&p); + /* Free header if allocated */ + if (header != NULL) + { + int i; + for (i = 0; i < header_count; i++) + { + pfree(header[i]); + } + pfree(header); + } + + /* Close the relation */ + table_close(label_rel, RowExclusiveLock); + + /* Delete batch context */ + MemoryContextDelete(batch_context); + + /* Free parse state */ + free_parsestate(pstate); } PG_END_TRY(); diff --git a/src/backend/utils/load/ag_load_labels.c b/src/backend/utils/load/ag_load_labels.c index 1e86bbda4..5b11f68b8 100644 --- a/src/backend/utils/load/ag_load_labels.c +++ b/src/backend/utils/load/ag_load_labels.c @@ -17,155 +17,114 @@ * under the License. */ #include "postgres.h" -#include "executor/spi.h" + +#include "access/heapam.h" +#include "access/table.h" #include "catalog/namespace.h" +#include "commands/copy.h" #include "executor/executor.h" +#include "nodes/makefuncs.h" +#include "parser/parse_node.h" +#include "utils/memutils.h" +#include "utils/rel.h" #include "utils/load/ag_load_labels.h" -#include "utils/load/csv.h" - -void vertex_field_cb(void *field, size_t field_len, void *data) -{ - - csv_vertex_reader *cr = (csv_vertex_reader *) data; - - if (cr->error) - { - cr->error = 1; - ereport(NOTICE,(errmsg("There is some unknown error"))); - } - - /* check for space to store this field */ - if (cr->cur_field == cr->alloc) - { - cr->alloc *= 2; - cr->fields = repalloc_check(cr->fields, sizeof(char *) * cr->alloc); - cr->fields_len = repalloc_check(cr->header, sizeof(size_t *) * cr->alloc); - if (cr->fields == NULL) - { - cr->error = 1; - ereport(ERROR, - (errmsg("field_cb: failed to reallocate %zu bytes\n", - sizeof(char *) * cr->alloc))); - } - } - cr->fields_len[cr->cur_field] = field_len; - cr->curr_row_length += field_len; - cr->fields[cr->cur_field] = pnstrdup((char *) field, field_len); - cr->cur_field += 1; -} -void vertex_row_cb(int delim __attribute__((unused)), void *data) +/* + * Process a single vertex row from COPY's raw fields. + * Vertex CSV format: [id,] [properties...] + */ +static void process_vertex_row(char **fields, int nfields, + char **header, int header_count, + int label_id, Oid label_seq_relid, + bool id_field_exists, bool load_as_agtype, + int64 *curr_seq_num, + batch_insert_state *batch_state) { - csv_vertex_reader *cr = (csv_vertex_reader*)data; - batch_insert_state *batch_state = cr->batch_state; - size_t i, n_fields; graphid vertex_id; int64 entry_id; TupleTableSlot *slot; + agtype *vertex_properties; - n_fields = cr->cur_field; - - if (cr->row == 0) + /* Generate or use provided entry_id */ + if (id_field_exists) { - cr->header_num = cr->cur_field; - cr->header_row_length = cr->curr_row_length; - cr->header_len = (size_t* )palloc(sizeof(size_t *) * cr->cur_field); - cr->header = palloc((sizeof (char*) * cr->cur_field)); - - for (i = 0; icur_field; i++) + entry_id = strtol(fields[0], NULL, 10); + if (entry_id > *curr_seq_num) { - cr->header_len[i] = cr->fields_len[i]; - cr->header[i] = pnstrdup(cr->fields[i], cr->header_len[i]); + /* This is needed to ensure the sequence is up-to-date */ + DirectFunctionCall2(setval_oid, + ObjectIdGetDatum(label_seq_relid), + Int64GetDatum(entry_id)); + *curr_seq_num = entry_id; } } else { - if (cr->id_field_exists) - { - entry_id = strtol(cr->fields[0], NULL, 10); - if (entry_id > cr->curr_seq_num) - { - DirectFunctionCall2(setval_oid, - ObjectIdGetDatum(cr->label_seq_relid), - Int64GetDatum(entry_id)); - cr->curr_seq_num = entry_id; - } - } - else - { - entry_id = nextval_internal(cr->label_seq_relid, true); - } + entry_id = nextval_internal(label_seq_relid, true); + } - vertex_id = make_graphid(cr->label_id, entry_id); + vertex_id = make_graphid(label_id, entry_id); - /* Get the appropriate slot from the batch state */ - slot = batch_state->slots[batch_state->num_tuples]; + /* Get the appropriate slot from the batch state */ + slot = batch_state->slots[batch_state->num_tuples]; - /* Clear the slots contents */ - ExecClearTuple(slot); + /* Clear the slots contents */ + ExecClearTuple(slot); - /* Fill the values in the slot */ - slot->tts_values[0] = GRAPHID_GET_DATUM(vertex_id); - slot->tts_values[1] = AGTYPE_P_GET_DATUM( - create_agtype_from_list(cr->header, cr->fields, - n_fields, entry_id, - cr->load_as_agtype)); - slot->tts_isnull[0] = false; - slot->tts_isnull[1] = false; + /* Build the agtype properties */ + vertex_properties = create_agtype_from_list(header, fields, + nfields, entry_id, + load_as_agtype); - /* Make the slot as containing virtual tuple */ - ExecStoreVirtualTuple(slot); + /* Fill the values in the slot */ + slot->tts_values[0] = GRAPHID_GET_DATUM(vertex_id); + slot->tts_values[1] = AGTYPE_P_GET_DATUM(vertex_properties); + slot->tts_isnull[0] = false; + slot->tts_isnull[1] = false; - batch_state->num_tuples++; + /* Make the slot as containing virtual tuple */ + ExecStoreVirtualTuple(slot); - if (batch_state->num_tuples >= batch_state->max_tuples) - { - /* Insert the batch when it is full (i.e. BATCH_SIZE) */ - insert_batch(batch_state); - batch_state->num_tuples = 0; - } - } + batch_state->buffered_bytes += VARSIZE(vertex_properties); + batch_state->num_tuples++; - for (i = 0; i < n_fields; ++i) + /* Insert the batch when tuple count OR byte threshold is reached */ + if (batch_state->num_tuples >= BATCH_SIZE || + batch_state->buffered_bytes >= MAX_BUFFERED_BYTES) { - pfree_if_not_null(cr->fields[i]); + insert_batch(batch_state); + batch_state->num_tuples = 0; + batch_state->buffered_bytes = 0; } - - if (cr->error) - { - ereport(NOTICE,(errmsg("THere is some error"))); - } - - cr->cur_field = 0; - cr->curr_row_length = 0; - cr->row += 1; } -static int is_space(unsigned char c) +/* + * Create COPY options for csv parsing. + * Returns a List of DefElem nodes. + */ +static List *create_copy_options(void) { - if (c == CSV_SPACE || c == CSV_TAB) - { - return 1; - } - else - { - return 0; - } + List *options = NIL; -} -static int is_term(unsigned char c) -{ - if (c == CSV_CR || c == CSV_LF) - { - return 1; - } - else - { - return 0; - } + /* FORMAT csv */ + options = lappend(options, + makeDefElem("format", + (Node *) makeString("csv"), + -1)); + + /* HEADER false - we'll read the header ourselves */ + options = lappend(options, + makeDefElem("header", + (Node *) makeBoolean(false), + -1)); + + return options; } +/* + * Load vertex labels from csv file using pg's COPY infrastructure. + */ int create_labels_from_csv_file(char *file_path, char *graph_name, Oid graph_oid, @@ -174,96 +133,146 @@ int create_labels_from_csv_file(char *file_path, bool id_field_exists, bool load_as_agtype) { - - FILE *fp; - struct csv_parser p; - char buf[1024]; - size_t bytes_read; - unsigned char options = 0; - csv_vertex_reader cr; - char *label_seq_name; - - if (csv_init(&p, options) != 0) + Relation label_rel; + Oid label_relid; + CopyFromState cstate; + List *copy_options; + ParseState *pstate; + char **fields; + int nfields; + char **header = NULL; + int header_count = 0; + bool is_first_row = true; + char *label_seq_name; + Oid label_seq_relid; + int64 curr_seq_num = 0; + batch_insert_state *batch_state = NULL; + MemoryContext batch_context; + MemoryContext old_context; + + /* Create a memory context for batch processing - reset after each batch */ + batch_context = AllocSetContextCreate(CurrentMemoryContext, + "AGE CSV Load Batch Context", + ALLOCSET_DEFAULT_SIZES); + + /* Get the label relation */ + label_relid = get_label_relation(label_name, graph_oid); + label_rel = table_open(label_relid, RowExclusiveLock); + + /* Get sequence info */ + label_seq_name = get_label_seq_relation_name(label_name); + label_seq_relid = get_relname_relid(label_seq_name, graph_oid); + + if (id_field_exists) { - ereport(ERROR, - (errmsg("Failed to initialize csv parser\n"))); + /* + * Set the curr_seq_num since we will need it to compare with + * incoming entry_id. + */ + curr_seq_num = nextval_internal(label_seq_relid, true); } - p.malloc_func = palloc; - p.realloc_func = repalloc_check; - p.free_func = pfree_if_not_null; + /* Initialize the batch insert state */ + init_batch_insert(&batch_state, label_name, graph_oid); - csv_set_space_func(&p, is_space); - csv_set_term_func(&p, is_term); + /* Create COPY options for CSV parsing */ + copy_options = create_copy_options(); - fp = fopen(file_path, "rb"); - if (!fp) - { - ereport(ERROR, - (errmsg("Failed to open %s\n", file_path))); - } + /* Create a minimal ParseState for BeginCopyFrom */ + pstate = make_parsestate(NULL); PG_TRY(); { - label_seq_name = get_label_seq_relation_name(label_name); - - memset((void*)&cr, 0, sizeof(csv_vertex_reader)); - - cr.alloc = 2048; - cr.fields = palloc(sizeof(char *) * cr.alloc); - cr.fields_len = palloc(sizeof(size_t *) * cr.alloc); - cr.header_row_length = 0; - cr.curr_row_length = 0; - cr.graph_name = graph_name; - cr.graph_oid = graph_oid; - cr.label_name = label_name; - cr.label_id = label_id; - cr.id_field_exists = id_field_exists; - cr.label_seq_relid = get_relname_relid(label_seq_name, graph_oid); - cr.load_as_agtype = load_as_agtype; - - if (cr.id_field_exists) + /* + * Initialize COPY FROM state. + * We pass the label relation but will only use NextCopyFromRawFields + * which returns raw parsed strings without type conversion. + */ + cstate = BeginCopyFrom(pstate, + label_rel, + NULL, /* whereClause */ + file_path, + false, /* is_program */ + NULL, /* data_source_cb */ + NIL, /* attnamelist - NULL means all columns */ + copy_options); + + /* + * Process rows using COPY's csv parsing. + * NextCopyFromRawFields uses 64KB buffers internally. + */ + while (NextCopyFromRawFields(cstate, &fields, &nfields)) { - /* - * Set the curr_seq_num since we will need it to compare with - * incoming entry_id. - * - * We cant use currval because it will error out if nextval was - * not called before in the session. - */ - cr.curr_seq_num = nextval_internal(cr.label_seq_relid, true); - } + if (is_first_row) + { + int i; - /* Initialize the batch insert state */ - init_batch_insert(&cr.batch_state, label_name, graph_oid); + /* First row is the header - save column names (in main context) */ + header_count = nfields; + header = (char **) palloc(sizeof(char *) * nfields); - while ((bytes_read=fread(buf, 1, 1024, fp)) > 0) - { - if (csv_parse(&p, buf, bytes_read, vertex_field_cb, - vertex_row_cb, &cr) != bytes_read) + for (i = 0; i < nfields; i++) + { + /* Trim whitespace from header fields */ + header[i] = trim_whitespace(fields[i]); + } + + is_first_row = false; + } + else { - ereport(ERROR, (errmsg("Error while parsing file: %s\n", - csv_strerror(csv_error(&p))))); + /* Switch to batch context for row processing */ + old_context = MemoryContextSwitchTo(batch_context); + + /* Data row - process it */ + process_vertex_row(fields, nfields, + header, header_count, + label_id, label_seq_relid, + id_field_exists, load_as_agtype, + &curr_seq_num, + batch_state); + + /* Switch back to main context */ + MemoryContextSwitchTo(old_context); + + /* Reset batch context after each batch to free memory */ + if (batch_state->num_tuples == 0) + { + MemoryContextReset(batch_context); + } } } - csv_fini(&p, vertex_field_cb, vertex_row_cb, &cr); - /* Finish any remaining batch inserts */ - finish_batch_insert(&cr.batch_state); + finish_batch_insert(&batch_state); + MemoryContextReset(batch_context); - if (ferror(fp)) - { - ereport(ERROR, (errmsg("Error while reading file %s\n", - file_path))); - } + /* Clean up COPY state */ + EndCopyFrom(cstate); } PG_FINALLY(); { - fclose(fp); - csv_free(&p); + /* Free header if allocated */ + if (header != NULL) + { + int i; + for (i = 0; i < header_count; i++) + { + pfree(header[i]); + } + pfree(header); + } + + /* Close the relation */ + table_close(label_rel, RowExclusiveLock); + + /* Delete batch context */ + MemoryContextDelete(batch_context); + + /* Free parse state */ + free_parsestate(pstate); } PG_END_TRY(); return EXIT_SUCCESS; -} \ No newline at end of file +} diff --git a/src/backend/utils/load/age_load.c b/src/backend/utils/load/age_load.c index c7cf0677f..f9634668c 100644 --- a/src/backend/utils/load/age_load.c +++ b/src/backend/utils/load/age_load.c @@ -18,24 +18,81 @@ */ #include "postgres.h" + +#include "access/heapam.h" +#include "access/table.h" +#include "access/tableam.h" +#include "access/xact.h" #include "catalog/indexing.h" +#include "catalog/pg_authid.h" #include "executor/executor.h" +#include "miscadmin.h" +#include "nodes/parsenodes.h" +#include "parser/parse_relation.h" +#include "utils/acl.h" #include "utils/json.h" +#include "utils/rel.h" +#include "utils/rls.h" #include "utils/load/ag_load_edges.h" #include "utils/load/ag_load_labels.h" #include "utils/load/age_load.h" -#include "utils/rel.h" static agtype_value *csv_value_to_agtype_value(char *csv_val); static Oid get_or_create_graph(const Name graph_name); static int32 get_or_create_label(Oid graph_oid, char *graph_name, char *label_name, char label_kind); static char *build_safe_filename(char *name); +static void check_file_read_permission(void); +static void check_table_permissions(Oid relid); +static void check_rls_for_load(Oid relid); #define AGE_BASE_CSV_DIRECTORY "/tmp/age/" #define AGE_CSV_FILE_EXTENSION ".csv" +/* + * Trim leading and trailing whitespace from a string. + * Returns a newly allocated string with whitespace removed. + * Returns empty string for NULL input. + */ +char *trim_whitespace(const char *str) +{ + const char *start; + const char *end; + size_t len; + + if (str == NULL) + { + return pstrdup(""); + } + + /* Find first non-whitespace character */ + start = str; + while (*start && (*start == ' ' || *start == '\t' || + *start == '\n' || *start == '\r')) + { + start++; + } + + /* If string is all whitespace, return empty string */ + if (*start == '\0') + { + return pstrdup(""); + } + + /* Find last non-whitespace character */ + end = str + strlen(str) - 1; + while (end > start && (*end == ' ' || *end == '\t' || + *end == '\n' || *end == '\r')) + { + end--; + } + + /* Copy the trimmed string */ + len = end - start + 1; + return pnstrdup(start, len); +} + static char *build_safe_filename(char *name) { int length; @@ -88,6 +145,51 @@ static char *build_safe_filename(char *name) return resolved; } +/* + * Check if the current user has permission to read server files. + * Only users with the pg_read_server_files role can load from files. + */ +static void check_file_read_permission(void) +{ + if (!has_privs_of_role(GetUserId(), ROLE_PG_READ_SERVER_FILES)) + { + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied to LOAD from a file"), + errdetail("Only roles with privileges of the \"%s\" role may LOAD from a file.", + "pg_read_server_files"))); + } +} + +/* + * Check if the current user has INSERT permission on the target table. + */ +static void check_table_permissions(Oid relid) +{ + AclResult aclresult; + + aclresult = pg_class_aclcheck(relid, GetUserId(), ACL_INSERT); + if (aclresult != ACLCHECK_OK) + { + aclcheck_error(aclresult, OBJECT_TABLE, get_rel_name(relid)); + } +} + +/* + * Check if RLS is enabled on the target table. + * CSV loading is not supported with row-level security. + */ +static void check_rls_for_load(Oid relid) +{ + if (check_enable_rls(relid, InvalidOid, true) == RLS_ENABLED) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("LOAD from file is not supported with row-level security"), + errhint("Use Cypher CREATE clause instead."))); + } +} + agtype *create_empty_agtype(void) { agtype* out; @@ -118,6 +220,14 @@ static agtype_value *csv_value_to_agtype_value(char *csv_val) char *new_csv_val; agtype_value *res; + /* Handle NULL or empty input - return null agtype value */ + if (csv_val == NULL || csv_val[0] == '\0') + { + res = palloc(sizeof(agtype_value)); + res->type = AGTV_NULL; + return res; + } + if (!json_validate(cstring_to_text(csv_val), false, false)) { /* wrap the string with double-quote */ @@ -175,18 +285,40 @@ agtype *create_agtype_from_list(char **header, char **fields, size_t fields_len, for (i = 0; itype = AGTV_STRING; + value_agtype->val.string.len = 0; + value_agtype->val.string.val = pstrdup(""); + } + else + { + value_agtype = string_to_agtype_value(trimmed_value); + } } result.res = push_agtype_value(&result.parse_state, @@ -228,18 +360,40 @@ agtype* create_agtype_from_list_i(char **header, char **fields, for (i = start_index; i < fields_len; i++) { + char *trimmed_value; + + /* Skip empty header fields (e.g., from trailing commas) */ + if (header[i] == NULL || header[i][0] == '\0') + { + continue; + } + key_agtype = string_to_agtype_value(header[i]); result.res = push_agtype_value(&result.parse_state, WAGT_KEY, key_agtype); + /* Trim whitespace from field value */ + trimmed_value = trim_whitespace(fields[i]); + if (load_as_agtype) { - value_agtype = csv_value_to_agtype_value(fields[i]); + value_agtype = csv_value_to_agtype_value(trimmed_value); } else { - value_agtype = string_to_agtype_value(fields[i]); + /* Handle empty field values */ + if (trimmed_value[0] == '\0') + { + value_agtype = palloc(sizeof(agtype_value)); + value_agtype->type = AGTV_STRING; + value_agtype->val.string.len = 0; + value_agtype->val.string.val = pstrdup(""); + } + else + { + value_agtype = string_to_agtype_value(trimmed_value); + } } result.res = push_agtype_value(&result.parse_state, @@ -362,11 +516,24 @@ void insert_batch(batch_insert_state *batch_state) List *result; int i; + /* Check constraints for each tuple before inserting */ + if (batch_state->resultRelInfo->ri_RelationDesc->rd_att->constr) + { + for (i = 0; i < batch_state->num_tuples; i++) + { + ExecConstraints(batch_state->resultRelInfo, + batch_state->slots[i], + batch_state->estate); + } + } + /* Insert the tuples */ heap_multi_insert(batch_state->resultRelInfo->ri_RelationDesc, batch_state->slots, batch_state->num_tuples, - GetCurrentCommandId(true), 0, NULL); - + GetCurrentCommandId(true), + TABLE_INSERT_SKIP_FSM, /* Skip free space map for bulk */ + batch_state->bistate); /* Use bulk insert state */ + /* Insert index entries for the tuples */ if (batch_state->resultRelInfo->ri_NumIndices > 0) { @@ -405,6 +572,7 @@ Datum load_labels_from_file(PG_FUNCTION_ARGS) char* label_name_str; char* file_path_str; Oid graph_oid; + Oid label_relid; int32 label_id; bool id_field_exists; bool load_as_agtype; @@ -427,6 +595,9 @@ Datum load_labels_from_file(PG_FUNCTION_ARGS) errmsg("file path must not be NULL"))); } + /* Check file read permission first */ + check_file_read_permission(); + graph_name = PG_GETARG_NAME(0); label_name = PG_GETARG_NAME(1); file_name = PG_GETARG_TEXT_P(2); @@ -447,6 +618,11 @@ Datum load_labels_from_file(PG_FUNCTION_ARGS) label_id = get_or_create_label(graph_oid, graph_name_str, label_name_str, LABEL_KIND_VERTEX); + /* Get the label relation and check permissions */ + label_relid = get_label_relation(label_name_str, graph_oid); + check_table_permissions(label_relid); + check_rls_for_load(label_relid); + create_labels_from_csv_file(file_path_str, graph_name_str, graph_oid, label_name_str, label_id, id_field_exists, load_as_agtype); @@ -459,7 +635,6 @@ Datum load_labels_from_file(PG_FUNCTION_ARGS) PG_FUNCTION_INFO_V1(load_edges_from_file); Datum load_edges_from_file(PG_FUNCTION_ARGS) { - Name graph_name; Name label_name; text* file_name; @@ -467,6 +642,7 @@ Datum load_edges_from_file(PG_FUNCTION_ARGS) char* label_name_str; char* file_path_str; Oid graph_oid; + Oid label_relid; int32 label_id; bool load_as_agtype; @@ -488,6 +664,9 @@ Datum load_edges_from_file(PG_FUNCTION_ARGS) errmsg("file path must not be NULL"))); } + /* Check file read permission first */ + check_file_read_permission(); + graph_name = PG_GETARG_NAME(0); label_name = PG_GETARG_NAME(1); file_name = PG_GETARG_TEXT_P(2); @@ -507,6 +686,11 @@ Datum load_edges_from_file(PG_FUNCTION_ARGS) label_id = get_or_create_label(graph_oid, graph_name_str, label_name_str, LABEL_KIND_EDGE); + /* Get the label relation and check permissions */ + label_relid = get_label_relation(label_name_str, graph_oid); + check_table_permissions(label_relid); + check_rls_for_load(label_relid); + create_edges_from_csv_file(file_path_str, graph_name_str, graph_oid, label_name_str, label_id, load_as_agtype); @@ -597,19 +781,42 @@ void init_batch_insert(batch_insert_state **batch_state, Oid relid; EState *estate; ResultRelInfo *resultRelInfo; + RangeTblEntry *rte; + RTEPermissionInfo *perminfo; + List *range_table = NIL; + List *perminfos = NIL; int i; - /* Open the relation */ + /* Get the relation OID */ relid = get_label_relation(label_name, graph_oid); - relation = table_open(relid, RowExclusiveLock); /* Initialize executor state */ estate = CreateExecutorState(); - /* Initialize resultRelInfo */ + /* Create range table entry for ExecConstraints */ + rte = makeNode(RangeTblEntry); + rte->rtekind = RTE_RELATION; + rte->relid = relid; + rte->relkind = RELKIND_RELATION; + rte->rellockmode = RowExclusiveLock; + rte->perminfoindex = 1; + range_table = list_make1(rte); + + /* Create permission info */ + perminfo = makeNode(RTEPermissionInfo); + perminfo->relid = relid; + perminfo->requiredPerms = ACL_INSERT; + perminfos = list_make1(perminfo); + + /* Initialize range table in executor state */ + ExecInitRangeTable(estate, range_table, perminfos); + + /* Initialize resultRelInfo - this opens the relation */ resultRelInfo = makeNode(ResultRelInfo); - InitResultRelInfo(resultRelInfo, relation, 1, NULL, estate->es_instrument); - estate->es_result_relations = &resultRelInfo; + ExecInitResultRelation(estate, resultRelInfo, 1); + + /* Get relation from resultRelInfo (opened by ExecInitResultRelation) */ + relation = resultRelInfo->ri_RelationDesc; /* Open the indices */ ExecOpenIndices(resultRelInfo, false); @@ -619,8 +826,9 @@ void init_batch_insert(batch_insert_state **batch_state, (*batch_state)->slots = palloc(sizeof(TupleTableSlot *) * BATCH_SIZE); (*batch_state)->estate = estate; (*batch_state)->resultRelInfo = resultRelInfo; - (*batch_state)->max_tuples = BATCH_SIZE; (*batch_state)->num_tuples = 0; + (*batch_state)->buffered_bytes = 0; + (*batch_state)->bistate = GetBulkInsertState(); /* Create slots */ for (i = 0; i < BATCH_SIZE; i++) @@ -651,12 +859,14 @@ void finish_batch_insert(batch_insert_state **batch_state) ExecDropSingleTupleTableSlot((*batch_state)->slots[i]); } - /* Clean up, close the indices and relation */ - ExecCloseIndices((*batch_state)->resultRelInfo); - table_close((*batch_state)->resultRelInfo->ri_RelationDesc, - RowExclusiveLock); + /* Free BulkInsertState */ + FreeBulkInsertState((*batch_state)->bistate); + + /* Close result relations and range table relations */ + ExecCloseResultRelations((*batch_state)->estate); + ExecCloseRangeTableRelations((*batch_state)->estate); - /* Clean up batch state */ + /* Clean up executor state */ FreeExecutorState((*batch_state)->estate); pfree((*batch_state)->slots); pfree(*batch_state); diff --git a/src/backend/utils/load/libcsv.c b/src/backend/utils/load/libcsv.c deleted file mode 100644 index f0e8b46be..000000000 --- a/src/backend/utils/load/libcsv.c +++ /dev/null @@ -1,549 +0,0 @@ -/* -libcsv - parse and write csv data -Copyright (C) 2008 Robert Gamble - -This library is free software; you can redistribute it and/or -modify it under the terms of the GNU Lesser General Public -License as published by the Free Software Foundation; either -version 2.1 of the License, or (at your option) any later version. - -This library is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -Lesser General Public License for more details. - -You should have received a copy of the GNU Lesser General Public -License along with this library; if not, write to the Free Software -Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA -*/ - -#include - -#if __STDC_VERSION__ >= 199901L -# include -#else - /* C89 doesn't have stdint.h or SIZE_MAX */ -# define SIZE_MAX ((size_t)-1) -#endif - -#include "utils/load/csv.h" - -#define VERSION "3.0.3" - -#define ROW_NOT_BEGUN 0 -#define FIELD_NOT_BEGUN 1 -#define FIELD_BEGUN 2 -#define FIELD_MIGHT_HAVE_ENDED 3 - -/* - Explanation of states - ROW_NOT_BEGUN There have not been any fields encountered for this row - FIELD_NOT_BEGUN There have been fields but we are currently not in one - FIELD_BEGUN We are in a field - FIELD_MIGHT_HAVE_ENDED - We encountered a double quote inside a quoted field, the - field is either ended or the quote is literal -*/ - -#define MEM_BLK_SIZE 128 - -#define SUBMIT_FIELD(p) \ - do { \ - if (!quoted) \ - entry_pos -= spaces; \ - if (p->options & CSV_APPEND_NULL) \ - ((p)->entry_buf[entry_pos]) = '\0'; \ - if (cb1 && (p->options & CSV_EMPTY_IS_NULL) && !quoted && entry_pos == 0) \ - cb1(NULL, entry_pos, data); \ - else if (cb1) \ - cb1(p->entry_buf, entry_pos, data); \ - pstate = FIELD_NOT_BEGUN; \ - entry_pos = quoted = spaces = 0; \ - } while (0) - -#define SUBMIT_ROW(p, c) \ - do { \ - if (cb2) \ - cb2(c, data); \ - pstate = ROW_NOT_BEGUN; \ - entry_pos = quoted = spaces = 0; \ - } while (0) - -#define SUBMIT_CHAR(p, c) ((p)->entry_buf[entry_pos++] = (c)) - -static const char *csv_errors[] = {"success", - "error parsing data while strict checking enabled", - "memory exhausted while increasing buffer size", - "data size too large", - "invalid status code"}; - -int -csv_error(const struct csv_parser *p) -{ - assert(p && "received null csv_parser"); - - /* Return the current status of the parser */ - return p->status; -} - -const char * -csv_strerror(int status) -{ - /* Return a textual description of status */ - if (status >= CSV_EINVALID || status < 0) - return csv_errors[CSV_EINVALID]; - else - return csv_errors[status]; -} - -int -csv_get_opts(const struct csv_parser *p) -{ - /* Return the currently set options of parser */ - if (p == NULL) - return -1; - - return p->options; -} - -int -csv_set_opts(struct csv_parser *p, unsigned char options) -{ - /* Set the options */ - if (p == NULL) - return -1; - - p->options = options; - return 0; -} - -int -csv_init(struct csv_parser *p, unsigned char options) -{ - /* Initialize a csv_parser object returns 0 on success, -1 on error */ - if (p == NULL) - return -1; - - p->entry_buf = NULL; - p->pstate = ROW_NOT_BEGUN; - p->quoted = 0; - p->spaces = 0; - p->entry_pos = 0; - p->entry_size = 0; - p->status = 0; - p->options = options; - p->quote_char = CSV_QUOTE; - p->delim_char = CSV_COMMA; - p->is_space = NULL; - p->is_term = NULL; - p->blk_size = MEM_BLK_SIZE; - p->malloc_func = NULL; - p->realloc_func = realloc; - p->free_func = free; - - return 0; -} - -void -csv_free(struct csv_parser *p) -{ - /* Free the entry_buffer of csv_parser object */ - if (p == NULL) - return; - - if (p->entry_buf && p->free_func) - p->free_func(p->entry_buf); - - p->entry_buf = NULL; - p->entry_size = 0; - - return; -} - -int -csv_fini(struct csv_parser *p, void (*cb1)(void *, size_t, void *), void (*cb2)(int c, void *), void *data) -{ - int quoted; - int pstate; - size_t spaces; - size_t entry_pos; - - if (p == NULL) - return -1; - - /* Finalize parsing. Needed, for example, when file does not end in a newline */ - quoted = p->quoted; - pstate = p->pstate; - spaces = p->spaces; - entry_pos = p->entry_pos; - - if ((pstate == FIELD_BEGUN) && p->quoted && (p->options & CSV_STRICT) && (p->options & CSV_STRICT_FINI)) { - /* Current field is quoted, no end-quote was seen, and CSV_STRICT_FINI is set */ - p->status = CSV_EPARSE; - return -1; - } - - switch (pstate) { - case FIELD_MIGHT_HAVE_ENDED: - p->entry_pos -= p->spaces + 1; /* get rid of spaces and original quote */ - entry_pos = p->entry_pos; - /*lint -fallthrough */ - case FIELD_NOT_BEGUN: - case FIELD_BEGUN: - /* Unnecessary: - quoted = p->quoted, pstate = p->pstate; - spaces = p->spaces, entry_pos = p->entry_pos; - */ - SUBMIT_FIELD(p); - SUBMIT_ROW(p, -1); - break; - case ROW_NOT_BEGUN: /* Already ended properly */ - ; - } - - /* Reset parser */ - p->spaces = p->quoted = p->entry_pos = p->status = 0; - p->pstate = ROW_NOT_BEGUN; - - return 0; -} - -void -csv_set_delim(struct csv_parser *p, unsigned char c) -{ - /* Set the delimiter */ - if (p) p->delim_char = c; -} - -void -csv_set_quote(struct csv_parser *p, unsigned char c) -{ - /* Set the quote character */ - if (p) p->quote_char = c; -} - -unsigned char -csv_get_delim(const struct csv_parser *p) -{ - assert(p && "received null csv_parser"); - - /* Get the delimiter */ - return p->delim_char; -} - -unsigned char -csv_get_quote(const struct csv_parser *p) -{ - assert(p && "received null csv_parser"); - - /* Get the quote character */ - return p->quote_char; -} - -void -csv_set_space_func(struct csv_parser *p, int (*f)(unsigned char)) -{ - /* Set the space function */ - if (p) p->is_space = f; -} - -void -csv_set_term_func(struct csv_parser *p, int (*f)(unsigned char)) -{ - /* Set the term function */ - if (p) p->is_term = f; -} - -void -csv_set_realloc_func(struct csv_parser *p, void *(*f)(void *, size_t)) -{ - /* Set the realloc function used to increase buffer size */ - if (p && f) p->realloc_func = f; -} - -void -csv_set_free_func(struct csv_parser *p, void (*f)(void *)) -{ - /* Set the free function used to free the buffer */ - if (p && f) p->free_func = f; -} - -void -csv_set_blk_size(struct csv_parser *p, size_t size) -{ - /* Set the block size used to increment buffer size */ - if (p) p->blk_size = size; -} - -size_t -csv_get_buffer_size(const struct csv_parser *p) -{ - /* Get the size of the entry buffer */ - if (p) - return p->entry_size; - return 0; -} - -static int -csv_increase_buffer(struct csv_parser *p) -{ - size_t to_add; - void *vp; - - if (p == NULL) return 0; - if (p->realloc_func == NULL) return 0; - - /* Increase the size of the entry buffer. Attempt to increase size by - * p->blk_size, if this is larger than SIZE_MAX try to increase current - * buffer size to SIZE_MAX. If allocation fails, try to allocate halve - * the size and try again until successful or increment size is zero. - */ - - to_add = p->blk_size; - - if ( p->entry_size >= SIZE_MAX - to_add ) - to_add = SIZE_MAX - p->entry_size; - - if (!to_add) { - p->status = CSV_ETOOBIG; - return -1; - } - - while ((vp = p->realloc_func(p->entry_buf, p->entry_size + to_add)) == NULL) { - to_add /= 2; - if (!to_add) { - p->status = CSV_ENOMEM; - return -1; - } - } - - /* Update entry buffer pointer and entry_size if successful */ - p->entry_buf = vp; - p->entry_size += to_add; - return 0; -} - -size_t -csv_parse(struct csv_parser *p, const void *s, size_t len, void (*cb1)(void *, size_t, void *), void (*cb2)(int c, void *), void *data) -{ - unsigned const char *us = s; /* Access input data as array of unsigned char */ - unsigned char c; /* The character we are currently processing */ - size_t pos = 0; /* The number of characters we have processed in this call */ - - /* Store key fields into local variables for performance */ - unsigned char delim = p->delim_char; - unsigned char quote = p->quote_char; - int (*is_space)(unsigned char) = p->is_space; - int (*is_term)(unsigned char) = p->is_term; - int quoted = p->quoted; - int pstate = p->pstate; - size_t spaces = p->spaces; - size_t entry_pos = p->entry_pos; - - - if (!p->entry_buf && pos < len) { - /* Buffer hasn't been allocated yet and len > 0 */ - if (csv_increase_buffer(p) != 0) { - p->quoted = quoted, p->pstate = pstate, p->spaces = spaces, p->entry_pos = entry_pos; - return pos; - } - } - - while (pos < len) { - /* Check memory usage, increase buffer if necessary */ - if (entry_pos == ((p->options & CSV_APPEND_NULL) ? p->entry_size - 1 : p->entry_size) ) { - if (csv_increase_buffer(p) != 0) { - p->quoted = quoted, p->pstate = pstate, p->spaces = spaces, p->entry_pos = entry_pos; - return pos; - } - } - - c = us[pos++]; - - switch (pstate) { - case ROW_NOT_BEGUN: - case FIELD_NOT_BEGUN: - if ((is_space ? is_space(c) : c == CSV_SPACE || c == CSV_TAB) && c!=delim) { /* Space or Tab */ - continue; - } else if (is_term ? is_term(c) : c == CSV_CR || c == CSV_LF) { /* Carriage Return or Line Feed */ - if (pstate == FIELD_NOT_BEGUN) { - SUBMIT_FIELD(p); - SUBMIT_ROW(p, c); - } else { /* ROW_NOT_BEGUN */ - /* Don't submit empty rows by default */ - if (p->options & CSV_REPALL_NL) { - SUBMIT_ROW(p, c); - } - } - continue; - } else if (c == delim) { /* Comma */ - SUBMIT_FIELD(p); - break; - } else if (c == quote) { /* Quote */ - pstate = FIELD_BEGUN; - quoted = 1; - } else { /* Anything else */ - pstate = FIELD_BEGUN; - quoted = 0; - SUBMIT_CHAR(p, c); - } - break; - case FIELD_BEGUN: - if (c == quote) { /* Quote */ - if (quoted) { - SUBMIT_CHAR(p, c); - pstate = FIELD_MIGHT_HAVE_ENDED; - } else { - /* STRICT ERROR - double quote inside non-quoted field */ - if (p->options & CSV_STRICT) { - p->status = CSV_EPARSE; - p->quoted = quoted, p->pstate = pstate, p->spaces = spaces, p->entry_pos = entry_pos; - return pos-1; - } - SUBMIT_CHAR(p, c); - spaces = 0; - } - } else if (c == delim) { /* Comma */ - if (quoted) { - SUBMIT_CHAR(p, c); - } else { - SUBMIT_FIELD(p); - } - } else if (is_term ? is_term(c) : c == CSV_CR || c == CSV_LF) { /* Carriage Return or Line Feed */ - if (!quoted) { - SUBMIT_FIELD(p); - SUBMIT_ROW(p, c); - } else { - SUBMIT_CHAR(p, c); - } - } else if (!quoted && (is_space? is_space(c) : c == CSV_SPACE || c == CSV_TAB)) { /* Tab or space for non-quoted field */ - SUBMIT_CHAR(p, c); - spaces++; - } else { /* Anything else */ - SUBMIT_CHAR(p, c); - spaces = 0; - } - break; - case FIELD_MIGHT_HAVE_ENDED: - /* This only happens when a quote character is encountered in a quoted field */ - if (c == delim) { /* Comma */ - entry_pos -= spaces + 1; /* get rid of spaces and original quote */ - SUBMIT_FIELD(p); - } else if (is_term ? is_term(c) : c == CSV_CR || c == CSV_LF) { /* Carriage Return or Line Feed */ - entry_pos -= spaces + 1; /* get rid of spaces and original quote */ - SUBMIT_FIELD(p); - SUBMIT_ROW(p, c); - } else if (is_space ? is_space(c) : c == CSV_SPACE || c == CSV_TAB) { /* Space or Tab */ - SUBMIT_CHAR(p, c); - spaces++; - } else if (c == quote) { /* Quote */ - if (spaces) { - /* STRICT ERROR - unescaped double quote */ - if (p->options & CSV_STRICT) { - p->status = CSV_EPARSE; - p->quoted = quoted, p->pstate = pstate, p->spaces = spaces, p->entry_pos = entry_pos; - return pos-1; - } - spaces = 0; - SUBMIT_CHAR(p, c); - } else { - /* Two quotes in a row */ - pstate = FIELD_BEGUN; - } - } else { /* Anything else */ - /* STRICT ERROR - unescaped double quote */ - if (p->options & CSV_STRICT) { - p->status = CSV_EPARSE; - p->quoted = quoted, p->pstate = pstate, p->spaces = spaces, p->entry_pos = entry_pos; - return pos-1; - } - pstate = FIELD_BEGUN; - spaces = 0; - SUBMIT_CHAR(p, c); - } - break; - default: - break; - } - } - p->quoted = quoted, p->pstate = pstate, p->spaces = spaces, p->entry_pos = entry_pos; - return pos; -} - -size_t -csv_write (void *dest, size_t dest_size, const void *src, size_t src_size) -{ - return csv_write2(dest, dest_size, src, src_size, CSV_QUOTE); -} - -int -csv_fwrite (FILE *fp, const void *src, size_t src_size) -{ - return csv_fwrite2(fp, src, src_size, CSV_QUOTE); -} - -size_t -csv_write2 (void *dest, size_t dest_size, const void *src, size_t src_size, unsigned char quote) -{ - unsigned char *cdest = dest; - const unsigned char *csrc = src; - size_t chars = 0; - - if (src == NULL) - return 0; - - if (dest == NULL) - dest_size = 0; - - if (dest_size > 0) - *cdest++ = quote; - chars++; - - while (src_size) { - if (*csrc == quote) { - if (dest_size > chars) - *cdest++ = quote; - if (chars < SIZE_MAX) chars++; - } - if (dest_size > chars) - *cdest++ = *csrc; - if (chars < SIZE_MAX) chars++; - src_size--; - csrc++; - } - - if (dest_size > chars) - *cdest = quote; - if (chars < SIZE_MAX) chars++; - - return chars; -} - -int -csv_fwrite2 (FILE *fp, const void *src, size_t src_size, unsigned char quote) -{ - const unsigned char *csrc = src; - - if (fp == NULL || src == NULL) - return 0; - - if (fputc(quote, fp) == EOF) - return EOF; - - while (src_size) { - if (*csrc == quote) { - if (fputc(quote, fp) == EOF) - return EOF; - } - if (fputc(*csrc, fp) == EOF) - return EOF; - src_size--; - csrc++; - } - - if (fputc(quote, fp) == EOF) { - return EOF; - } - - return 0; -} diff --git a/src/include/utils/load/ag_load_edges.h b/src/include/utils/load/ag_load_edges.h index df663b1dd..4db00d93a 100644 --- a/src/include/utils/load/ag_load_edges.h +++ b/src/include/utils/load/ag_load_edges.h @@ -17,42 +17,28 @@ * under the License. */ -#include "access/heapam.h" -#include "utils/load/age_load.h" - #ifndef AG_LOAD_EDGES_H #define AG_LOAD_EDGES_H -typedef struct { - size_t row; - char **header; - size_t *header_len; - size_t header_num; - char **fields; - size_t *fields_len; - size_t alloc; - size_t cur_field; - int error; - size_t header_row_length; - size_t curr_row_length; - char *graph_name; - Oid graph_oid; - char *label_name; - int label_id; - Oid label_seq_relid; - char *start_vertex; - char *end_vertex; - bool load_as_agtype; - batch_insert_state *batch_state; -} csv_edge_reader; - - -void edge_field_cb(void *field, size_t field_len, void *data); -void edge_row_cb(int delim __attribute__((unused)), void *data); +#include "utils/load/age_load.h" +/* + * Load edges from a CSV file using pg's COPY infrastructure. + * + * CSV format: start_id, start_vertex_type, end_id, end_vertex_type, [properties...] + * + * Parameters: + * file_path - Path to the CSV file (must be in /tmp/age/) + * graph_name - Name of the graph + * graph_oid - OID of the graph + * label_name - Name of the edge label + * label_id - ID of the label + * load_as_agtype - If true, parse CSV values as agtype (JSON-like) + * + * Returns EXIT_SUCCESS on success. + */ int create_edges_from_csv_file(char *file_path, char *graph_name, Oid graph_oid, - char *label_name, int label_id, - bool load_as_agtype); - -#endif /*AG_LOAD_EDGES_H */ + char *label_name, int label_id, + bool load_as_agtype); +#endif /* AG_LOAD_EDGES_H */ diff --git a/src/include/utils/load/ag_load_labels.h b/src/include/utils/load/ag_load_labels.h index b8ed1572e..c3d517f30 100644 --- a/src/include/utils/load/ag_load_labels.h +++ b/src/include/utils/load/ag_load_labels.h @@ -17,46 +17,26 @@ * under the License. */ - #ifndef AG_LOAD_LABELS_H #define AG_LOAD_LABELS_H -#include "access/heapam.h" #include "utils/load/age_load.h" -struct counts { - long unsigned fields; - long unsigned allvalues; - long unsigned rows; -}; - -typedef struct { - size_t row; - char **header; - size_t *header_len; - size_t header_num; - char **fields; - size_t *fields_len; - size_t alloc; - size_t cur_field; - int error; - size_t header_row_length; - size_t curr_row_length; - char *graph_name; - Oid graph_oid; - char *label_name; - int label_id; - Oid label_seq_relid; - bool id_field_exists; - bool load_as_agtype; - int curr_seq_num; - batch_insert_state *batch_state; -} csv_vertex_reader; - - -void vertex_field_cb(void *field, size_t field_len, void *data); -void vertex_row_cb(int delim __attribute__((unused)), void *data); - +/* + * Load vertex labels from a CSV file using pg's COPY infrastructure. + * CSV format: [id,] [properties...] + * + * Parameters: + * file_path - Path to the CSV file (must be in /tmp/age/) + * graph_name - Name of the graph + * graph_oid - OID of the graph + * label_name - Name of the vertex label + * label_id - ID of the label + * id_field_exists - If true, first CSV column contains the vertex ID + * load_as_agtype - If true, parse CSV values as agtype (JSON-like) + * + * Returns EXIT_SUCCESS on success. + */ int create_labels_from_csv_file(char *file_path, char *graph_name, Oid graph_oid, char *label_name, int label_id, bool id_field_exists, bool load_as_agtype); diff --git a/src/include/utils/load/age_load.h b/src/include/utils/load/age_load.h index 72f11493d..6573c79f3 100644 --- a/src/include/utils/load/age_load.h +++ b/src/include/utils/load/age_load.h @@ -17,6 +17,10 @@ * under the License. */ +#ifndef AG_LOAD_H +#define AG_LOAD_H + +#include "access/heapam.h" #include "commands/sequence.h" #include "utils/builtins.h" #include "utils/lsyscache.h" @@ -27,10 +31,8 @@ #include "commands/graph_commands.h" #include "utils/ag_cache.h" -#ifndef AGE_ENTITY_CREATOR_H -#define AGE_ENTITY_CREATOR_H - #define BATCH_SIZE 1000 +#define MAX_BUFFERED_BYTES 65535 /* 64KB, same as pg COPY */ typedef struct batch_insert_state { @@ -38,26 +40,29 @@ typedef struct batch_insert_state ResultRelInfo *resultRelInfo; TupleTableSlot **slots; int num_tuples; - int max_tuples; + size_t buffered_bytes; + BulkInsertState bistate; } batch_insert_state; -agtype* create_empty_agtype(void); - -agtype* create_agtype_from_list(char **header, char **fields, +agtype *create_empty_agtype(void); +agtype *create_agtype_from_list(char **header, char **fields, size_t fields_len, int64 vertex_id, bool load_as_agtype); -agtype* create_agtype_from_list_i(char **header, char **fields, +agtype *create_agtype_from_list_i(char **header, char **fields, size_t fields_len, size_t start_index, bool load_as_agtype); + void insert_vertex_simple(Oid graph_oid, char *label_name, graphid vertex_id, agtype *vertex_properties); void insert_edge_simple(Oid graph_oid, char *label_name, graphid edge_id, graphid start_id, graphid end_id, - agtype* end_properties); -void insert_batch(batch_insert_state *batch_state); + agtype *edge_properties); void init_batch_insert(batch_insert_state **batch_state, char *label_name, Oid graph_oid); +void insert_batch(batch_insert_state *batch_state); void finish_batch_insert(batch_insert_state **batch_state); -#endif /* AGE_ENTITY_CREATOR_H */ +char *trim_whitespace(const char *str); + +#endif /* AG_LOAD_H */ diff --git a/src/include/utils/load/csv.h b/src/include/utils/load/csv.h deleted file mode 100644 index 062536977..000000000 --- a/src/include/utils/load/csv.h +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Created by Shoaib on 12/5/2021. -*/ - -/* -libcsv - parse and write csv data -Copyright (C) 2008-2021 Robert Gamble -This library is free software; you can redistribute it and/or -modify it under the terms of the GNU Lesser General Public -License as published by the Free Software Foundation; either -version 2.1 of the License, or (at your option) any later version. -This library is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -Lesser General Public License for more details. -You should have received a copy of the GNU Lesser General Public -License along with this library; if not, write to the Free Software -Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA -*/ - -#ifndef LIBCSV_H__ -#define LIBCSV_H__ -#include -#include - -#ifdef __cplusplus -extern "C" { -#endif - -#define CSV_MAJOR 3 -#define CSV_MINOR 0 -#define CSV_RELEASE 3 - -/* Error Codes */ -#define CSV_SUCCESS 0 -#define CSV_EPARSE 1 /* Parse error in strict mode */ -#define CSV_ENOMEM 2 /* Out of memory while increasing buffer size */ -#define CSV_ETOOBIG 3 /* Buffer larger than SIZE_MAX needed */ -#define CSV_EINVALID 4 /* Invalid code,should never be received from csv_error*/ - - -/* parser options */ -#define CSV_STRICT 1 /* enable strict mode */ -#define CSV_REPALL_NL 2 /* report all unquoted carriage returns and linefeeds */ -#define CSV_STRICT_FINI 4 /* causes csv_fini to return CSV_EPARSE if last - field is quoted and doesn't contain ending - quote */ -#define CSV_APPEND_NULL 8 /* Ensure that all fields are null-terminated */ -#define CSV_EMPTY_IS_NULL 16 /* Pass null pointer to cb1 function when - empty, unquoted fields are encountered */ - - -/* Character values */ -#define CSV_TAB 0x09 -#define CSV_SPACE 0x20 -#define CSV_CR 0x0d -#define CSV_LF 0x0a -#define CSV_COMMA 0x2c -#define CSV_QUOTE 0x22 - -struct csv_parser { - int pstate; /* Parser state */ - int quoted; /* Is the current field a quoted field? */ - size_t spaces; /* Number of continuous spaces after quote or in a non-quoted field */ - unsigned char * entry_buf; /* Entry buffer */ - size_t entry_pos; /* Current position in entry_buf (and current size of entry) */ - size_t entry_size; /* Size of entry buffer */ - int status; /* Operation status */ - unsigned char options; - unsigned char quote_char; - unsigned char delim_char; - int (*is_space)(unsigned char); - int (*is_term)(unsigned char); - size_t blk_size; - void *(*malloc_func)(size_t); /* not used */ - void *(*realloc_func)(void *, size_t); /* function used to allocate buffer memory */ - void (*free_func)(void *); /* function used to free buffer memory */ -}; - -/* Function Prototypes */ -int csv_init(struct csv_parser *p, unsigned char options); -int csv_fini(struct csv_parser *p, void (*cb1)(void *, size_t, void *), void (*cb2)(int, void *), void *data); -void csv_free(struct csv_parser *p); -int csv_error(const struct csv_parser *p); -const char * csv_strerror(int error); -size_t csv_parse(struct csv_parser *p, const void *s, size_t len, void (*cb1)(void *, size_t, void *), void (*cb2)(int, void *), void *data); -size_t csv_write(void *dest, size_t dest_size, const void *src, size_t src_size); -int csv_fwrite(FILE *fp, const void *src, size_t src_size); -size_t csv_write2(void *dest, size_t dest_size, const void *src, size_t src_size, unsigned char quote); -int csv_fwrite2(FILE *fp, const void *src, size_t src_size, unsigned char quote); -int csv_get_opts(const struct csv_parser *p); -int csv_set_opts(struct csv_parser *p, unsigned char options); -void csv_set_delim(struct csv_parser *p, unsigned char c); -void csv_set_quote(struct csv_parser *p, unsigned char c); -unsigned char csv_get_delim(const struct csv_parser *p); -unsigned char csv_get_quote(const struct csv_parser *p); -void csv_set_space_func(struct csv_parser *p, int (*f)(unsigned char)); -void csv_set_term_func(struct csv_parser *p, int (*f)(unsigned char)); -void csv_set_realloc_func(struct csv_parser *p, void *(*)(void *, size_t)); -void csv_set_free_func(struct csv_parser *p, void (*)(void *)); -void csv_set_blk_size(struct csv_parser *p, size_t); -size_t csv_get_buffer_size(const struct csv_parser *p); - -#ifdef __cplusplus -} -#endif - -#endif