Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -5,61 +5,64 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import lombok.AllArgsConstructor;
import org.hypertrace.core.documentstore.expression.impl.DataType;
import org.hypertrace.core.documentstore.postgres.model.PostgresColumnMetadata;
import org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.nonjson.field.PostgresDataType;
import org.hypertrace.core.documentstore.postgres.utils.PostgresUtils;

/**
* Fetches schema metadata directly from Postgres system catalogs. Hardcoded to query
* information_schema.columns.
* Fetches schema metadata directly from Postgres system catalogs (pg_catalog). Todo: Composite PKs
* are not supported yet
*/
@AllArgsConstructor
public class PostgresMetadataFetcher {

private final PostgresClient client;

private static final String DISCOVERY_SQL =
"SELECT column_name, udt_name, is_nullable "
+ "FROM information_schema.columns "
+ "WHERE table_schema = 'public' AND table_name = ?";

private static final String PRIMARY_KEY_SQL =
"SELECT a.attname as column_name "
+ "FROM pg_index i "
+ "JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) "
+ "WHERE i.indrelid = ?::regclass AND i.indisprimary";
"SELECT a.attname AS column_name, "
+ "t.typname AS udt_name, "
+ "NOT a.attnotnull AS is_nullable, "
+ "COALESCE(pk.is_pk, false) AS is_primary_key "
+ "FROM pg_attribute a "
+ "JOIN pg_class c ON a.attrelid = c.oid "
+ "JOIN pg_namespace n ON c.relnamespace = n.oid "
+ "JOIN pg_type t ON a.atttypid = t.oid "
+ "LEFT JOIN ( "
+ " SELECT a2.attname, true AS is_pk "
+ " FROM pg_index i "
+ " JOIN pg_attribute a2 ON a2.attrelid = i.indrelid AND a2.attnum = ANY(i.indkey) "
+ " WHERE i.indisprimary "
+ ") pk ON pk.attname = a.attname "
+ "WHERE c.relname = ? "
+ "AND n.nspname = 'public' "
+ "AND a.attnum > 0 "
+ "AND NOT a.attisdropped";

public Map<String, PostgresColumnMetadata> fetch(String tableName) {
Map<String, PostgresColumnMetadata> metadataMap = new HashMap<>();

try (Connection conn = client.getPooledConnection()) {
Set<String> primaryKeyColumns = fetchPrimaryKeyColumns(conn, tableName);

try (PreparedStatement ps = conn.prepareStatement(DISCOVERY_SQL)) {
ps.setString(1, tableName);
try (ResultSet rs = ps.executeQuery()) {
while (rs.next()) {
String columnName = rs.getString("column_name");
String udtName = rs.getString("udt_name");
boolean isNullable = "YES".equalsIgnoreCase(rs.getString("is_nullable"));
boolean isArray = udtName != null && udtName.startsWith("_");
String baseType = isArray ? udtName.substring(1) : udtName;
boolean isPrimaryKey = primaryKeyColumns.contains(columnName);
metadataMap.put(
columnName,
new PostgresColumnMetadata(
columnName,
mapToCanonicalType(baseType),
mapToPostgresType(baseType),
isNullable,
isArray,
isPrimaryKey));
}
try (Connection conn = client.getPooledConnection();
PreparedStatement ps = conn.prepareStatement(DISCOVERY_SQL)) {
ps.setString(1, tableName);
try (ResultSet rs = ps.executeQuery()) {
while (rs.next()) {
String columnName = rs.getString("column_name");
String udtName = rs.getString("udt_name");
boolean isNullable = rs.getBoolean("is_nullable");
boolean isArray = udtName != null && udtName.startsWith("_");
String baseType = isArray ? udtName.substring(1) : udtName;
boolean isPrimaryKey = rs.getBoolean("is_primary_key");
metadataMap.put(
columnName,
new PostgresColumnMetadata(
columnName,
mapToCanonicalType(baseType),
mapToPostgresType(baseType),
isNullable,
isArray,
isPrimaryKey));
}
}
return metadataMap;
Expand All @@ -68,20 +71,6 @@ public Map<String, PostgresColumnMetadata> fetch(String tableName) {
}
}

private Set<String> fetchPrimaryKeyColumns(Connection conn, String tableName)
throws SQLException {
Set<String> pkColumns = new HashSet<>();
try (PreparedStatement ps = conn.prepareStatement(PRIMARY_KEY_SQL)) {
ps.setString(1, PostgresUtils.wrapFieldNamesWithDoubleQuotes(tableName));
try (ResultSet rs = ps.executeQuery()) {
while (rs.next()) {
pkColumns.add(rs.getString("column_name"));
}
}
}
return pkColumns;
}

/** Maps Postgres udt_name to canonical DataType. */
private DataType mapToCanonicalType(String udtName) {
if (udtName == null) {
Expand Down
Loading
Loading