Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ public Void call() throws Exception {
}

Path dbPath = parent.resolveDbPath();
if (dbPath == null) {
return null;
}

ContainerDatanodeDatabase cdd = new ContainerDatanodeDatabase(dbPath.toString());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,17 @@ public Path resolveDbPath() {
if (Files.exists(resolvedPath) && Files.isRegularFile(resolvedPath)) {
out().println("Using default database file found in current directory: " + resolvedPath);
} else {
err().println("No database path provided and default file '" + SQLDBConstants.DEFAULT_DB_FILENAME + "' not " +
throw new IllegalArgumentException("No database path provided and default file '" +
SQLDBConstants.DEFAULT_DB_FILENAME + "' not " +
"found in current directory. Please provide a valid database path");
return null;
}
} else {
resolvedPath = Paths.get(dbPath);
Path parentDir = resolvedPath.getParent();

if (parentDir != null && !Files.exists(parentDir)) {
err().println("The parent directory of the provided database path does not exist: " + parentDir);
return null;
throw new IllegalArgumentException("The parent directory of the provided database " +
"path does not exist: " + parentDir);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@ public class DuplicateOpenContainersCommand implements Callable<Void> {
@Override
public Void call() throws Exception {
Path dbPath = parent.resolveDbPath();
if (dbPath == null) {
return null;
}

ContainerDatanodeDatabase cdd = new ContainerDatanodeDatabase(dbPath.toString());
cdd.findDuplicateOpenContainer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import java.nio.file.Path;
import java.util.concurrent.Callable;
import org.apache.hadoop.hdds.cli.AbstractSubcommand;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
import org.apache.hadoop.ozone.debug.logs.container.utils.ContainerDatanodeDatabase;
import org.apache.hadoop.ozone.shell.ListLimitOptions;
import picocli.CommandLine;
Expand All @@ -33,30 +35,52 @@
name = "list",
description = "Finds containers from the database based on the option provided."
)
public class ListContainers implements Callable<Void> {
public class ListContainers extends AbstractSubcommand implements Callable<Void> {

@CommandLine.Option(names = {"--state"},
description = "Life cycle state of the container.",
required = true)
private HddsProtos.LifeCycleState state;
@CommandLine.ArgGroup(multiplicity = "1")
private ExclusiveOptions exclusiveOptions;

@CommandLine.Mixin
private ListLimitOptions listOptions;

@CommandLine.ParentCommand
private ContainerLogController parent;

/**
 * Holder for the alternative filters accepted by the {@code list} subcommand.
 * It is bound through the {@code @CommandLine.ArgGroup(multiplicity = "1")} field of the
 * enclosing command; {@code call()} checks {@code lifecycleState} first and falls back to
 * {@code healthState} only when the former is {@code null}.
 */
private static final class ExclusiveOptions {
// --lifecycle: list containers by life cycle state (passed by name to listContainersByState).
@CommandLine.Option(names = {"--lifecycle"},
description = "Life cycle state of the container.")
private HddsProtos.LifeCycleState lifecycleState;

// --health: list containers by health state. call() only handles UNDER_REPLICATED,
// OVER_REPLICATED, UNHEALTHY and QUASI_CLOSED_STUCK; any other value is reported on stderr
// as "Unsupported health state".
@CommandLine.Option(names = {"--health"},
description = "Health state of the container.")
private ReplicationManagerReport.HealthState healthState;
}

@Override
public Void call() throws Exception {

Path dbPath = parent.resolveDbPath();
if (dbPath == null) {
return null;
}

ContainerDatanodeDatabase cdd = new ContainerDatanodeDatabase(dbPath.toString());

cdd.listContainersByState(state.name(), listOptions.getLimit());
if (exclusiveOptions.lifecycleState != null) {
cdd.listContainersByState(exclusiveOptions.lifecycleState.name(), listOptions.getLimit());
} else if (exclusiveOptions.healthState != null) {
switch (exclusiveOptions.healthState) {
case UNDER_REPLICATED:
case OVER_REPLICATED:
cdd.listReplicatedContainers(exclusiveOptions.healthState.name(), listOptions.getLimit());
break;
case UNHEALTHY:
cdd.listUnhealthyContainers(listOptions.getLimit());
break;
case QUASI_CLOSED_STUCK:
cdd.listQuasiClosedStuckContainers(listOptions.getLimit());
break;
default:
err().println("Unsupported health state: " + exclusiveOptions.healthState);
}
}

return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,18 @@ public class ContainerDatanodeDatabase {
private static final int DEFAULT_REPLICATION_FACTOR;

private final PrintWriter out;
private final PrintWriter err;

public ContainerDatanodeDatabase(String dbPath) {
this.databasePath = dbPath;
this.out = new PrintWriter(new OutputStreamWriter(System.out, StandardCharsets.UTF_8), true);
this.err = new PrintWriter(new OutputStreamWriter(System.err, StandardCharsets.UTF_8), true);
}

public ContainerDatanodeDatabase(String dbPath, PrintWriter out) {
public ContainerDatanodeDatabase(String dbPath, PrintWriter out, PrintWriter err) {
this.databasePath = dbPath;
this.out = out;
this.err = err;
}

static {
Expand Down Expand Up @@ -122,6 +125,7 @@ public void createIndexes() throws SQLException {
createIdxDclContainerStateTime(stmt);
createContainerLogIndex(stmt);
createIdxContainerlogContainerId(stmt);
createIndexForQuasiClosedQuery(stmt);
} catch (SQLException e) {
throw new SQLException("Error while creating index: " + e.getMessage());
} catch (Exception e) {
Expand All @@ -144,6 +148,11 @@ private void createIdxContainerlogContainerId(Statement stmt) throws SQLExceptio
stmt.execute(createIndexSQL);
}

/**
 * Executes the DDL held in
 * {@link SQLDBConstants#CREATE_DCL_STATE_CONTAINER_DATANODE_TIME_INDEX} on the given statement.
 *
 * @param stmt open statement used to run the DDL
 * @throws SQLException if the statement fails to execute
 */
private void createIndexForQuasiClosedQuery(Statement stmt) throws SQLException {
  stmt.execute(SQLDBConstants.CREATE_DCL_STATE_CONTAINER_DATANODE_TIME_INDEX);
}

/**
* Inserts a list of container log entries into the DatanodeContainerLogTable.
*
Expand Down Expand Up @@ -609,5 +618,156 @@ private List<DatanodeContainerInfo> getContainerLogDataForOpenContainers(Long co

return logEntries;
}

/**
 * Lists containers that are over- or under-replicated, together with the number of
 * distinct datanodes recorded for each container.
 *
 * <p>The comparison operator of {@link SQLDBConstants#SELECT_REPLICATED_CONTAINERS} is chosen
 * from {@code overOrUnder} and the threshold bound to the query is
 * {@code DEFAULT_REPLICATION_FACTOR}. Matching rows are printed to {@code out}; when a limit
 * is in effect one extra row is fetched so that a "there might be more" note can be printed
 * to {@code err} if the result was truncated.
 *
 * @param overOrUnder {@code OVER_REPLICATED} or {@code UNDER_REPLICATED} (case-insensitive);
 *                    any other value prints an error to {@code err} and returns without querying
 * @param limit maximum number of containers to print; {@code Integer.MAX_VALUE} disables the limit
 * @throws SQLException if the query cannot be prepared or executed
 */
public void listReplicatedContainers(String overOrUnder, Integer limit) throws SQLException {
  String operator;
  if ("OVER_REPLICATED".equalsIgnoreCase(overOrUnder)) {
    operator = ">";
  } else if ("UNDER_REPLICATED".equalsIgnoreCase(overOrUnder)) {
    operator = "<";
  } else {
    err.println("Invalid type. Use OVER_REPLICATED or UNDER_REPLICATED.");
    return;
  }

  String rawQuery = SQLDBConstants.SELECT_REPLICATED_CONTAINERS;

  // Guard against the template losing its placeholder; without it the operator cannot be applied.
  if (!rawQuery.contains("{operator}")) {
    err.println("Query not defined correctly.");
    return;
  }

  // Only a fixed, internally chosen operator is substituted; user values are bound as parameters.
  String finalQuery = rawQuery.replace("{operator}", operator);

  boolean limitProvided = limit != Integer.MAX_VALUE;
  if (limitProvided) {
    finalQuery += " LIMIT ?";
  }

  try (Connection connection = getConnection();
       PreparedStatement pstmt = connection.prepareStatement(finalQuery)) {

    pstmt.setInt(1, DEFAULT_REPLICATION_FACTOR);

    if (limitProvided) {
      // Fetch one row beyond the limit so truncation can be detected.
      pstmt.setInt(2, limit + 1);
    }

    try (ResultSet rs = pstmt.executeQuery()) {
      int count = 0;

      while (rs.next()) {
        if (limitProvided && count >= limit) {
          err.println("Note: There might be more containers. Use --all option to list all entries.");
          break;
        }

        out.printf("Container ID = %s - Count = %d%n", rs.getLong("container_id"),
            rs.getInt("replica_count"));
        count++;
      }

      out.println("Number of containers listed: " + count);
    }

  } catch (SQLException e) {
    throw new SQLException("Error while retrieving containers." + e.getMessage(), e);
  } catch (Exception e) {
    // Chain the cause so the original stack trace is not lost.
    throw new RuntimeException("Unexpected error: " + e, e);
  }
}

/**
 * Lists the containers returned by {@link SQLDBConstants#SELECT_UNHEALTHY_CONTAINERS},
 * together with the number of replicas of each container that are in the UNHEALTHY state.
 *
 * <p>Matching rows are printed to {@code out}; when a limit is in effect one extra row is
 * fetched so that a "there might be more" note can be printed to {@code err} if the result
 * was truncated.
 *
 * @param limit maximum number of containers to print; {@code Integer.MAX_VALUE} disables the limit
 * @throws SQLException if the query cannot be prepared or executed
 */
public void listUnhealthyContainers(Integer limit) throws SQLException {

  String query = SQLDBConstants.SELECT_UNHEALTHY_CONTAINERS;

  boolean limitProvided = limit != Integer.MAX_VALUE;
  if (limitProvided) {
    query += " LIMIT ?";
  }

  try (Connection connection = getConnection();
       PreparedStatement stmt = connection.prepareStatement(query)) {

    if (limitProvided) {
      // Fetch one row beyond the limit so truncation can be detected.
      stmt.setInt(1, limit + 1);
    }

    try (ResultSet rs = stmt.executeQuery()) {
      int count = 0;

      while (rs.next()) {
        if (limitProvided && count >= limit) {
          err.println("Note: There might be more containers. Use --all option to list all entries.");
          break;
        }

        out.printf("Container ID = %s - Count = %d%n", rs.getString("container_id"),
            rs.getInt("unhealthy_replica_count"));
        count++;
      }

      out.println("Number of containers listed: " + count);
    }

  } catch (SQLException e) {
    throw new SQLException("Error while retrieving containers." + e.getMessage(), e);
  } catch (Exception e) {
    // Chain the cause so the original stack trace is not lost.
    throw new RuntimeException("Unexpected error: " + e, e);
  }
}

/**
 * Lists the containers returned by {@link SQLDBConstants#SELECT_QUASI_CLOSED_STUCK_CONTAINERS},
 * together with the number of replicas of each container that are in the QUASI_CLOSED state.
 *
 * <p>Matching rows are printed to {@code out}; when a limit is in effect one extra row is
 * fetched so that a "there might be more" note can be printed to {@code err} if the result
 * was truncated.
 *
 * @param limit maximum number of containers to print; {@code Integer.MAX_VALUE} disables the limit
 * @throws SQLException if the query cannot be prepared or executed
 */
public void listQuasiClosedStuckContainers(Integer limit) throws SQLException {

  String query = SQLDBConstants.SELECT_QUASI_CLOSED_STUCK_CONTAINERS;

  boolean limitProvided = limit != Integer.MAX_VALUE;
  if (limitProvided) {
    query += " LIMIT ?";
  }

  try (Connection connection = getConnection();
       PreparedStatement statement = connection.prepareStatement(query)) {

    if (limitProvided) {
      // Fetch one row beyond the limit so truncation can be detected.
      statement.setInt(1, limit + 1);
    }

    try (ResultSet resultSet = statement.executeQuery()) {
      int count = 0;

      while (resultSet.next()) {
        if (limitProvided && count >= limit) {
          err.println("Note: There might be more containers. Use --all option to list all entries.");
          break;
        }

        out.printf("Container ID = %s - Count = %d%n", resultSet.getString("container_id"),
            resultSet.getInt("quasi_closed_replica_count"));
        count++;
      }

      out.println("Number of containers listed: " + count);
    }

  } catch (SQLException e) {
    throw new SQLException("Error while retrieving containers." + e.getMessage(), e);
  } catch (Exception e) {
    // Chain the cause so the original stack trace is not lost.
    throw new RuntimeException("Unexpected error: " + e, e);
  }
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.hadoop.ozone.debug.logs.container.utils;

import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;

/**
* Constants used for ContainerDatanodeDatabase.
*/
Expand All @@ -29,6 +32,11 @@ public final class SQLDBConstants {
public static final int BATCH_SIZE = 2500;
public static final String DATANODE_CONTAINER_LOG_TABLE_NAME = "DatanodeContainerLogTable";
public static final String CONTAINER_LOG_TABLE_NAME = "ContainerLogTable";
// State names embedded as string literals in the SQL queries of this class.
// They are derived from the enum constants so the SQL stays in sync with the enum names.
public static final String CLOSED_STATE = HddsProtos.LifeCycleState.CLOSED.name();
public static final String DELETED_STATE = HddsProtos.LifeCycleState.DELETED.name();
// NOTE(review): taken from the replication-manager HealthState enum but matched against the
// container_state column; presumably the logged replica state uses the same name - confirm.
public static final String UNHEALTHY_STATE = ReplicationManagerReport.HealthState.UNHEALTHY.name();
public static final String QUASI_CLOSED_STATE = HddsProtos.LifeCycleState.QUASI_CLOSED.name();

public static final String CREATE_DATANODE_CONTAINER_LOG_TABLE =
"CREATE TABLE IF NOT EXISTS DatanodeContainerLogTable (datanode_id TEXT NOT NULL, " +
"container_id INTEGER NOT NULL, timestamp TEXT NOT NULL, container_state TEXT, bcsid INTEGER, " +
Expand Down Expand Up @@ -75,6 +83,62 @@ public final class SQLDBConstants {
public static final String SELECT_CONTAINER_DETAILS_OPEN_STATE = "SELECT d.timestamp, d.container_id, " +
"d.datanode_id, d.container_state FROM DatanodeContainerLogTable d " +
"WHERE d.container_id = ? AND d.container_state = 'OPEN' ORDER BY d.timestamp ASC;";
/**
 * DDL that creates (if absent) an index on DatanodeContainerLogTable covering
 * (container_state, container_id, datanode_id, timestamp DESC).
 * These are the columns the per-state "latest timestamp per container and datanode"
 * subqueries in this class filter and group on.
 */
public static final String CREATE_DCL_STATE_CONTAINER_DATANODE_TIME_INDEX =
"CREATE INDEX IF NOT EXISTS idx_dcl_state_container_datanode_time " +
"ON DatanodeContainerLogTable(container_state, container_id, datanode_id, timestamp DESC);";
/**
 * Query template that selects non-DELETED containers from ContainerLogTable together with
 * their number of distinct datanodes ({@code replica_count}).
 *
 * <p>The literal {@code {operator}} must be replaced by a comparison operator before the
 * statement is prepared; the single {@code ?} is the replica-count threshold. Rows are ordered
 * by {@code container_id} so that a {@code LIMIT} clause appended by the caller selects a
 * well-defined subset, matching the ordering of the other container listing queries.
 */
public static final String SELECT_REPLICATED_CONTAINERS =
    "SELECT container_id, COUNT(DISTINCT datanode_id) AS replica_count\n" +
    "FROM ContainerLogTable\n" +
    "WHERE latest_state != '" + DELETED_STATE + "'\n" +
    " GROUP BY container_id\n" +
    "HAVING COUNT(DISTINCT datanode_id) {operator} ?\n" +
    "ORDER BY container_id";
/**
 * Selects containers with UNHEALTHY replicas and the number of such replicas
 * ({@code unhealthy_replica_count}), ordered by container_id.
 *
 * <p>Subquery {@code u} takes, per (container_id, datanode_id), the latest timestamp logged
 * with the UNHEALTHY state; subquery {@code c} does the same for CLOSED/DELETED. A replica is
 * counted when it has no CLOSED/DELETED entry or when its latest UNHEALTHY entry is newer
 * than its latest CLOSED/DELETED entry.
 *
 * <p>NOTE(review): timestamp is declared TEXT in the table DDL, so MAX and {@code >} compare
 * strings; this assumes a lexicographically sortable timestamp format - confirm.
 */
public static final String SELECT_UNHEALTHY_CONTAINERS =
"SELECT u.container_id, COUNT(*) AS unhealthy_replica_count\n" +
"FROM (\n" +
" SELECT container_id, datanode_id, MAX(timestamp) AS latest_unhealthy_timestamp\n" +
" FROM DatanodeContainerLogTable\n" +
" WHERE container_state = '" + UNHEALTHY_STATE + "'\n" +
" GROUP BY container_id, datanode_id\n" +
") AS u\n" +
"LEFT JOIN (\n" +
" SELECT container_id, datanode_id, MAX(timestamp) AS latest_closed_timestamp\n" +
" FROM DatanodeContainerLogTable\n" +
" WHERE container_state IN ('" + CLOSED_STATE + "', '" + DELETED_STATE + "')\n" +
" GROUP BY container_id, datanode_id\n" +
") AS c\n" +
"ON u.container_id = c.container_id AND u.datanode_id = c.datanode_id\n" +
"WHERE c.latest_closed_timestamp IS NULL \n" +
" OR u.latest_unhealthy_timestamp > c.latest_closed_timestamp\n" +
"GROUP BY u.container_id\n" +
"ORDER BY u.container_id";
/**
 * Selects containers regarded as stuck in QUASI_CLOSED and the number of QUASI_CLOSED replicas
 * ({@code quasi_closed_replica_count}), ordered by container_id.
 *
 * <p>CTE {@code quasi_closed_replicas} takes, per (container_id, datanode_id), the latest
 * timestamp logged with the QUASI_CLOSED state. {@code container_with_enough_quasi_closed}
 * keeps containers that have such entries on at least 3 distinct datanodes.
 * {@code closed_or_deleted} holds the latest CLOSED/DELETED timestamp per replica. A replica is
 * counted when it has no CLOSED/DELETED entry or when its latest QUASI_CLOSED entry is newer.
 *
 * <p>NOTE(review): the threshold 3 is hard-coded; presumably it mirrors the default
 * replication factor - confirm.
 * NOTE(review): timestamp is declared TEXT in the table DDL, so MAX and {@code >} compare
 * strings; this assumes a lexicographically sortable timestamp format - confirm.
 */
public static final String SELECT_QUASI_CLOSED_STUCK_CONTAINERS =
"WITH quasi_closed_replicas AS ( " +
" SELECT container_id, datanode_id, MAX(timestamp) AS latest_quasi_closed_timestamp\n" +
" FROM DatanodeContainerLogTable " +
" WHERE container_state = '" + QUASI_CLOSED_STATE + "'\n" +
" GROUP BY container_id, datanode_id" +
"), " +
"container_with_enough_quasi_closed AS (\n" +
" SELECT container_id\n" +
" FROM quasi_closed_replicas\n" +
" GROUP BY container_id\n" +
" HAVING COUNT(DISTINCT datanode_id) >= 3\n" +
"),\n" +
"closed_or_deleted AS (\n" +
" SELECT container_id, datanode_id, MAX(timestamp) AS latest_closed_timestamp\n" +
" FROM DatanodeContainerLogTable\n" +
" WHERE container_state IN ('" + CLOSED_STATE + "', '" + DELETED_STATE + "')\n" +
" GROUP BY container_id, datanode_id\n" +
")\n" +
"SELECT q.container_id, COUNT(*) AS quasi_closed_replica_count\n" +
"FROM quasi_closed_replicas q\n" +
"JOIN container_with_enough_quasi_closed qc ON q.container_id = qc.container_id\n" +
"LEFT JOIN closed_or_deleted c \n" +
" ON q.container_id = c.container_id AND q.datanode_id = c.datanode_id\n" +
"WHERE c.latest_closed_timestamp IS NULL\n" +
" OR q.latest_quasi_closed_timestamp > c.latest_closed_timestamp\n" +
"GROUP BY q.container_id\n" +
"ORDER BY q.container_id";

private SQLDBConstants() {
//Never constructed
Expand Down