package org.apache.hadoop.ozone.container.replication;

import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCommandInfo;
import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinator;
import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinatorTask;
import org.apache.hadoop.ozone.container.keyvalue.ContainerLayoutTestInfo;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask;
import org.apache.hadoop.ozone.container.replication.ReplicationServer;
import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor;
import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.TestClock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.class */
public class TestReplicationSupervisor {
    private static final long CURRENT_TERM = 1;
    private final ContainerReplicator noopReplicator = replicationTask -> {
    };
    private final ContainerReplicator throwingReplicator = replicationTask -> {
        throw new RuntimeException("testing replication failure");
    };
    private final ContainerReplicator slowReplicator = replicationTask -> {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
        }
    };
    private final AtomicReference<ContainerReplicator> replicatorRef = new AtomicReference<>();
    private ContainerSet set;
    private final ContainerLayoutVersion layout;
    private StateContext context;
    private TestClock clock;
    private DatanodeDetails datanode;

    /* loaded from: input_file:org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor$BlockingTask.class */
    private static class BlockingTask extends AbstractReplicationTask {
        private final CountDownLatch runningLatch;
        private final CountDownLatch waitForCompleteLatch;

        BlockingTask(long j, long j2, long j3, CountDownLatch countDownLatch, CountDownLatch countDownLatch2) {
            super(j, j2, j3);
            this.runningLatch = countDownLatch;
            this.waitForCompleteLatch = countDownLatch2;
        }

        public void runTask() {
            this.runningLatch.countDown();
            try {
                this.waitForCompleteLatch.await();
            } catch (InterruptedException e) {
                Assertions.fail("Interrupted waiting for the completion latch to be released");
            }
            setStatus(AbstractReplicationTask.Status.DONE);
        }
    }

    /* loaded from: input_file:org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor$DiscardingExecutorService.class */
    private static class DiscardingExecutorService extends AbstractExecutorService {
        private DiscardingExecutorService() {
        }

        @Override // java.util.concurrent.ExecutorService
        public void shutdown() {
        }

        @Override // java.util.concurrent.ExecutorService
        @Nonnull
        public List<Runnable> shutdownNow() {
            return Collections.emptyList();
        }

        @Override // java.util.concurrent.ExecutorService
        public boolean isShutdown() {
            return false;
        }

        @Override // java.util.concurrent.ExecutorService
        public boolean isTerminated() {
            return false;
        }

        @Override // java.util.concurrent.ExecutorService
        public boolean awaitTermination(long j, @Nonnull TimeUnit timeUnit) {
            return false;
        }

        @Override // java.util.concurrent.Executor
        public void execute(@Nonnull Runnable runnable) {
        }
    }

    /* loaded from: input_file:org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor$FakeReplicator.class */
    private class FakeReplicator implements ContainerReplicator {
        private final OzoneConfiguration conf = new OzoneConfiguration();
        private final ReplicationSupervisor supervisor;

        FakeReplicator(ReplicationSupervisor replicationSupervisor) {
            this.supervisor = replicationSupervisor;
        }

        public void replicate(ReplicationTask replicationTask) {
            if (TestReplicationSupervisor.this.set.getContainer(replicationTask.getContainerId()) != null) {
                replicationTask.setStatus(AbstractReplicationTask.Status.SKIPPED);
                return;
            }
            Assert.assertEquals(TestReplicationSupervisor.CURRENT_TERM, this.supervisor.getTotalInFlightReplications());
            try {
                TestReplicationSupervisor.this.set.addContainer(new KeyValueContainer(new KeyValueContainerData(replicationTask.getContainerId(), TestReplicationSupervisor.this.layout, 100L, UUID.randomUUID().toString(), UUID.randomUUID().toString()), this.conf));
                replicationTask.setStatus(AbstractReplicationTask.Status.DONE);
            } catch (Exception e) {
                Assert.fail("Unexpected error: " + e.getMessage());
            }
        }
    }

    /* loaded from: input_file:org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor$OrderedTask.class */
    private static class OrderedTask extends AbstractReplicationTask {
        private final String name;
        private final List<String> completeList;
        private final CountDownLatch completeLatch;

        OrderedTask(long j, long j2, long j3, Clock clock, StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority replicationCommandPriority, String str, List<String> list, CountDownLatch countDownLatch) {
            super(j, j2, j3, clock);
            this.completeList = list;
            this.name = str;
            this.completeLatch = countDownLatch;
            setPriority(replicationCommandPriority);
        }

        public void runTask() {
            this.completeList.add(this.name);
            setStatus(AbstractReplicationTask.Status.DONE);
            this.completeLatch.countDown();
        }
    }

    public TestReplicationSupervisor(ContainerLayoutVersion containerLayoutVersion) {
        this.layout = containerLayoutVersion;
    }

    @Parameterized.Parameters
    public static Iterable<Object[]> parameters() {
        return ContainerLayoutTestInfo.containerLayoutParameters();
    }

    @Before
    public void setUp() throws Exception {
        this.clock = new TestClock(Instant.now(), ZoneId.systemDefault());
        this.set = new ContainerSet(1000L);
        DatanodeStateMachine datanodeStateMachine = (DatanodeStateMachine) Mockito.mock(DatanodeStateMachine.class);
        this.context = new StateContext(new OzoneConfiguration(), DatanodeStateMachine.DatanodeStates.getInitState(), datanodeStateMachine, "");
        this.context.setTermOfLeaderSCM(CURRENT_TERM);
        this.datanode = MockDatanodeDetails.randomDatanodeDetails();
        Mockito.when(datanodeStateMachine.getDatanodeDetails()).thenReturn(this.datanode);
    }

    @After
    public void cleanup() {
        this.replicatorRef.set(null);
    }

    @Test
    public void normal() {
        ReplicationSupervisor supervisorWithReplicator = supervisorWithReplicator(replicationSupervisor -> {
            return new FakeReplicator(replicationSupervisor);
        });
        ReplicationSupervisorMetrics create = ReplicationSupervisorMetrics.create(supervisorWithReplicator);
        try {
            supervisorWithReplicator.addTask(createTask(CURRENT_TERM));
            supervisorWithReplicator.addTask(createTask(2L));
            supervisorWithReplicator.addTask(createTask(5L));
            Assert.assertEquals(3L, supervisorWithReplicator.getReplicationRequestCount());
            Assert.assertEquals(3L, supervisorWithReplicator.getReplicationSuccessCount());
            Assert.assertEquals(0L, supervisorWithReplicator.getReplicationFailureCount());
            Assert.assertEquals(0L, supervisorWithReplicator.getTotalInFlightReplications());
            Assert.assertEquals(0L, supervisorWithReplicator.getQueueSize());
            Assert.assertEquals(3L, this.set.containerCount());
            create.getMetrics(new MetricsCollectorImpl(), true);
            Assert.assertEquals(CURRENT_TERM, r0.getRecords().size());
            create.unRegister();
            supervisorWithReplicator.stop();
        } catch (Throwable th) {
            create.unRegister();
            supervisorWithReplicator.stop();
            throw th;
        }
    }

    @Test
    public void duplicateMessage() {
        ReplicationSupervisor supervisorWithReplicator = supervisorWithReplicator(replicationSupervisor -> {
            return new FakeReplicator(replicationSupervisor);
        });
        try {
            supervisorWithReplicator.addTask(createTask(6L));
            supervisorWithReplicator.addTask(createTask(6L));
            supervisorWithReplicator.addTask(createTask(6L));
            supervisorWithReplicator.addTask(createTask(6L));
            Assert.assertEquals(4L, supervisorWithReplicator.getReplicationRequestCount());
            Assert.assertEquals(CURRENT_TERM, supervisorWithReplicator.getReplicationSuccessCount());
            Assert.assertEquals(0L, supervisorWithReplicator.getReplicationFailureCount());
            Assert.assertEquals(3L, supervisorWithReplicator.getReplicationSkippedCount());
            Assert.assertEquals(0L, supervisorWithReplicator.getTotalInFlightReplications());
            Assert.assertEquals(0L, supervisorWithReplicator.getQueueSize());
            Assert.assertEquals(CURRENT_TERM, this.set.containerCount());
        } finally {
            supervisorWithReplicator.stop();
        }
    }

    @Test
    public void failureHandling() {
        ReplicationSupervisor supervisorWith = supervisorWith(replicationSupervisor -> {
            return this.throwingReplicator;
        }, MoreExecutors.newDirectExecutorService());
        try {
            ReplicationTask createTask = createTask(CURRENT_TERM);
            supervisorWith.addTask(createTask);
            Assert.assertEquals(CURRENT_TERM, supervisorWith.getReplicationRequestCount());
            Assert.assertEquals(0L, supervisorWith.getReplicationSuccessCount());
            Assert.assertEquals(CURRENT_TERM, supervisorWith.getReplicationFailureCount());
            Assert.assertEquals(0L, supervisorWith.getTotalInFlightReplications());
            Assert.assertEquals(0L, supervisorWith.getQueueSize());
            Assert.assertEquals(0L, this.set.containerCount());
            Assert.assertEquals(AbstractReplicationTask.Status.FAILED, createTask.getStatus());
        } finally {
            supervisorWith.stop();
        }
    }

    @Test
    public void stalledDownload() {
        ReplicationSupervisor supervisorWith = supervisorWith(replicationSupervisor -> {
            return this.noopReplicator;
        }, new DiscardingExecutorService());
        try {
            supervisorWith.addTask(createTask(CURRENT_TERM));
            supervisorWith.addTask(createTask(2L));
            supervisorWith.addTask(createTask(3L));
            supervisorWith.addTask(createECTask(4L));
            supervisorWith.addTask(createECTask(5L));
            Assert.assertEquals(0L, supervisorWith.getReplicationRequestCount());
            Assert.assertEquals(0L, supervisorWith.getReplicationSuccessCount());
            Assert.assertEquals(0L, supervisorWith.getReplicationFailureCount());
            Assert.assertEquals(5L, supervisorWith.getTotalInFlightReplications());
            Assert.assertEquals(3L, supervisorWith.getInFlightReplications(ReplicationTask.class));
            Assert.assertEquals(2L, supervisorWith.getInFlightReplications(ECReconstructionCoordinatorTask.class));
            Assert.assertEquals(0L, supervisorWith.getQueueSize());
            Assert.assertEquals(0L, this.set.containerCount());
        } finally {
            supervisorWith.stop();
        }
    }

    @Test
    public void slowDownload() {
        ReplicationSupervisor supervisorWith = supervisorWith(replicationSupervisor -> {
            return this.slowReplicator;
        }, new ThreadPoolExecutor(1, 1, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue()));
        try {
            supervisorWith.addTask(createTask(CURRENT_TERM));
            supervisorWith.addTask(createTask(2L));
            supervisorWith.addTask(createTask(3L));
            Assert.assertEquals(3L, supervisorWith.getTotalInFlightReplications());
            Assert.assertEquals(2L, supervisorWith.getQueueSize());
            try {
                Thread.sleep(4000L);
            } catch (InterruptedException e) {
            }
            Assert.assertEquals(0L, supervisorWith.getTotalInFlightReplications());
            Assert.assertEquals(0L, supervisorWith.getQueueSize());
        } finally {
            supervisorWith.stop();
        }
    }

    @Test
    public void testDownloadAndImportReplicatorFailure() throws IOException {
        OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
        ReplicationSupervisor build = ReplicationSupervisor.newBuilder().stateContext(this.context).executor(MoreExecutors.newDirectExecutorService()).clock(this.clock).build();
        SimpleContainerDownloader simpleContainerDownloader = (SimpleContainerDownloader) Mockito.mock(SimpleContainerDownloader.class);
        Mockito.when(simpleContainerDownloader.getContainerDataFromReplicas(Mockito.anyLong(), Mockito.anyList(), (Path) Mockito.any(Path.class), (CopyContainerCompression) Mockito.any())).thenReturn(Paths.get("file:/tmp/no-such-file", new String[0]));
        String tempPath = GenericTestUtils.getTempPath(TestReplicationSupervisor.class.getSimpleName() + "-" + UUID.randomUUID());
        MutableVolumeSet mutableVolumeSet = (MutableVolumeSet) Mockito.mock(MutableVolumeSet.class);
        Mockito.when(mutableVolumeSet.getVolumesList()).thenReturn(Collections.singletonList(new HddsVolume.Builder(tempPath).conf(ozoneConfiguration).build()));
        this.replicatorRef.set(new DownloadAndImportReplicator(ozoneConfiguration, this.set, new ContainerImporter(ozoneConfiguration, this.set, (ContainerController) Mockito.mock(ContainerController.class), mutableVolumeSet), simpleContainerDownloader));
        GenericTestUtils.LogCapturer captureLogs = GenericTestUtils.LogCapturer.captureLogs(DownloadAndImportReplicator.LOG);
        build.addTask(createTask(CURRENT_TERM));
        Assert.assertEquals(CURRENT_TERM, build.getReplicationFailureCount());
        Assert.assertEquals(0L, build.getReplicationSuccessCount());
        Assert.assertTrue(captureLogs.getOutput().contains("Container 1 replication was unsuccessful."));
    }

    @Test
    public void testTaskBeyondDeadline() {
        ReplicationSupervisor supervisorWithReplicator = supervisorWithReplicator(replicationSupervisor -> {
            return new FakeReplicator(replicationSupervisor);
        });
        ReplicateContainerCommand createCommand = createCommand(CURRENT_TERM);
        createCommand.setDeadline(this.clock.millis() + 10000);
        ReplicationTask replicationTask = new ReplicationTask(createCommand, this.replicatorRef.get());
        ReplicateContainerCommand createCommand2 = createCommand(2L);
        createCommand2.setDeadline(this.clock.millis() + 20000);
        ReplicationTask replicationTask2 = new ReplicationTask(createCommand2, this.replicatorRef.get());
        ReplicationTask replicationTask3 = new ReplicationTask(createCommand(3L), this.replicatorRef.get());
        this.clock.fastForward(15000L);
        supervisorWithReplicator.addTask(replicationTask);
        supervisorWithReplicator.addTask(replicationTask2);
        supervisorWithReplicator.addTask(replicationTask3);
        Assert.assertEquals(3L, supervisorWithReplicator.getReplicationRequestCount());
        Assert.assertEquals(2L, supervisorWithReplicator.getReplicationSuccessCount());
        Assert.assertEquals(0L, supervisorWithReplicator.getReplicationFailureCount());
        Assert.assertEquals(0L, supervisorWithReplicator.getTotalInFlightReplications());
        Assert.assertEquals(0L, supervisorWithReplicator.getQueueSize());
        Assert.assertEquals(CURRENT_TERM, supervisorWithReplicator.getReplicationTimeoutCount());
        Assert.assertEquals(2L, this.set.containerCount());
    }

    @Test
    public void testDatanodeOutOfService() {
        ReplicationSupervisor supervisorWithReplicator = supervisorWithReplicator(replicationSupervisor -> {
            return new FakeReplicator(replicationSupervisor);
        });
        this.datanode.setPersistedOpState(HddsProtos.NodeOperationalState.DECOMMISSIONING);
        ReplicateContainerCommand target = ReplicateContainerCommand.toTarget(CURRENT_TERM, MockDatanodeDetails.randomDatanodeDetails());
        target.setTerm(CURRENT_TERM);
        ReplicateContainerCommand createCommand = createCommand(2L);
        supervisorWithReplicator.addTask(new ReplicationTask(target, this.replicatorRef.get()));
        supervisorWithReplicator.addTask(new ReplicationTask(createCommand, this.replicatorRef.get()));
        Assert.assertEquals(2L, supervisorWithReplicator.getReplicationRequestCount());
        Assert.assertEquals(CURRENT_TERM, supervisorWithReplicator.getReplicationSuccessCount());
        Assert.assertEquals(0L, supervisorWithReplicator.getReplicationFailureCount());
        Assert.assertEquals(0L, supervisorWithReplicator.getTotalInFlightReplications());
        Assert.assertEquals(0L, supervisorWithReplicator.getQueueSize());
        Assert.assertEquals(0L, supervisorWithReplicator.getReplicationTimeoutCount());
        Assert.assertEquals(CURRENT_TERM, this.set.containerCount());
    }

    @Test
    public void taskWithObsoleteTermIsDropped() {
        ReplicationSupervisor supervisorWithReplicator = supervisorWithReplicator(replicationSupervisor -> {
            return new FakeReplicator(replicationSupervisor);
        });
        this.context.setTermOfLeaderSCM(2L);
        supervisorWithReplicator.addTask(createTask(CURRENT_TERM));
        Assert.assertEquals(CURRENT_TERM, supervisorWithReplicator.getReplicationRequestCount());
        Assert.assertEquals(0L, supervisorWithReplicator.getReplicationSuccessCount());
    }

    @Test
    public void testPriorityOrdering() throws InterruptedException {
        long millis = this.clock.millis() + 1000;
        ReplicationServer.ReplicationConfig replicationConfig = (ReplicationServer.ReplicationConfig) new OzoneConfiguration().getObject(ReplicationServer.ReplicationConfig.class);
        replicationConfig.setReplicationMaxStreams(1);
        ReplicationSupervisor build = ReplicationSupervisor.newBuilder().replicationConfig(replicationConfig).clock(this.clock).build();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        CountDownLatch countDownLatch3 = new CountDownLatch(5);
        build.addTask(new BlockingTask(CURRENT_TERM, millis, CURRENT_TERM, countDownLatch, countDownLatch2));
        countDownLatch.await();
        ArrayList arrayList = new ArrayList();
        this.clock.fastForward(10L);
        build.addTask(new OrderedTask(CURRENT_TERM, millis, CURRENT_TERM, this.clock, StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority.LOW, "LOW_10", arrayList, countDownLatch3));
        this.clock.rewind(5L);
        build.addTask(new OrderedTask(CURRENT_TERM, millis, CURRENT_TERM, this.clock, StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority.LOW, "LOW_5", arrayList, countDownLatch3));
        build.addTask(new OrderedTask(CURRENT_TERM, millis, CURRENT_TERM, this.clock, StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority.NORMAL, "HIGH_5", arrayList, countDownLatch3));
        this.clock.rewind(4L);
        build.addTask(new OrderedTask(CURRENT_TERM, millis, CURRENT_TERM, this.clock, StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority.NORMAL, "HIGH_1", arrayList, countDownLatch3));
        this.clock.fastForward(10L);
        build.addTask(new OrderedTask(CURRENT_TERM, millis, CURRENT_TERM, this.clock, StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority.NORMAL, "HIGH_11", arrayList, countDownLatch3));
        ArrayList arrayList2 = new ArrayList();
        arrayList2.add("HIGH_1");
        arrayList2.add("HIGH_5");
        arrayList2.add("HIGH_11");
        arrayList2.add("LOW_5");
        arrayList2.add("LOW_10");
        Assert.assertEquals(3L, build.getInFlightReplications(OrderedTask.class));
        Assert.assertEquals(CURRENT_TERM, build.getInFlightReplications(BlockingTask.class));
        countDownLatch2.countDown();
        countDownLatch3.await();
        Assert.assertEquals(arrayList2, arrayList);
        Assert.assertEquals(0L, build.getInFlightReplications(OrderedTask.class));
        Assert.assertEquals(0L, build.getInFlightReplications(BlockingTask.class));
    }

    private ReplicationSupervisor supervisorWithReplicator(Function<ReplicationSupervisor, ContainerReplicator> function) {
        return supervisorWith(function, MoreExecutors.newDirectExecutorService());
    }

    private ReplicationSupervisor supervisorWith(Function<ReplicationSupervisor, ContainerReplicator> function, ExecutorService executorService) {
        ReplicationSupervisor build = ReplicationSupervisor.newBuilder().stateContext(this.context).replicationConfig((ReplicationServer.ReplicationConfig) new OzoneConfiguration().getObject(ReplicationServer.ReplicationConfig.class)).executor(executorService).clock(this.clock).build();
        this.replicatorRef.set(function.apply(build));
        return build;
    }

    private ReplicationTask createTask(long j) {
        return new ReplicationTask(createCommand(j), this.replicatorRef.get());
    }

    private ECReconstructionCoordinatorTask createECTask(long j) {
        return new ECReconstructionCoordinatorTask((ECReconstructionCoordinator) null, createReconstructionCmd(j));
    }

    private static ReplicateContainerCommand createCommand(long j) {
        ReplicateContainerCommand forTest = ReplicateContainerCommand.forTest(j);
        forTest.setTerm(CURRENT_TERM);
        return forTest;
    }

    private static ECReconstructionCommandInfo createReconstructionCmd(long j) {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex(MockDatanodeDetails.randomDatanodeDetails(), 1));
        arrayList.add(new ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex(MockDatanodeDetails.randomDatanodeDetails(), 2));
        arrayList.add(new ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex(MockDatanodeDetails.randomDatanodeDetails(), 3));
        return new ECReconstructionCommandInfo(new ReconstructECContainersCommand(j, arrayList, Collections.singletonList(MockDatanodeDetails.randomDatanodeDetails()), new byte[]{4}, new ECReplicationConfig(3, 2)));
    }

    @Test
    public void poolSizeCanBeIncreased() {
        this.datanode.setPersistedOpState(HddsProtos.NodeOperationalState.IN_SERVICE);
        ReplicationSupervisor build = ReplicationSupervisor.newBuilder().stateContext(this.context).build();
        try {
            build.nodeStateUpdated(HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE);
        } finally {
            build.stop();
        }
    }

    @Test
    public void poolSizeCanBeDecreased() {
        this.datanode.setPersistedOpState(HddsProtos.NodeOperationalState.IN_MAINTENANCE);
        ReplicationSupervisor build = ReplicationSupervisor.newBuilder().stateContext(this.context).build();
        try {
            build.nodeStateUpdated(HddsProtos.NodeOperationalState.IN_SERVICE);
        } finally {
            build.stop();
        }
    }

    @Test
    public void testMaxQueueSize() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(MockDatanodeDetails.randomDatanodeDetails());
        arrayList.add(MockDatanodeDetails.randomDatanodeDetails());
        DatanodeConfiguration datanodeConfiguration = new DatanodeConfiguration();
        datanodeConfiguration.setCommandQueueLimit(2);
        ReplicationServer.ReplicationConfig replicationConfig = new ReplicationServer.ReplicationConfig();
        replicationConfig.setReplicationMaxStreams(5);
        AtomicInteger atomicInteger = new AtomicInteger();
        ReplicationSupervisor.Builder executor = ReplicationSupervisor.newBuilder().executor(new DiscardingExecutorService());
        atomicInteger.getClass();
        ReplicationSupervisor build = executor.executorThreadUpdater(atomicInteger::set).datanodeConfig(datanodeConfiguration).replicationConfig(replicationConfig).build();
        scheduleTasks(arrayList, build);
        Assert.assertEquals(2L, build.getTotalInFlightReplications());
        build.nodeStateUpdated(HddsProtos.NodeOperationalState.DECOMMISSIONING);
        Assert.assertEquals(4L, build.getMaxQueueSize());
        Assert.assertEquals(10L, atomicInteger.get());
        scheduleTasks(arrayList, build);
        Assert.assertEquals(4L, build.getTotalInFlightReplications());
        build.nodeStateUpdated(HddsProtos.NodeOperationalState.IN_SERVICE);
        Assert.assertEquals(2L, build.getMaxQueueSize());
        Assert.assertEquals(5L, atomicInteger.get());
    }

    private void scheduleTasks(List<DatanodeDetails> list, ReplicationSupervisor replicationSupervisor) {
        for (int i = 0; i < 10; i++) {
            replicationSupervisor.addTask(new ReplicationTask(ReplicateContainerCommand.fromSources(i, Collections.singletonList(list.get(i % list.size()))), this.noopReplicator));
        }
    }
}
