diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/ContainerHealthSchemaManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/ContainerHealthSchemaManager.java index 3ee679bb80ed..84222fc6ccfb 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/ContainerHealthSchemaManager.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/ContainerHealthSchemaManager.java @@ -17,6 +17,7 @@ package org.apache.hadoop.ozone.recon.persistence; +import static org.apache.ozone.recon.schema.ContainerSchemaDefinition.UNHEALTHY_CONTAINERS_TABLE_NAME; import static org.apache.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates.ALL_REPLICAS_BAD; import static org.apache.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates.UNDER_REPLICATED; import static org.apache.ozone.recon.schema.generated.tables.UnhealthyContainersTable.UNHEALTHY_CONTAINERS; @@ -24,6 +25,7 @@ import com.google.inject.Inject; import com.google.inject.Singleton; +import java.sql.Connection; import java.util.List; import org.apache.hadoop.ozone.recon.api.types.UnhealthyContainersSummary; import org.apache.ozone.recon.schema.ContainerSchemaDefinition; @@ -35,6 +37,7 @@ import org.jooq.DSLContext; import org.jooq.Record; import org.jooq.SelectQuery; +import org.jooq.exception.DataAccessException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -118,12 +121,34 @@ public Cursor getAllUnhealthyRecordsCursor() { public void insertUnhealthyContainerRecords(List recs) { if (LOG.isDebugEnabled()) { - recs.forEach(rec -> { - LOG.debug("rec.getContainerId() : {}, rec.getContainerState(): {} ", rec.getContainerId(), - rec.getContainerState()); - }); + recs.forEach(rec -> LOG.debug("rec.getContainerId() : {}, rec.getContainerState(): {}", + rec.getContainerId(), rec.getContainerState())); + } + + try (Connection connection = containerSchemaDefinition.getDataSource().getConnection()) { + connection.setAutoCommit(false); // Turn off auto-commit for transactional control + try { + for (UnhealthyContainers rec : recs) { + try { + unhealthyContainersDao.insert(rec); + } catch (DataAccessException dataAccessException) { + // Log the error and update the existing record if ConstraintViolationException occurs + unhealthyContainersDao.update(rec); + LOG.debug("Error while inserting unhealthy container record: {}", rec, dataAccessException); + } + } + connection.commit(); // Commit all inserted/updated records + } catch (Exception innerException) { + connection.rollback(); // Rollback transaction if an error occurs inside processing + LOG.error("Transaction rolled back due to error", innerException); + throw innerException; + } finally { + connection.setAutoCommit(true); // Reset auto-commit before the connection is auto-closed + } + } catch (Exception e) { + LOG.error("Failed to insert records into {} ", UNHEALTHY_CONTAINERS_TABLE_NAME, e); + throw new RuntimeException("Recon failed to insert " + recs.size() + " unhealthy container records.", e); } - unhealthyContainersDao.insert(recs); } } diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java index a5e891f1cdca..bdd5ca35b296 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java @@ -25,9 +25,9 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -494,6 +494,69 @@ public void testAllContainerStateInsertions() { } } + @Test + public void testInsertFailureAndUpdateBehavior() { + UnhealthyContainersDao unHealthyContainersTableHandle = + getDao(UnhealthyContainersDao.class); + + ContainerHealthSchemaManager containerHealthSchemaManager = + new ContainerHealthSchemaManager( + getSchemaDefinition(ContainerSchemaDefinition.class), + unHealthyContainersTableHandle); + + ContainerSchemaDefinition.UnHealthyContainerStates state = + ContainerSchemaDefinition.UnHealthyContainerStates.MISSING; + + long insertedTime = System.currentTimeMillis(); + // Create a dummy UnhealthyContainer record with the current state + UnhealthyContainers unhealthyContainer = new UnhealthyContainers(); + unhealthyContainer.setContainerId(state.ordinal() + 1L); + unhealthyContainer.setExpectedReplicaCount(3); + unhealthyContainer.setActualReplicaCount(0); + unhealthyContainer.setReplicaDelta(3); + unhealthyContainer.setContainerState(state.name()); + unhealthyContainer.setInStateSince(insertedTime); + + // Try inserting the record and catch any exception that occurs + Exception exception = null; + try { + containerHealthSchemaManager.insertUnhealthyContainerRecords( + Collections.singletonList(unhealthyContainer)); + } catch (Exception e) { + exception = e; + } + + // Assert no exception should be thrown for each state + assertNull(exception, + "Exception was thrown during insertion for state " + state.name() + + ": " + exception); + + long updatedTime = System.currentTimeMillis(); + unhealthyContainer.setExpectedReplicaCount(3); + unhealthyContainer.setActualReplicaCount(0); + unhealthyContainer.setReplicaDelta(3); + unhealthyContainer.setContainerState(state.name()); + unhealthyContainer.setInStateSince(updatedTime); + + try { + containerHealthSchemaManager.insertUnhealthyContainerRecords( + Collections.singletonList(unhealthyContainer)); + } catch (Exception e) { + exception = e; + } + + // Optionally, verify the record was updated correctly + List updatedRecords = + unHealthyContainersTableHandle.fetchByContainerId( + state.ordinal() + 1L); + assertFalse(updatedRecords.isEmpty(), + "Record was not updated for state " + state.name() + "."); + assertEquals(updatedRecords.get(0).getContainerState(), state.name(), + "The inserted container state does not match for state " + + state.name() + "."); + assertEquals(updatedRecords.get(0).getInStateSince(), updatedTime); + } + @Test public void testMissingAndEmptyMissingContainerDeletion() throws Exception { // Setup mock DAOs and managers