diff --git a/apps/csv2sql/lib/csv2sql/database/database.ex b/apps/csv2sql/lib/csv2sql/database/database.ex index 165913b..15698e3 100644 --- a/apps/csv2sql/lib/csv2sql/database/database.ex +++ b/apps/csv2sql/lib/csv2sql/database/database.ex @@ -105,7 +105,10 @@ defmodule Csv2sql.Database do end @spec insert_data_chunk(Csv2sql.File.t(), list) :: :ok - def insert_data_chunk(%Csv2sql.File{name: name, path: path, column_types: column_types}, data_chunk) do + def insert_data_chunk( + %Csv2sql.File{name: name, path: path, column_types: column_types}, + data_chunk + ) do encoded_data_chunk = encode_data_chunk(column_types, data_chunk) repo = Helpers.get_config(:db_type) |> get_repo() @@ -212,4 +215,17 @@ defmodule Csv2sql.Database do -1 end end + + def string_column_type(max_data_length) do + cond do + max_data_length > 65_535 -> + :long_text + + max_data_length > varchar_limit() -> + :text + + true -> + {:varchar, max_data_length} + end + end end diff --git a/apps/csv2sql/lib/csv2sql/database/mysql.ex b/apps/csv2sql/lib/csv2sql/database/mysql.ex index 9415b7f..8ffdb89 100644 --- a/apps/csv2sql/lib/csv2sql/database/mysql.ex +++ b/apps/csv2sql/lib/csv2sql/database/mysql.ex @@ -17,12 +17,15 @@ defmodule Csv2sql.Database.MySql do type_map[:is_boolean] -> "BIT" type_map[:is_integer] -> "INT" type_map[:is_float] -> "DOUBLE" - type_map[:is_text] and type_map[:max_data_length] > 65_535 -> "LONGTEXT" - type_map[:is_text] -> "TEXT" - true -> "VARCHAR(#{varchar_limit()})" + true -> type_map[:max_data_length] |> string_column_type() |> get_string_column_type() end end + defp get_string_column_type(:long_text), do: "LONGTEXT" + defp get_string_column_type(:text), do: "TEXT" + defp get_string_column_type({:varchar, 0}), do: "VARCHAR(1)" + defp get_string_column_type({:varchar, size}), do: "VARCHAR(#{size})" + @impl Csv2sql.Database @spec db_name :: String.t() def db_name do diff --git a/apps/csv2sql/lib/csv2sql/database/postgres.ex b/apps/csv2sql/lib/csv2sql/database/postgres.ex index 8802336..7913cb3 100644 --- a/apps/csv2sql/lib/csv2sql/database/postgres.ex +++ b/apps/csv2sql/lib/csv2sql/database/postgres.ex @@ -17,11 +17,14 @@ defmodule Csv2sql.Database.Postgres do type_map[:is_boolean] -> "BOOLEAN" type_map[:is_integer] -> "INT" type_map[:is_float] -> "NUMERIC(1000, 100)" - type_map[:is_text] -> "TEXT" - true -> "VARCHAR(#{varchar_limit()})" + true -> type_map[:max_data_length] |> string_column_type() |> get_string_column_type() end end + defp get_string_column_type(size) when size in [:text, :long_text], do: "TEXT" + defp get_string_column_type({:varchar, 0}), do: "VARCHAR(1)" + defp get_string_column_type({:varchar, size}), do: "VARCHAR(#{size})" + @impl Csv2sql.Database @spec db_name :: String.t() def db_name do diff --git a/apps/csv2sql/lib/csv2sql/file.ex b/apps/csv2sql/lib/csv2sql/file.ex index f506ff9..d4b74bb 100644 --- a/apps/csv2sql/lib/csv2sql/file.ex +++ b/apps/csv2sql/lib/csv2sql/file.ex @@ -17,5 +17,6 @@ defmodule Csv2sql.File do field(:column_types, csv_col_types_list(), enforce: false) field(:status, file_status(), default: :pending) field(:existing_db_row_count, non_neg_integer(), default: 0) + field(:producer_pid, pid(), default: nil) end end diff --git a/apps/csv2sql/lib/csv2sql/progress_tracker/progress_tracker.ex b/apps/csv2sql/lib/csv2sql/progress_tracker/progress_tracker.ex index 8127fb3..0c4a73e 100644 --- a/apps/csv2sql/lib/csv2sql/progress_tracker/progress_tracker.ex +++ b/apps/csv2sql/lib/csv2sql/progress_tracker/progress_tracker.ex @@ -99,18 +99,23 @@ defmodule Csv2sql.ProgressTracker do end @impl true - def handle_cast(:reset_state, state), - do: - {:noreply, - %{ - state - | files: %{}, - subscribers: [], - status: :init, - start_time: nil, - end_time: nil, - validation_status: nil - }} + def handle_cast(:reset_state, %State{files: files} = state) do + files + |> Enum.map(fn {_path, %Csv2sql.File{producer_pid: producer_pid}} -> + Process.exit(producer_pid, :kill) + end) + + {:noreply, + %{ + state + | files: %{}, + subscribers: [], + status: :init, + start_time: nil, + end_time: nil, + validation_status: nil + }} + end @impl true def handle_cast({:update_file, _file}, %State{status: :error} = state), do: {:noreply, state} diff --git a/apps/csv2sql/lib/csv2sql/stages/analyze.ex b/apps/csv2sql/lib/csv2sql/stages/analyze.ex index 2bdfff3..b15edc8 100644 --- a/apps/csv2sql/lib/csv2sql/stages/analyze.ex +++ b/apps/csv2sql/lib/csv2sql/stages/analyze.ex @@ -124,6 +124,10 @@ defmodule Csv2sql.Stages.Analyze do # Start a producer for the file {:ok, pid} = DbLoader.Producer.start_link(file) + file = %Csv2sql.File{file | producer_pid: pid} + + ProgressTracker.update_file(file) + # Subscribe consumers to the producer GenStage.sync_subscribe( DbLoader.ConsumerSupervisor, diff --git a/apps/csv2sql/lib/csv2sql/type_deducer/type_deducer.ex b/apps/csv2sql/lib/csv2sql/type_deducer/type_deducer.ex index 193ba25..974649e 100644 --- a/apps/csv2sql/lib/csv2sql/type_deducer/type_deducer.ex +++ b/apps/csv2sql/lib/csv2sql/type_deducer/type_deducer.ex @@ -111,7 +111,8 @@ defmodule Csv2sql.TypeDeducer do is_boolean: acc_map.is_boolean && current_map.is_boolean, is_integer: acc_map.is_integer && current_map.is_integer, is_float: acc_map.is_float && current_map.is_float, - is_text: acc_map.is_text || current_map.is_text + is_text: acc_map.is_text || current_map.is_text, + max_data_length: max(acc_map.max_data_length, current_map.max_data_length) } end end