From e183f58000c51b38e1ee7dc20f1705b5e13f9129 Mon Sep 17 00:00:00 2001 From: Bhargav Date: Tue, 28 May 2024 12:22:59 +0530 Subject: [PATCH 1/3] Infer size dynamically --- apps/csv2sql/lib/csv2sql/database/mysql.ex | 14 +++++++++++--- .../lib/csv2sql/type_deducer/type_deducer.ex | 3 ++- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/apps/csv2sql/lib/csv2sql/database/mysql.ex b/apps/csv2sql/lib/csv2sql/database/mysql.ex index 9415b7f..3ff194d 100644 --- a/apps/csv2sql/lib/csv2sql/database/mysql.ex +++ b/apps/csv2sql/lib/csv2sql/database/mysql.ex @@ -17,12 +17,20 @@ defmodule Csv2sql.Database.MySql do type_map[:is_boolean] -> "BIT" type_map[:is_integer] -> "INT" type_map[:is_float] -> "DOUBLE" - type_map[:is_text] and type_map[:max_data_length] > 65_535 -> "LONGTEXT" - type_map[:is_text] -> "TEXT" - true -> "VARCHAR(#{varchar_limit()})" + true -> type_map[:max_data_length] |> string_column_type() |> get_string_column_type() end end + def string_column_type(max_data_length) do + if max_data_length > varchar_limit(), + do: :text, + else: {:varchar, max_data_length} + end + + defp get_string_column_type(:text), do: "LONGTEXT" + defp get_string_column_type({:varchar, 0}), do: "VARCHAR(#{varchar_limit()})" + defp get_string_column_type({:varchar, size}), do: "VARCHAR(#{size})" + @impl Csv2sql.Database @spec db_name :: String.t() def db_name do diff --git a/apps/csv2sql/lib/csv2sql/type_deducer/type_deducer.ex b/apps/csv2sql/lib/csv2sql/type_deducer/type_deducer.ex index 193ba25..974649e 100644 --- a/apps/csv2sql/lib/csv2sql/type_deducer/type_deducer.ex +++ b/apps/csv2sql/lib/csv2sql/type_deducer/type_deducer.ex @@ -111,7 +111,8 @@ defmodule Csv2sql.TypeDeducer do is_boolean: acc_map.is_boolean && current_map.is_boolean, is_integer: acc_map.is_integer && current_map.is_integer, is_float: acc_map.is_float && current_map.is_float, - is_text: acc_map.is_text || current_map.is_text + is_text: acc_map.is_text || current_map.is_text, + max_data_length: max(acc_map.max_data_length, current_map.max_data_length) } end end From be2f6d9b6f55194f5623a3627a1da514a4db35ed Mon Sep 17 00:00:00 2001 From: Bhargav Date: Tue, 28 May 2024 16:25:39 +0530 Subject: [PATCH 2/3] add producer_pid column to file struct --- apps/csv2sql/lib/csv2sql/file.ex | 1 + .../progress_tracker/progress_tracker.ex | 29 +++++++++++-------- apps/csv2sql/lib/csv2sql/stages/analyze.ex | 4 +++ 3 files changed, 22 insertions(+), 12 deletions(-) diff --git a/apps/csv2sql/lib/csv2sql/file.ex b/apps/csv2sql/lib/csv2sql/file.ex index f506ff9..d4b74bb 100644 --- a/apps/csv2sql/lib/csv2sql/file.ex +++ b/apps/csv2sql/lib/csv2sql/file.ex @@ -17,5 +17,6 @@ defmodule Csv2sql.File do field(:column_types, csv_col_types_list(), enforce: false) field(:status, file_status(), default: :pending) field(:existing_db_row_count, non_neg_integer(), default: 0) + field(:producer_pid, pid(), default: nil) end end diff --git a/apps/csv2sql/lib/csv2sql/progress_tracker/progress_tracker.ex b/apps/csv2sql/lib/csv2sql/progress_tracker/progress_tracker.ex index 8127fb3..0c4a73e 100644 --- a/apps/csv2sql/lib/csv2sql/progress_tracker/progress_tracker.ex +++ b/apps/csv2sql/lib/csv2sql/progress_tracker/progress_tracker.ex @@ -99,18 +99,23 @@ defmodule Csv2sql.ProgressTracker do end @impl true - def handle_cast(:reset_state, state), - do: - {:noreply, - %{ - state - | files: %{}, - subscribers: [], - status: :init, - start_time: nil, - end_time: nil, - validation_status: nil - }} + def handle_cast(:reset_state, %State{files: files} = state) do + files + |> Enum.map(fn {_path, %Csv2sql.File{producer_pid: producer_pid}} -> + Process.exit(producer_pid, :kill) + end) + + {:noreply, + %{ + state + | files: %{}, + subscribers: [], + status: :init, + start_time: nil, + end_time: nil, + validation_status: nil + }} + end @impl true def handle_cast({:update_file, _file}, %State{status: :error} = state), do: {:noreply, state} diff --git a/apps/csv2sql/lib/csv2sql/stages/analyze.ex b/apps/csv2sql/lib/csv2sql/stages/analyze.ex index 2bdfff3..b15edc8 100644 --- a/apps/csv2sql/lib/csv2sql/stages/analyze.ex +++ b/apps/csv2sql/lib/csv2sql/stages/analyze.ex @@ -124,6 +124,10 @@ defmodule Csv2sql.Stages.Analyze do # Start a producer for the file {:ok, pid} = DbLoader.Producer.start_link(file) + file = %Csv2sql.File{file | producer_pid: pid} + + ProgressTracker.update_file(file) + # Subscribe consumers to the producer GenStage.sync_subscribe( DbLoader.ConsumerSupervisor, From f9535950ffeb26bb32d58f4bd87f79a53c4103af Mon Sep 17 00:00:00 2001 From: Bhargav Date: Tue, 28 May 2024 16:34:13 +0530 Subject: [PATCH 3/3] refactor code --- apps/csv2sql/lib/csv2sql/database/database.ex | 18 +++++++++++++++++- apps/csv2sql/lib/csv2sql/database/mysql.ex | 11 +++-------- apps/csv2sql/lib/csv2sql/database/postgres.ex | 7 +++++-- 3 files changed, 25 insertions(+), 11 deletions(-) diff --git a/apps/csv2sql/lib/csv2sql/database/database.ex b/apps/csv2sql/lib/csv2sql/database/database.ex index 165913b..15698e3 100644 --- a/apps/csv2sql/lib/csv2sql/database/database.ex +++ b/apps/csv2sql/lib/csv2sql/database/database.ex @@ -105,7 +105,10 @@ defmodule Csv2sql.Database do end @spec insert_data_chunk(Csv2sql.File.t(), list) :: :ok - def insert_data_chunk(%Csv2sql.File{name: name, path: path, column_types: column_types}, data_chunk) do + def insert_data_chunk( + %Csv2sql.File{name: name, path: path, column_types: column_types}, + data_chunk + ) do encoded_data_chunk = encode_data_chunk(column_types, data_chunk) repo = Helpers.get_config(:db_type) |> get_repo() @@ -212,4 +215,17 @@ defmodule Csv2sql.Database do -1 end end + + def string_column_type(max_data_length) do + cond do + max_data_length > 65_535 -> + :long_text + + max_data_length > varchar_limit() -> + :text + + true -> + {:varchar, max_data_length} + end + end end diff --git a/apps/csv2sql/lib/csv2sql/database/mysql.ex b/apps/csv2sql/lib/csv2sql/database/mysql.ex index 3ff194d..8ffdb89 100644 --- a/apps/csv2sql/lib/csv2sql/database/mysql.ex +++ b/apps/csv2sql/lib/csv2sql/database/mysql.ex @@ -21,14 +21,9 @@ defmodule Csv2sql.Database.MySql do end end - def string_column_type(max_data_length) do - if max_data_length > varchar_limit(), - do: :text, - else: {:varchar, max_data_length} - end - - defp get_string_column_type(:text), do: "LONGTEXT" - defp get_string_column_type({:varchar, 0}), do: "VARCHAR(#{varchar_limit()})" + defp get_string_column_type(:long_text), do: "LONGTEXT" + defp get_string_column_type(:text), do: "TEXT" + defp get_string_column_type({:varchar, 0}), do: "VARCHAR(1)" defp get_string_column_type({:varchar, size}), do: "VARCHAR(#{size})" @impl Csv2sql.Database diff --git a/apps/csv2sql/lib/csv2sql/database/postgres.ex b/apps/csv2sql/lib/csv2sql/database/postgres.ex index 8802336..7913cb3 100644 --- a/apps/csv2sql/lib/csv2sql/database/postgres.ex +++ b/apps/csv2sql/lib/csv2sql/database/postgres.ex @@ -17,11 +17,14 @@ defmodule Csv2sql.Database.Postgres do type_map[:is_boolean] -> "BOOLEAN" type_map[:is_integer] -> "INT" type_map[:is_float] -> "NUMERIC(1000, 100)" - type_map[:is_text] -> "TEXT" - true -> "VARCHAR(#{varchar_limit()})" + true -> type_map[:max_data_length] |> string_column_type() |> get_string_column_type() end end + defp get_string_column_type(size) when size in [:text, :long_text], do: "TEXT" + defp get_string_column_type({:varchar, 0}), do: "VARCHAR(1)" + defp get_string_column_type({:varchar, size}), do: "VARCHAR(#{size})" + @impl Csv2sql.Database @spec db_name :: String.t() def db_name do