diff --git a/hadoop-ozone/recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/ContainerSchemaDefinition.java b/hadoop-ozone/recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/ContainerSchemaDefinition.java index 43e2d728b763..4d62ca886cda 100644 --- a/hadoop-ozone/recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/ContainerSchemaDefinition.java +++ b/hadoop-ozone/recon-codegen/src/main/java/org/hadoop/ozone/recon/schema/ContainerSchemaDefinition.java @@ -51,7 +51,8 @@ public enum UnHealthyContainerStates { UNDER_REPLICATED, OVER_REPLICATED, MIS_REPLICATED, - ALL_REPLICAS_UNHEALTHY + ALL_REPLICAS_UNHEALTHY, + NEGATIVE_SIZE // Added new state to track containers with negative sizes } private static final String CONTAINER_ID = "container_id"; diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/UnhealthyContainersResponse.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/UnhealthyContainersResponse.java index eaf08d9ca83e..ba03ec61f145 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/UnhealthyContainersResponse.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/UnhealthyContainersResponse.java @@ -50,6 +50,12 @@ public class UnhealthyContainersResponse { @JsonProperty("misReplicatedCount") private long misReplicatedCount = 0; + /** + * Total count of containers with negative size. + */ + @JsonProperty("negativeSizeCount") + private long negativeSizeCount = 0; + /** * A collection of unhealthy containers. */ @@ -77,6 +83,9 @@ public void setSummaryCount(String state, long count) { } else if (state.equals( UnHealthyContainerStates.MIS_REPLICATED.toString())) { this.misReplicatedCount = count; + } else if (state.equals( + UnHealthyContainerStates.NEGATIVE_SIZE.toString())) { + this.negativeSizeCount = count; } } @@ -96,6 +105,10 @@ public long getMisReplicatedCount() { return misReplicatedCount; } + public long getNegativeSizeCount() { + return negativeSizeCount; + } + public Collection getContainers() { return containers; } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java index 577fb7d2bcc1..a5d259d3e939 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java @@ -217,6 +217,8 @@ private void initializeUnhealthyContainerStateStatsMap( UnHealthyContainerStates.OVER_REPLICATED, new HashMap<>()); unhealthyContainerStateStatsMap.put( UnHealthyContainerStates.MIS_REPLICATED, new HashMap<>()); + unhealthyContainerStateStatsMap.put( + UnHealthyContainerStates.NEGATIVE_SIZE, new HashMap<>()); } private ContainerHealthStatus setCurrentContainer(long recordId) @@ -313,13 +315,21 @@ private long processExistingDBRecords(long currentTime, private void processContainer(ContainerInfo container, long currentTime, Map> - unhealthyContainerStateStatsMap) { + unhealthyContainerStateStatsMap) { try { Set containerReplicas = containerManager.getContainerReplicas(container.containerID()); ContainerHealthStatus h = new ContainerHealthStatus(container, containerReplicas, placementPolicy, reconContainerMetadataManager, conf); + + // Handle negative sized containers separately + if (h.getContainer().getUsedBytes() < 0) { + handleNegativeSizedContainers(h, currentTime, + unhealthyContainerStateStatsMap); + return; + } + if (h.isHealthilyReplicated() || h.isDeleted()) { return; } @@ -365,6 +375,32 @@ private boolean containerDeletedInSCM(ContainerInfo containerInfo) { return false; } + /** + * This method is used to handle containers with negative sizes. It logs an + * error message and inserts a record into the UNHEALTHY_CONTAINERS table. + * @param containerHealthStatus + * @param currentTime + * @param unhealthyContainerStateStatsMap + */ + private void handleNegativeSizedContainers( + ContainerHealthStatus containerHealthStatus, long currentTime, + Map> + unhealthyContainerStateStatsMap) { + ContainerInfo container = containerHealthStatus.getContainer(); + LOG.error( + "Container {} has negative size. Please visit Recon's unhealthy " + + "container endpoint for more details.", + container.getContainerID()); + UnhealthyContainers record = + ContainerHealthRecords.recordForState(containerHealthStatus, + UnHealthyContainerStates.NEGATIVE_SIZE, currentTime); + List records = Collections.singletonList(record); + populateContainerStats(containerHealthStatus, + UnHealthyContainerStates.NEGATIVE_SIZE, + unhealthyContainerStateStatsMap); + containerHealthSchemaManager.insertUnhealthyContainerRecords(records); + } + /** * Helper methods to generate and update the required database records for * unhealthy containers. diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerSizeCountTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerSizeCountTask.java index fb387861f0e3..105406f2bdf6 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerSizeCountTask.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerSizeCountTask.java @@ -25,6 +25,7 @@ import org.apache.hadoop.ozone.recon.ReconUtils; import org.apache.hadoop.ozone.recon.scm.ReconScmTask; import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider; +import org.hadoop.ozone.recon.schema.ContainerSchemaDefinition; import org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition; import org.hadoop.ozone.recon.schema.tables.daos.ContainerCountBySizeDao; import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao; @@ -34,13 +35,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.ArrayList; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.DELETED; import static org.hadoop.ozone.recon.schema.tables.ContainerCountBySizeTable.CONTAINER_COUNT_BY_SIZE; @@ -60,6 +62,8 @@ public class ContainerSizeCountTask extends ReconScmTask { private ContainerCountBySizeDao containerCountBySizeDao; private DSLContext dslContext; private HashMap processedContainers = new HashMap<>(); + private Map> + unhealthyContainerStateStatsMap; private ReadWriteLock lock = new ReentrantReadWriteLock(true); public ContainerSizeCountTask( @@ -121,7 +125,17 @@ protected synchronized void run() { private void process(ContainerInfo container, Map map) { final ContainerID id = container.containerID(); - final long currentSize = container.getUsedBytes(); + final long usedBytes = container.getUsedBytes(); + final long currentSize; + + if (usedBytes < 0) { + LOG.warn("Negative usedBytes ({}) for container {}, treating it as 0", + usedBytes, id); + currentSize = 0; + } else { + currentSize = usedBytes; + } + final Long previousSize = processedContainers.put(id, currentSize); if (previousSize != null) { decrementContainerSizeCount(previousSize, map); @@ -132,24 +146,27 @@ private void process(ContainerInfo container, /** * The process() function is responsible for updating the counts of * containers being tracked in a containerSizeCountMap based on the - * ContainerInfo objects in the list containers.It then iterates through + * ContainerInfo objects in the list containers. It then iterates through * the list of containers and does the following for each container: * - * 1) If the container is not present in processedContainers, - * it is a new container, so it is added to the processedContainers map - * and the count for its size in the containerSizeCountMap is incremented - * by 1 using the handlePutKeyEvent() function. - * 2) If the container is present in processedContainers but its size has - * been updated to the new size then the count for the old size in the - * containerSizeCountMap is decremented by 1 using the - * handleDeleteKeyEvent() function. The count for the new size is then - * incremented by 1 using the handlePutKeyEvent() function. - * 3) If the container is not present in containers list, it means the - * container has been deleted. - * The remaining containers inside the deletedContainers map are the ones - * that are not in the cluster and need to be deleted. Finally, the counts in - * the containerSizeCountMap are written to the database using the - * writeCountsToDB() function. + * 1) If the container's state is not "deleted," it will be processed: + * - If the container is not present in processedContainers, it is a new + * container. Therefore, it is added to the processedContainers map, and + * the count for its size in the containerSizeCountMap is incremented by + * 1 using the handlePutKeyEvent() function. + * - If the container is present in processedContainers but its size has + * been updated to a new size, the count for the old size in the + * containerSizeCountMap is decremented by 1 using the + * handleDeleteKeyEvent() function. Subsequently, the count for the new + * size is incremented by 1 using the handlePutKeyEvent() function. + * + * 2) If the container's state is "deleted," it is skipped, as deleted + * containers are not processed. + * + * After processing, the remaining containers inside the deletedContainers map + * are those that are not in the cluster and need to be deleted from the total + * size counts. Finally, the counts in the containerSizeCountMap are written + * to the database using the writeCountsToDB() function. */ public void process(List containers) { lock.writeLock().lock(); @@ -161,7 +178,9 @@ public void process(List containers) { // Loop to handle container create and size-update operations for (ContainerInfo container : containers) { - // The containers present in the cache hence it is not yet deleted + if (container.getState().equals(DELETED)) { + continue; // Skip deleted containers + } deletedContainers.remove(container.containerID()); // For New Container being created try { @@ -246,10 +265,10 @@ public String getTaskName() { /** * - * The handleContainerDeleteOperations() function loops through the entries - * in the deletedContainers map and calls the handleDeleteKeyEvent() function - * for each one. This will decrement the size counts of those containers by - * one which are no longer present in the cluster + * Handles the deletion of containers by updating the tracking of processed containers + * and adjusting the count of containers based on their sizes. When a container is deleted, + * it is removed from the tracking of processed containers, and the count of containers + * corresponding to its size is decremented in the container size count map. * * Used by process() * @@ -261,6 +280,9 @@ private void handleContainerDeleteOperations( Map containerSizeCountMap) { for (Map.Entry containerId : deletedContainers.entrySet()) { + // processedContainers will only keep a track of all containers that have + // been processed except DELETED containers. + processedContainers.remove(containerId.getKey()); long containerSize = deletedContainers.get(containerId.getKey()); decrementContainerSizeCount(containerSize, containerSizeCountMap); } @@ -316,19 +338,26 @@ private static void updateContainerSizeCount(long containerSize, int delta, } /** - * * The purpose of this function is to categorize containers into different * size ranges, or "bins," based on their size. * The ContainerSizeCountKey object is used to store the upper bound value * for each size range, and is later used to lookup the count of containers * in that size range within a Map. * - * Used by decrementContainerSizeCount() and incrementContainerSizeCount() + * If the container size is 0, the method sets the size of + * ContainerSizeCountKey as zero without calculating the upper bound. Used by + * decrementContainerSizeCount() and incrementContainerSizeCount() * * @param containerSize to calculate the upperSizeBound */ private static ContainerSizeCountKey getContainerSizeCountKey( long containerSize) { + // If containerSize is 0, return a ContainerSizeCountKey with size 0 + if (containerSize == 0) { + return new ContainerSizeCountKey(0L); + } + + // Otherwise, calculate the upperSizeBound return new ContainerSizeCountKey( ReconUtils.getContainerSizeUpperBound(containerSize)); } diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java index 05d9927d6c93..b1ae355531b3 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestEndpoints.java @@ -866,10 +866,12 @@ public void testGetContainerCounts() throws Exception { ContainerInfo omContainerInfo1 = mock(ContainerInfo.class); given(omContainerInfo1.containerID()).willReturn(new ContainerID(1)); given(omContainerInfo1.getUsedBytes()).willReturn(1500000000L); // 1.5GB + given(omContainerInfo1.getState()).willReturn(LifeCycleState.OPEN); ContainerInfo omContainerInfo2 = mock(ContainerInfo.class); given(omContainerInfo2.containerID()).willReturn(new ContainerID(2)); given(omContainerInfo2.getUsedBytes()).willReturn(2500000000L); // 2.5GB + given(omContainerInfo2.getState()).willReturn(LifeCycleState.OPEN); // Create a list of container info objects List containers = new ArrayList<>(); diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java index 371fb6f9d675..001d44d9c203 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java @@ -343,6 +343,65 @@ public void testDeletedContainer() throws Exception { .isGreaterThan(currentTime); } + @Test + public void testNegativeSizeContainers() throws Exception { + // Setup mock objects and test environment + UnhealthyContainersDao unhealthyContainersDao = + getDao(UnhealthyContainersDao.class); + ContainerHealthSchemaManager containerHealthSchemaManager = + new ContainerHealthSchemaManager( + getSchemaDefinition(ContainerSchemaDefinition.class), + unhealthyContainersDao); + ReconStorageContainerManagerFacade scmMock = + mock(ReconStorageContainerManagerFacade.class); + ContainerManager containerManagerMock = mock(ContainerManager.class); + StorageContainerServiceProvider scmClientMock = + mock(StorageContainerServiceProvider.class); + ReconContainerMetadataManager reconContainerMetadataManager = + mock(ReconContainerMetadataManager.class); + MockPlacementPolicy placementMock = new MockPlacementPolicy(); + + // Mock container info setup + List mockContainers = getMockContainers(3); + when(scmMock.getContainerManager()).thenReturn(containerManagerMock); + when(scmMock.getScmServiceProvider()).thenReturn(scmClientMock); + when(containerManagerMock.getContainers(any(ContainerID.class), + anyInt())).thenReturn(mockContainers); + for (ContainerInfo c : mockContainers) { + when(containerManagerMock.getContainer( + c.containerID())).thenReturn(c); + when(scmClientMock.getContainerWithPipeline( + c.getContainerID())).thenReturn(new ContainerWithPipeline(c, null)); + when(containerManagerMock.getContainer(c.containerID()) + .getUsedBytes()).thenReturn(Long.valueOf(-10)); + } + + // Verify the table is initially empty + assertThat(unhealthyContainersDao.findAll()).isEmpty(); + + // Setup and start the container health task + ReconTaskStatusDao reconTaskStatusDao = getDao(ReconTaskStatusDao.class); + ReconTaskConfig reconTaskConfig = new ReconTaskConfig(); + reconTaskConfig.setMissingContainerTaskInterval(Duration.ofSeconds(2)); + ContainerHealthTask containerHealthTask = new ContainerHealthTask( + scmMock.getContainerManager(), scmMock.getScmServiceProvider(), + reconTaskStatusDao, + containerHealthSchemaManager, placementMock, reconTaskConfig, + reconContainerMetadataManager, + new OzoneConfiguration()); + containerHealthTask.start(); + + // Wait for the task to identify unhealthy containers + LambdaTestUtils.await(6000, 1000, + () -> unhealthyContainersDao.count() == 3); + + // Assert that all unhealthy containers have been identified as NEGATIVE_SIZE states + List negativeSizeContainers = + unhealthyContainersDao.fetchByContainerState("NEGATIVE_SIZE"); + assertThat(negativeSizeContainers).hasSize(3); + } + + private Set getMockReplicas( long containerId, State...states) { Set replicas = new HashSet<>(); diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerSizeCountTask.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerSizeCountTask.java index eff330a796c9..a996f167a1bb 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerSizeCountTask.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestContainerSizeCountTask.java @@ -18,6 +18,11 @@ package org.apache.hadoop.ozone.recon.tasks; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.OPEN; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSED; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.DELETED; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.QUASI_CLOSED; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSING; import static org.hadoop.ozone.recon.schema.tables.ContainerCountBySizeTable.CONTAINER_COUNT_BY_SIZE; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.BDDMockito.given; @@ -84,18 +89,21 @@ public void setUp() { @Test public void testProcess() { // mock a container with invalid used bytes - final ContainerInfo omContainerInfo0 = mock(ContainerInfo.class); + ContainerInfo omContainerInfo0 = mock(ContainerInfo.class); given(omContainerInfo0.containerID()).willReturn(new ContainerID(0)); given(omContainerInfo0.getUsedBytes()).willReturn(-1L); + given(omContainerInfo0.getState()).willReturn(OPEN); // Write 2 keys ContainerInfo omContainerInfo1 = mock(ContainerInfo.class); given(omContainerInfo1.containerID()).willReturn(new ContainerID(1)); given(omContainerInfo1.getUsedBytes()).willReturn(1500000000L); // 1.5GB + given(omContainerInfo1.getState()).willReturn(CLOSED); ContainerInfo omContainerInfo2 = mock(ContainerInfo.class); given(omContainerInfo2.containerID()).willReturn(new ContainerID(2)); given(omContainerInfo2.getUsedBytes()).willReturn(2500000000L); // 2.5GB + given(omContainerInfo2.getState()).willReturn(CLOSING); // mock getContainers method to return a list of containers List containers = new ArrayList<>(); @@ -105,8 +113,8 @@ public void testProcess() { task.process(containers); - // Verify 2 containers are in correct bins. - assertEquals(2, containerCountBySizeDao.count()); + // Verify 3 containers are in correct bins. + assertEquals(3, containerCountBySizeDao.count()); // container size upper bound for // 1500000000L (1.5GB) is 2147483648L = 2^31 = 2GB (next highest power of 2) @@ -124,10 +132,11 @@ public void testProcess() { containerCountBySizeDao.findById(recordToFind.value1()).getCount() .longValue()); - // Add a new key + // Add a new container ContainerInfo omContainerInfo3 = mock(ContainerInfo.class); given(omContainerInfo3.containerID()).willReturn(new ContainerID(3)); given(omContainerInfo3.getUsedBytes()).willReturn(1000000000L); // 1GB + given(omContainerInfo3.getState()).willReturn(QUASI_CLOSED); containers.add(omContainerInfo3); // Update existing key. @@ -137,7 +146,7 @@ public void testProcess() { task.process(containers); // Total size groups added to the database - assertEquals(4, containerCountBySizeDao.count()); + assertEquals(5, containerCountBySizeDao.count()); // Check whether container size upper bound for // 50000L is 536870912L = 2^29 = 512MB (next highest power of 2) @@ -164,4 +173,59 @@ public void testProcess() { .getCount() .longValue()); } + + @Test + public void testProcessDeletedAndNegativeSizedContainers() { + // Create a list of containers, including one that is deleted + ContainerInfo omContainerInfo1 = mock(ContainerInfo.class); + given(omContainerInfo1.containerID()).willReturn(new ContainerID(1)); + given(omContainerInfo1.getUsedBytes()).willReturn(1500000000L); // 1.5GB + given(omContainerInfo1.getState()).willReturn(OPEN); + + ContainerInfo omContainerInfo2 = mock(ContainerInfo.class); + given(omContainerInfo2.containerID()).willReturn(new ContainerID(2)); + given(omContainerInfo2.getUsedBytes()).willReturn(2500000000L); // 2.5GB + given(omContainerInfo2.getState()).willReturn(CLOSED); + + ContainerInfo omContainerInfoDeleted = mock(ContainerInfo.class); + given(omContainerInfoDeleted.containerID()).willReturn(new ContainerID(3)); + given(omContainerInfoDeleted.getUsedBytes()).willReturn(1000000000L); + given(omContainerInfoDeleted.getState()).willReturn(DELETED); // 1GB + + // Create a mock container with negative size + final ContainerInfo negativeSizeContainer = mock(ContainerInfo.class); + given(negativeSizeContainer.containerID()).willReturn(new ContainerID(0)); + given(negativeSizeContainer.getUsedBytes()).willReturn(-1L); + given(negativeSizeContainer.getState()).willReturn(OPEN); + + // Create a mock container with negative size and DELETE state + final ContainerInfo negativeSizeDeletedContainer = + mock(ContainerInfo.class); + given(negativeSizeDeletedContainer.containerID()).willReturn( + new ContainerID(0)); + given(negativeSizeDeletedContainer.getUsedBytes()).willReturn(-1L); + given(negativeSizeDeletedContainer.getState()).willReturn(DELETED); + + // Create a mock container with id 1 and updated size of 1GB from 1.5GB + final ContainerInfo validSizeContainer = mock(ContainerInfo.class); + given(validSizeContainer.containerID()).willReturn(new ContainerID(1)); + given(validSizeContainer.getUsedBytes()).willReturn(1000000000L); // 1GB + given(validSizeContainer.getState()).willReturn(CLOSED); + + // Mock getContainers method to return a list of containers including + // both valid and invalid ones + List containers = new ArrayList<>(); + containers.add(omContainerInfo1); + containers.add(omContainerInfo2); + containers.add(omContainerInfoDeleted); + containers.add(negativeSizeContainer); + containers.add(negativeSizeDeletedContainer); + containers.add(validSizeContainer); + + task.process(containers); + + // Verify that only the valid containers are counted + assertEquals(3, containerCountBySizeDao.count()); + } + }