package org.apache.hadoop.ozone.client.rpc;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientRatis;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.io.BlockOutputStreamEntry;
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.TestHelper;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.ratis.protocol.GroupMismatchException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.class */
public class TestWatchForCommit {
    private static MiniOzoneCluster cluster;
    private static OzoneConfiguration conf;
    private static OzoneClient client;
    private static ObjectStore objectStore;
    private static String volumeName;
    private static String bucketName;
    private static String keyString;
    private static int chunkSize;
    private static int flushSize;
    private static int maxFlushSize;
    private static int blockSize;
    private static StorageContainerLocationProtocolClientSideTranslatorPB storageContainerLocationClient;

    @Before
    public void init() throws Exception {
        conf = new OzoneConfiguration();
        chunkSize = 100;
        flushSize = 2 * chunkSize;
        maxFlushSize = 2 * flushSize;
        blockSize = 2 * maxFlushSize;
        conf.setBoolean("ozone.client.stream.buffer.flush.delay", false);
        conf.setTimeDuration("ozone.scm.pipeline.destroy.timeout", 10L, TimeUnit.SECONDS);
        conf.setQuietMode(false);
        RatisClientConfig ratisClientConfig = (RatisClientConfig) conf.getObject(RatisClientConfig.class);
        ratisClientConfig.setWriteRequestTimeoutInMs(TimeUnit.SECONDS.toMillis(10L));
        ratisClientConfig.setWatchRequestTimeoutInMs(TimeUnit.SECONDS.toMillis(10L));
        conf.setFromObject(ratisClientConfig);
        DatanodeRatisServerConfig datanodeRatisServerConfig = (DatanodeRatisServerConfig) conf.getObject(DatanodeRatisServerConfig.class);
        datanodeRatisServerConfig.setRequestTimeOut(Duration.ofSeconds(3L));
        datanodeRatisServerConfig.setWatchTimeOut(Duration.ofSeconds(3L));
        conf.setFromObject(datanodeRatisServerConfig);
        RatisClientConfig.RaftConfig raftConfig = (RatisClientConfig.RaftConfig) conf.getObject(RatisClientConfig.RaftConfig.class);
        raftConfig.setRpcRequestTimeout(TimeUnit.SECONDS.toMillis(3L));
        raftConfig.setRpcWatchRequestTimeout(TimeUnit.SECONDS.toMillis(10L));
        conf.setFromObject(raftConfig);
        conf.setTimeDuration("ozone.scm.stale.node.interval", 30L, TimeUnit.SECONDS);
        cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(9).setBlockSize(blockSize).setChunkSize(chunkSize).setStreamBufferFlushSize(flushSize).setStreamBufferMaxSize(maxFlushSize).setStreamBufferSizeUnit(StorageUnit.BYTES).build();
        cluster.waitForClusterToBeReady();
        client = OzoneClientFactory.getRpcClient(conf);
        objectStore = client.getObjectStore();
        keyString = UUID.randomUUID().toString();
        volumeName = "watchforcommithandlingtest";
        bucketName = volumeName;
        objectStore.createVolume(volumeName);
        objectStore.getVolume(volumeName).createBucket(bucketName);
        storageContainerLocationClient = cluster.getStorageContainerLocationClient();
    }

    @After
    public void shutdown() {
        if (cluster != null) {
            cluster.shutdown();
        }
    }

    private String getKeyName() {
        return UUID.randomUUID().toString();
    }

    @Test
    public void testWatchForCommitWithKeyWrite() throws Exception {
        String keyName = getKeyName();
        OzoneOutputStream createKey = createKey(keyName, ReplicationType.RATIS, 0L);
        int i = maxFlushSize + 50;
        byte[] bytes = ContainerTestHelper.getFixedLengthString(keyString, i).getBytes(StandardCharsets.UTF_8);
        createKey.write(bytes);
        Assert.assertTrue(createKey.getOutputStream() instanceof KeyOutputStream);
        KeyOutputStream outputStream = createKey.getOutputStream();
        BlockOutputStream outputStream2 = ((BlockOutputStreamEntry) outputStream.getStreamEntries().get(0)).getOutputStream();
        Assert.assertTrue(outputStream2 instanceof BlockOutputStream);
        BlockOutputStream blockOutputStream = outputStream2;
        Assert.assertEquals(4L, blockOutputStream.getBufferPool().getSize());
        Assert.assertEquals(i, blockOutputStream.getWrittenDataLength());
        Assert.assertEquals(maxFlushSize, blockOutputStream.getTotalDataFlushedLength());
        Assert.assertTrue(blockOutputStream.getTotalAckDataLength() >= ((long) flushSize));
        Assert.assertTrue(blockOutputStream.getCommitIndex2flushedDataMap().size() <= 1);
        createKey.flush();
        Assert.assertEquals(4L, blockOutputStream.getBufferPool().getSize());
        Assert.assertEquals(i, blockOutputStream.getWrittenDataLength());
        Assert.assertEquals(i, blockOutputStream.getTotalDataFlushedLength());
        Assert.assertTrue(blockOutputStream.getCommitIndex2flushedDataMap().size() <= 2);
        XceiverClientRatis xceiverClient = blockOutputStream.getXceiverClient();
        Assert.assertEquals(3L, xceiverClient.getCommitInfoMap().size());
        Pipeline pipeline = xceiverClient.getPipeline();
        cluster.shutdownHddsDatanode((DatanodeDetails) pipeline.getNodes().get(0));
        cluster.shutdownHddsDatanode((DatanodeDetails) pipeline.getNodes().get(1));
        createKey.write(bytes);
        createKey.flush();
        Assert.assertTrue(outputStream.getRetryCount() == 0);
        createKey.close();
        Assert.assertEquals(0L, blockOutputStream.getBufferPool().computeBufferData());
        Assert.assertEquals(i, blockOutputStream.getTotalAckDataLength());
        Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
        Assert.assertEquals(i, blockOutputStream.getTotalAckDataLength());
        Assert.assertEquals(0L, blockOutputStream.getBufferPool().computeBufferData());
        Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
        validateData(keyName, bytes);
    }

    @Test
    public void testWatchForCommitForRetryfailure() throws Exception {
        XceiverClientManager xceiverClientManager = new XceiverClientManager(conf);
        ContainerWithPipeline allocateContainer = storageContainerLocationClient.allocateContainer(HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE, "ozone");
        XceiverClientSpi acquireClient = xceiverClientManager.acquireClient(allocateContainer.getPipeline());
        Assert.assertEquals(1L, acquireClient.getRefcount());
        Assert.assertEquals(allocateContainer.getPipeline(), acquireClient.getPipeline());
        Pipeline pipeline = acquireClient.getPipeline();
        TestHelper.createPipelineOnDatanode(pipeline, cluster);
        XceiverClientReply sendCommandAsync = acquireClient.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest(allocateContainer.getContainerInfo().getContainerID(), acquireClient.getPipeline()));
        sendCommandAsync.getResponse().get();
        long logIndex = sendCommandAsync.getLogIndex();
        cluster.shutdownHddsDatanode((DatanodeDetails) pipeline.getNodes().get(0));
        cluster.shutdownHddsDatanode((DatanodeDetails) pipeline.getNodes().get(1));
        try {
            acquireClient.watchForCommit(logIndex + new Random().nextInt(100) + 10);
            Assert.fail("expected exception not thrown");
        } catch (Exception e) {
            Assert.assertTrue(e instanceof ExecutionException);
            Assert.assertFalse(HddsClientUtils.checkForException(e) instanceof TimeoutException);
        }
        xceiverClientManager.releaseClient(acquireClient, false);
    }

    @Test
    public void test2WayCommitForTimeoutException() throws Exception {
        GenericTestUtils.LogCapturer captureLogs = GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
        XceiverClientManager xceiverClientManager = new XceiverClientManager(conf);
        ContainerWithPipeline allocateContainer = storageContainerLocationClient.allocateContainer(HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE, "ozone");
        XceiverClientRatis acquireClient = xceiverClientManager.acquireClient(allocateContainer.getPipeline());
        Assert.assertEquals(1L, acquireClient.getRefcount());
        Assert.assertEquals(allocateContainer.getPipeline(), acquireClient.getPipeline());
        Pipeline pipeline = acquireClient.getPipeline();
        TestHelper.createPipelineOnDatanode(pipeline, cluster);
        XceiverClientRatis xceiverClientRatis = acquireClient;
        acquireClient.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest(allocateContainer.getContainerInfo().getContainerID(), acquireClient.getPipeline())).getResponse().get();
        Assert.assertEquals(3L, xceiverClientRatis.getCommitInfoMap().size());
        Iterator<HddsDatanodeService> it = cluster.getHddsDatanodes().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            HddsDatanodeService next = it.next();
            if (ContainerTestHelper.isRatisFollower(next, pipeline)) {
                cluster.shutdownHddsDatanode(next.getDatanodeDetails());
                break;
            }
        }
        XceiverClientReply sendCommandAsync = acquireClient.sendCommandAsync(ContainerTestHelper.getCloseContainer(pipeline, allocateContainer.getContainerInfo().getContainerID()));
        sendCommandAsync.getResponse().get();
        acquireClient.watchForCommit(sendCommandAsync.getLogIndex());
        Assert.assertEquals(2L, xceiverClientRatis.getCommitInfoMap().size());
        xceiverClientManager.releaseClient(acquireClient, false);
        Assert.assertTrue(captureLogs.getOutput().contains("3 way commit failed"));
        Assert.assertTrue(captureLogs.getOutput().contains("TimeoutException"));
        Assert.assertTrue(captureLogs.getOutput().contains("Committed by majority"));
        captureLogs.stopCapturing();
    }

    @Test
    public void testWatchForCommitForGroupMismatchException() throws Exception {
        XceiverClientManager xceiverClientManager = new XceiverClientManager(conf);
        ContainerWithPipeline allocateContainer = storageContainerLocationClient.allocateContainer(HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE, "ozone");
        XceiverClientRatis acquireClient = xceiverClientManager.acquireClient(allocateContainer.getPipeline());
        Assert.assertEquals(1L, acquireClient.getRefcount());
        Assert.assertEquals(allocateContainer.getPipeline(), acquireClient.getPipeline());
        Pipeline pipeline = acquireClient.getPipeline();
        XceiverClientRatis xceiverClientRatis = acquireClient;
        XceiverClientReply sendCommandAsync = acquireClient.sendCommandAsync(ContainerTestHelper.getCreateContainerRequest(allocateContainer.getContainerInfo().getContainerID(), acquireClient.getPipeline()));
        sendCommandAsync.getResponse().get();
        Assert.assertEquals(3L, xceiverClientRatis.getCommitInfoMap().size());
        ArrayList arrayList = new ArrayList();
        arrayList.add(pipeline);
        TestHelper.waitForPipelineClose(arrayList, cluster);
        try {
            acquireClient.watchForCommit(sendCommandAsync.getLogIndex() + new Random().nextInt(100) + 10);
            Assert.fail("Expected exception not thrown");
        } catch (Exception e) {
            Assert.assertTrue(HddsClientUtils.checkForException(e) instanceof GroupMismatchException);
        }
        xceiverClientManager.releaseClient(acquireClient, false);
    }

    private OzoneOutputStream createKey(String str, ReplicationType replicationType, long j) throws Exception {
        return TestHelper.createKey(str, replicationType, j, objectStore, volumeName, bucketName);
    }

    private void validateData(String str, byte[] bArr) throws Exception {
        TestHelper.validateData(str, bArr, objectStore, volumeName, bucketName);
    }
}
