package org.apache.ratis;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.protocol.exceptions.NotReplicatedException;
import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;
import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing;
import org.apache.ratis.util.ProtoUtils;
import org.apache.ratis.util.Slf4jUtils;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.function.CheckedConsumer;
import org.apache.ratis.util.function.CheckedSupplier;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.event.Level;

/* JADX WARN: Classes with same name are omitted:
  input_file:ratis-server-2.5.1-tests.jar:org/apache/ratis/WatchRequestTests.class
 */
/* loaded from: input_file:test-classes/org/apache/ratis/WatchRequestTests.class */
public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster> extends BaseTest implements MiniRaftCluster.Factory.Get<CLUSTER> {
    static final int NUM_SERVERS = 3;
    static final int GET_TIMEOUT_SECOND = 10;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:ratis-server-2.5.1-tests.jar:org/apache/ratis/WatchRequestTests$TestParameters.class
     */
    /* loaded from: input_file:test-classes/org/apache/ratis/WatchRequestTests$TestParameters.class */
    /**
     * Bundles the inputs of a single test run: how many messages to write, the client used
     * for the writes and watches, the cluster under test and the logger to report to.
     */
    public static class TestParameters {
        final int numMessages;
        final RaftClient writeClient;
        final MiniRaftCluster cluster;
        final Logger log;

        /**
         * @param i number of messages to send in {@link #sendRequests(List, List)}
         * @param raftClient client used to send the writes and the watch requests
         * @param miniRaftCluster cluster under test
         * @param logger logger used for progress messages
         */
        TestParameters(int i, RaftClient raftClient, MiniRaftCluster miniRaftCluster, Logger logger) {
            this.numMessages = i;
            this.writeClient = raftClient;
            this.cluster = miniRaftCluster;
            this.log = logger;
        }

        /**
         * Asynchronously sends {@link #numMessages} writes ("m0", "m1", ...) and, once each write
         * completes, issues one watch request per replication level for the write's log index.
         *
         * @param list receives the future of every write, in send order
         * @param list2 receives, in the same order, a future that completes with the
         *              {@link WatchReplies} holding the four watch futures of that write
         */
        void sendRequests(List<CompletableFuture<RaftClientReply>> list, List<CompletableFuture<WatchReplies>> list2) {
            for (int index = 0; index < numMessages; index++) {
                final String message = "m" + index;
                log.info("SEND_REQUEST {}: message={}", index, message);
                final CompletableFuture<RaftClientReply> replyFuture =
                        writeClient.async().send(new RaftTestUtil.SimpleMessage(message));
                list.add(replyFuture);

                final CompletableFuture<WatchReplies> watchFuture = new CompletableFuture<>();
                list2.add(watchFuture);

                // thenAccept only runs on normal completion: if the write fails, watchFuture is
                // never completed and callers observe that through their timed get().
                replyFuture.thenAccept(reply -> {
                    final long logIndex = reply.getLogIndex();
                    log.info("SEND_WATCH: message={}, logIndex={}", message, logIndex);
                    watchFuture.complete(new WatchReplies(
                            logIndex,
                            writeClient.async().watch(logIndex, RaftProtos.ReplicationLevel.MAJORITY),
                            writeClient.async().watch(logIndex, RaftProtos.ReplicationLevel.ALL),
                            writeClient.async().watch(logIndex, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED),
                            writeClient.async().watch(logIndex, RaftProtos.ReplicationLevel.ALL_COMMITTED),
                            log));
                });
            }
        }

        /**
         * Creates a dedicated client with the given retry policy, performs one write, watches that
         * write at MAJORITY_COMMITTED and then issues a MAJORITY watch for the requested index.
         * The client is closed before this method returns, whether it returns normally or throws;
         * the returned future is therefore handed out after its client has been closed.
         *
         * @param j log index to watch with {@code ReplicationLevel.MAJORITY}
         * @param retryPolicy retry policy of the temporary client
         * @return the future of the MAJORITY watch request for index {@code j}
         * @throws Exception if waiting for the leader, the write, or closing the client fails
         */
        CompletableFuture<RaftClientReply> sendWatchRequest(long j, RetryPolicy retryPolicy) throws Exception {
            // try-with-resources closes the client exactly once and attaches a close() failure
            // as a suppressed exception instead of letting it replace the primary failure.
            try (RaftClient watchClient =
                         cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId(), retryPolicy)) {
                final long writeIndex = watchClient.async()
                        .send(new RaftTestUtil.SimpleMessage("message"))
                        .get()
                        .getLogIndex();
                Assert.assertTrue(writeIndex > 0);
                watchClient.async().watch(writeIndex, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED);
                return watchClient.async().watch(j, RaftProtos.ReplicationLevel.MAJORITY);
            }
        }

        @Override
        public String toString() {
            return "numMessages=" + numMessages;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:ratis-server-2.5.1-tests.jar:org/apache/ratis/WatchRequestTests$WatchReplies.class
     */
    /* loaded from: input_file:test-classes/org/apache/ratis/WatchRequestTests$WatchReplies.class */
    /**
     * Holds the four watch futures (one per replication level) issued for a single log index,
     * together with helpers that wait for each of them and validate the reply.
     */
    public static class WatchReplies {
        private final long logIndex;
        private final CompletableFuture<RaftClientReply> majority;
        private final CompletableFuture<RaftClientReply> all;
        private final CompletableFuture<RaftClientReply> majorityCommitted;
        private final CompletableFuture<RaftClientReply> allCommitted;
        private final Logger log;

        /**
         * @param j log index being watched
         * @param completableFuture future of the MAJORITY watch
         * @param completableFuture2 future of the ALL watch
         * @param completableFuture3 future of the MAJORITY_COMMITTED watch
         * @param completableFuture4 future of the ALL_COMMITTED watch
         * @param logger logger used to report replies and failures
         */
        WatchReplies(long j, CompletableFuture<RaftClientReply> completableFuture, CompletableFuture<RaftClientReply> completableFuture2, CompletableFuture<RaftClientReply> completableFuture3, CompletableFuture<RaftClientReply> completableFuture4, Logger logger) {
            logIndex = j;
            majority = completableFuture;
            all = completableFuture2;
            majorityCommitted = completableFuture3;
            allCommitted = completableFuture4;
            log = logger;
        }

        /** Waits for and validates the MAJORITY watch reply. */
        RaftClientReply getMajority() throws Exception {
            return get(majority, "majority");
        }

        /** Waits for and validates the MAJORITY_COMMITTED watch reply. */
        RaftClientReply getMajorityCommitted() throws Exception {
            return get(majorityCommitted, "majorityCommitted");
        }

        /** Waits for and validates the ALL watch reply. */
        RaftClientReply getAll() throws Exception {
            return get(all, "all");
        }

        /** Waits for and validates the ALL_COMMITTED watch reply. */
        RaftClientReply getAllCommitted() throws Exception {
            return get(allCommitted, "allCommitted");
        }

        /**
         * Waits up to 10 seconds for the given watch future, then asserts that the reply is
         * successful and that its log index is not smaller than the watched index.
         *
         * @param completableFuture the watch future to wait for
         * @param str label of the replication level, used only in log messages
         * @return the validated reply
         * @throws Exception whatever the wait throws; it is logged and rethrown unchanged
         */
        RaftClientReply get(CompletableFuture<RaftClientReply> completableFuture, String str) throws Exception {
            final RaftClientReply reply;
            try {
                reply = completableFuture.get(10L, TimeUnit.SECONDS);
                log.info("{}-Watch({}) returns {}", str, logIndex, reply);
                Assert.assertTrue(reply.isSuccess());
                Assert.assertTrue(reply.getLogIndex() >= logIndex);
            } catch (Exception failure) {
                log.error("Failed to get {}({})", str, logIndex);
                throw failure;
            }
            return reply;
        }
    }

    public WatchRequestTests() {
        RaftServerTestUtil.setWatchRequestsLogLevel(Level.DEBUG);
        Slf4jUtils.setLogLevel(RaftServer.Division.LOG, Level.DEBUG);
    }

    /** Configures the cluster properties to use {@link SimpleStateMachine4Testing} as the state machine. */
    @Before
    public void setup() {
        final RaftProperties raftProperties = getProperties();
        raftProperties.setClass(
                MiniRaftCluster.STATEMACHINE_CLASS_KEY,
                SimpleStateMachine4Testing.class,
                StateMachine.class);
    }

    /** Runs {@link #runTestWatchRequestAsync(TestParameters)} against a new 3-server cluster. */
    @Test
    public void testWatchRequestAsync() throws Exception {
        runWithNewCluster(3, cluster -> runTest(WatchRequestTests::runTestWatchRequestAsync, cluster, LOG));
    }

    /**
     * Runs the given test case three times against the same cluster and the same client, with
     * 1, 10 and 20 messages respectively.
     *
     * @param checkedConsumer the test case to run for each message count
     * @param miniRaftCluster cluster under test; a leader is awaited before the client is created
     * @param logger logger handed to the test case and used for progress messages
     * @throws Exception whatever the test case throws; a failure while closing the client is
     *                   attached to it as a suppressed exception
     */
    static void runTest(CheckedConsumer<TestParameters, Exception> checkedConsumer, MiniRaftCluster miniRaftCluster, Logger logger) throws Exception {
        // These are message counts; the middle value is a plain 10 and has nothing to do with
        // the get-timeout constant that happens to share the same value.
        final int[] messageCounts = {1, 10, 20};
        try (RaftClient writeClient =
                     miniRaftCluster.createClient(RaftTestUtil.waitForLeader(miniRaftCluster).getId())) {
            for (int numMessages : messageCounts) {
                final TestParameters parameters =
                        new TestParameters(numMessages, writeClient, miniRaftCluster, logger);
                logger.info("{}) {}, {}", numMessages, parameters, miniRaftCluster.printServers());
                checkedConsumer.accept(parameters);
            }
        }
    }

    /**
     * Runs the given test case once, with a single message, using a client that is created
     * after a leader is available and closed when the test case finishes.
     *
     * @param checkedConsumer the test case to run
     * @param miniRaftCluster cluster under test
     * @param logger logger handed to the test case and used for progress messages
     * @throws Exception whatever the test case throws; a failure while closing the client is
     *                   attached to it as a suppressed exception rather than replacing it
     */
    static void runSingleTest(CheckedConsumer<TestParameters, Exception> checkedConsumer, MiniRaftCluster miniRaftCluster, Logger logger) throws Exception {
        final int numMessages = 1;
        try (RaftClient writeClient =
                     miniRaftCluster.createClient(RaftTestUtil.waitForLeader(miniRaftCluster).getId())) {
            final TestParameters parameters =
                    new TestParameters(numMessages, writeClient, miniRaftCluster, logger);
            logger.info("{}) {}, {}", numMessages, parameters, miniRaftCluster.printServers());
            checkedConsumer.accept(parameters);
        }
    }

    /**
     * Blocks the leader's startTransaction and one random follower's flushStateMachineData, sends
     * the writes, and checks in three steps: nothing completes while the leader is blocked; the
     * MAJORITY and MAJORITY_COMMITTED watches complete once the leader is unblocked; the ALL and
     * ALL_COMMITTED watches complete only after the follower is unblocked too.
     *
     * @param testParameters message count, client, cluster and logger for this run
     * @throws Exception if any wait times out or any assertion input cannot be obtained
     */
    static void runTestWatchRequestAsync(TestParameters testParameters) throws Exception {
        final Logger logger = testParameters.log;
        final MiniRaftCluster cluster = testParameters.cluster;
        final int numMessages = testParameters.numMessages;

        // Block the leader so that no transaction can start.
        final RaftServer.Division leader = cluster.getLeader();
        logger.info("block leader {}", leader.getId());
        SimpleStateMachine4Testing.get(leader).blockStartTransaction();

        // Block one randomly chosen follower so that it cannot flush state machine data.
        final List<RaftServer.Division> followers = cluster.getFollowers();
        final RaftServer.Division blockedFollower =
                followers.get(ThreadLocalRandom.current().nextInt(followers.size()));
        logger.info("block follower {}", blockedFollower.getId());
        SimpleStateMachine4Testing.get(blockedFollower).blockFlushStateMachineData();

        // Typed lists: the stream pipelines below need the element types to resolve
        // CompletableFuture::join and the WatchReplies fields.
        final List<CompletableFuture<RaftClientReply>> replies = new ArrayList<>();
        final List<CompletableFuture<WatchReplies>> watches = new ArrayList<>();
        testParameters.sendRequests(replies, watches);
        Assert.assertEquals(numMessages, replies.size());
        Assert.assertEquals(numMessages, watches.size());

        // With the leader blocked, neither the writes nor the watch set-up may complete.
        TimeUnit.SECONDS.sleep(1L);
        assertNotDone(replies);
        assertNotDone(watches);

        SimpleStateMachine4Testing.get(leader).unblockStartTransaction();
        logger.info("unblock leader {}", leader.getId());
        checkMajority(replies, watches, logger);
        Assert.assertEquals(numMessages, watches.size());

        // The follower is still blocked, so the ALL and ALL_COMMITTED watches must stay pending.
        TimeUnit.SECONDS.sleep(1L);
        assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> w.all));
        assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> w.allCommitted));

        logger.info("unblock follower {}", blockedFollower.getId());
        SimpleStateMachine4Testing.get(blockedFollower).unblockFlushStateMachineData();
        checkAll(watches, logger);
    }

    /**
     * For every write, waits (up to 10 seconds each) for the write reply and its watch set, then
     * verifies the MAJORITY and MAJORITY_COMMITTED watches and the commit infos carried by the
     * MAJORITY_COMMITTED reply: exactly three entries, the smallest commit index still behind the
     * write's log index, and every other commit index at or past it.
     *
     * @param list write futures, in send order
     * @param list2 watch-set futures, index-aligned with {@code list}
     * @param logger logger used for progress messages
     * @throws Exception if a wait times out or a future completed exceptionally
     */
    static void checkMajority(List<CompletableFuture<RaftClientReply>> list, List<CompletableFuture<WatchReplies>> list2, Logger logger) throws Exception {
        for (int i = 0; i < list.size(); i++) {
            final RaftClientReply reply = list.get(i).get(10L, TimeUnit.SECONDS);
            logger.info("checkMajority {}: receive {}", i, reply);
            final long logIndex = reply.getLogIndex();
            Assert.assertTrue(reply.isSuccess());

            final WatchReplies watchReplies = list2.get(i).get(10L, TimeUnit.SECONDS);
            Assert.assertEquals(logIndex, watchReplies.logIndex);
            watchReplies.getMajority();

            final Collection<RaftProtos.CommitInfoProto> commitInfos =
                    watchReplies.getMajorityCommitted().getCommitInfos();
            final String message = "logIndex=" + logIndex + ", " + ProtoUtils.toString(commitInfos);
            // One commit info is expected per server of the 3-server cluster.
            Assert.assertEquals(3, commitInfos.size());

            final long[] sortedCommitIndices = commitInfos.stream()
                    .mapToLong(RaftProtos.CommitInfoProto::getCommitIndex)
                    .sorted()
                    .toArray();
            // The slowest server has not committed this entry yet (callers in this file keep
            // one follower blocked while this check runs) ...
            Assert.assertTrue(message, logIndex > sortedCommitIndices[0]);
            // ... while every other server has.
            for (int k = 1; k < sortedCommitIndices.length; k++) {
                Assert.assertTrue(message, logIndex <= sortedCommitIndices[k]);
            }
        }
    }

    /**
     * For every watch set, waits (up to 10 seconds) for it, verifies the ALL and ALL_COMMITTED
     * watches, and checks that the ALL_COMMITTED reply carries exactly three commit infos whose
     * commit indices have all reached the watched log index.
     *
     * @param list watch-set futures, in send order
     * @param logger logger used for progress messages
     * @throws Exception if a wait times out or a future completed exceptionally
     */
    static void checkAll(List<CompletableFuture<WatchReplies>> list, Logger logger) throws Exception {
        for (int i = 0; i < list.size(); i++) {
            final WatchReplies watchReplies = list.get(i).get(10L, TimeUnit.SECONDS);
            final long logIndex = watchReplies.logIndex;
            logger.info("checkAll {}: logIndex={}", i, logIndex);
            watchReplies.getAll();

            final Collection<RaftProtos.CommitInfoProto> commitInfos =
                    watchReplies.getAllCommitted().getCommitInfos();
            final String message = "logIndex=" + logIndex + ", " + ProtoUtils.toString(commitInfos);
            // One commit info is expected per server of the 3-server cluster.
            Assert.assertEquals(3, commitInfos.size());
            for (RaftProtos.CommitInfoProto commitInfo : commitInfos) {
                Assert.assertTrue(message, logIndex <= commitInfo.getCommitIndex());
            }
        }
    }

    /**
     * Asserts that none of the futures in the list has completed yet.
     *
     * @param list the futures expected to be still pending
     */
    static <T> void assertNotDone(List<CompletableFuture<T>> list) {
        final Stream<CompletableFuture<T>> futures = list.stream();
        assertNotDone(futures);
    }

    /**
     * Asserts that none of the futures in the stream has completed yet. A completed future
     * fails the test; the failure message contains either its value or the exception thrown
     * while reading it.
     *
     * @param stream the futures expected to be still pending
     */
    static <T> void assertNotDone(Stream<CompletableFuture<T>> stream) {
        stream.forEach(future -> {
            if (!future.isDone()) {
                return;
            }
            try {
                final T value = future.get();
                // Assert.fail throws an AssertionError, which the catch below does not intercept.
                Assert.fail("Done unexpectedly: " + value);
            } catch (Exception e) {
                Assert.fail("Done unexpectedly and failed to get: " + e);
            }
        });
    }

    /** Runs {@link #runTestWatchRequestAsyncChangeLeader(TestParameters)} against a new 3-server cluster. */
    @Test
    public void testWatchRequestAsyncChangeLeader() throws Exception {
        runWithNewCluster(3,
                cluster -> runTest(WatchRequestTests::runTestWatchRequestAsyncChangeLeader, cluster, LOG));
    }

    /**
     * Blocks one random follower's flushStateMachineData, sends the writes and checks the
     * majority-level watches; then verifies the ALL_COMMITTED watches are still pending, forces a
     * leader change, unblocks the follower and checks that the ALL-level watches complete.
     *
     * @param testParameters message count, client, cluster and logger for this run
     * @throws Exception if any wait times out or the leader change fails
     */
    static void runTestWatchRequestAsyncChangeLeader(TestParameters testParameters) throws Exception {
        final Logger logger = testParameters.log;
        final MiniRaftCluster cluster = testParameters.cluster;
        final int numMessages = testParameters.numMessages;

        // Block one randomly chosen follower so that it cannot flush state machine data.
        final List<RaftServer.Division> followers = cluster.getFollowers();
        final RaftServer.Division blockedFollower =
                followers.get(ThreadLocalRandom.current().nextInt(followers.size()));
        logger.info("block follower {}", blockedFollower.getId());
        SimpleStateMachine4Testing.get(blockedFollower).blockFlushStateMachineData();

        // Typed lists: the stream pipeline below needs the element types to resolve
        // CompletableFuture::join and the WatchReplies field.
        final List<CompletableFuture<RaftClientReply>> replies = new ArrayList<>();
        final List<CompletableFuture<WatchReplies>> watches = new ArrayList<>();
        testParameters.sendRequests(replies, watches);
        Assert.assertEquals(numMessages, replies.size());
        Assert.assertEquals(numMessages, watches.size());

        // The leader is not blocked here, so the majority-level watches can complete right away.
        checkMajority(replies, watches, logger);

        // The follower is still blocked, so ALL_COMMITTED must stay pending.
        TimeUnit.SECONDS.sleep(1L);
        assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> w.allCommitted));

        RaftTestUtil.changeLeader(cluster, cluster.getLeader().getId());

        SimpleStateMachine4Testing.get(blockedFollower).unblockFlushStateMachineData();
        logger.info("unblock follower {}", blockedFollower.getId());
        checkAll(watches, logger);
    }

    /**
     * Runs {@link #runTestWatchRequestTimeout(TestParameters)} against a new 3-server cluster with
     * a 500 ms server-side watch timeout and a 100 ms timeout denomination, and restores both
     * settings to their defaults afterwards.
     */
    @Test
    public void testWatchRequestTimeout() throws Exception {
        final RaftProperties raftProperties = getProperties();
        final TimeDuration watchTimeout = TimeDuration.valueOf(500L, TimeUnit.MILLISECONDS);
        RaftServerConfigKeys.Watch.setTimeout(raftProperties, watchTimeout);
        final TimeDuration watchTimeoutDenomination = TimeDuration.valueOf(100L, TimeUnit.MILLISECONDS);
        RaftServerConfigKeys.Watch.setTimeoutDenomination(raftProperties, watchTimeoutDenomination);
        try {
            runWithNewCluster(3,
                    cluster -> runTest(WatchRequestTests::runTestWatchRequestTimeout, cluster, LOG));
        } finally {
            // Put the shared properties back so that later tests see the default values.
            RaftServerConfigKeys.Watch.setTimeout(
                    raftProperties, RaftServerConfigKeys.Watch.TIMEOUT_DEFAULT);
            RaftServerConfigKeys.Watch.setTimeoutDenomination(
                    raftProperties, RaftServerConfigKeys.Watch.TIMEOUT_DENOMINATION_DEFAULT);
        }
    }

    /**
     * Blocks the leader's startTransaction and one random follower's flushStateMachineData, sends
     * the writes, waits for the configured watch timeout plus its denomination and checks that
     * nothing completed. After unblocking the leader, the majority-level watches must succeed
     * while the ALL-level watches must fail with NotReplicatedException; the follower is only
     * unblocked at the very end.
     *
     * @param testParameters message count, client, cluster and logger for this run
     * @throws Exception if any wait times out or is interrupted
     */
    static void runTestWatchRequestTimeout(TestParameters testParameters) throws Exception {
        final Logger logger = testParameters.log;
        final MiniRaftCluster cluster = testParameters.cluster;
        final int numMessages = testParameters.numMessages;

        final RaftProperties properties = cluster.getProperties();
        final TimeDuration watchTimeout = RaftServerConfigKeys.Watch.timeout(properties);
        final TimeDuration watchTimeoutDenomination = RaftServerConfigKeys.Watch.timeoutDenomination(properties);

        // Block the leader so that no transaction can start.
        final RaftServer.Division leader = cluster.getLeader();
        logger.info("block leader {}", leader.getId());
        SimpleStateMachine4Testing.get(leader).blockStartTransaction();

        // Block one randomly chosen follower so that it cannot flush state machine data.
        final List<RaftServer.Division> followers = cluster.getFollowers();
        final RaftServer.Division blockedFollower =
                followers.get(ThreadLocalRandom.current().nextInt(followers.size()));
        logger.info("block follower {}", blockedFollower.getId());
        SimpleStateMachine4Testing.get(blockedFollower).blockFlushStateMachineData();

        final List<CompletableFuture<RaftClientReply>> replies = new ArrayList<>();
        final List<CompletableFuture<WatchReplies>> watches = new ArrayList<>();
        testParameters.sendRequests(replies, watches);
        Assert.assertEquals(numMessages, replies.size());
        Assert.assertEquals(numMessages, watches.size());

        // Wait for the configured watch timeout plus one denomination; with the leader blocked
        // the writes and the watch set-up must still be pending afterwards.
        watchTimeout.sleep();
        watchTimeoutDenomination.sleep();
        assertNotDone(replies);
        assertNotDone(watches);

        SimpleStateMachine4Testing.get(leader).unblockStartTransaction();
        logger.info("unblock leader {}", leader.getId());
        checkMajority(replies, watches, logger);
        checkTimeout(replies, watches, logger);

        SimpleStateMachine4Testing.get(blockedFollower).unblockFlushStateMachineData();
        logger.info("unblock follower {}", blockedFollower.getId());
    }

    /**
     * Runs {@link #runTestWatchRequestClientTimeout(TestParameters)} against a new 3-server
     * cluster with a 100 s server-side watch timeout and a 15 s client-side watch request
     * timeout, and restores both settings to their defaults afterwards.
     */
    @Test
    public void testWatchRequestClientTimeout() throws Exception {
        final RaftProperties raftProperties = getProperties();
        final TimeDuration serverWatchTimeout = TimeDuration.valueOf(100L, TimeUnit.SECONDS);
        RaftServerConfigKeys.Watch.setTimeout(raftProperties, serverWatchTimeout);
        final TimeDuration clientWatchTimeout = TimeDuration.valueOf(15L, TimeUnit.SECONDS);
        RaftClientConfigKeys.Rpc.setWatchRequestTimeout(raftProperties, clientWatchTimeout);
        try {
            runWithNewCluster(3,
                    cluster -> runSingleTest(WatchRequestTests::runTestWatchRequestClientTimeout, cluster, LOG));
        } finally {
            // Put the shared properties back so that later tests see the default values.
            RaftServerConfigKeys.Watch.setTimeout(
                    raftProperties, RaftServerConfigKeys.Watch.TIMEOUT_DEFAULT);
            RaftClientConfigKeys.Rpc.setWatchRequestTimeout(
                    raftProperties, RaftClientConfigKeys.Rpc.WATCH_REQUEST_TIMEOUT_DEFAULT);
        }
    }

    /**
     * Issues a watch for log index 1000 with a no-retry client and expects waiting for it to
     * fail. The failure's cause must be an AlreadyClosedException or a RaftRetryFailureException;
     * if that cause has a cause of its own, it must be a TimeoutIOException.
     *
     * @param testParameters supplies the cluster and the logger for this run
     * @throws Exception declared for the {@code CheckedConsumer} contract; failures surface as
     *                   assertion errors
     */
    static void runTestWatchRequestClientTimeout(TestParameters testParameters) throws Exception {
        final Logger logger = testParameters.log;
        try {
            testParameters.sendWatchRequest(1000L, RetryPolicies.noRetry()).get();
            // Assert.fail throws an AssertionError, which the catch below does not intercept.
            Assert.fail("runTestWatchRequestClientTimeout failed");
        } catch (Exception e) {
            logger.error("error occurred", e);

            // Check for a missing cause before dereferencing it, so that the test reports a
            // readable assertion failure instead of a NullPointerException.
            final Throwable cause = e.getCause();
            Assert.assertNotNull("Expected the failure to have a cause: " + e, cause);
            Assert.assertTrue("Unexpected cause: " + cause,
                    cause.getClass() == AlreadyClosedException.class
                            || cause.getClass() == RaftRetryFailureException.class);

            final Throwable rootCause = cause.getCause();
            if (rootCause != null) {
                Assert.assertEquals(TimeoutIOException.class, rootCause.getClass());
            }
        }
    }

    /**
     * For every write, waits (up to 10 seconds each) for the write reply and its watch set, then
     * expects both the ALL and the ALL_COMMITTED watch to fail with a NotReplicatedException
     * carrying the write's log index and the corresponding replication level.
     *
     * @param list write futures, in send order
     * @param list2 watch-set futures, index-aligned with {@code list}
     * @param logger logger used for progress messages
     * @throws Exception if a wait times out or a future completed exceptionally
     */
    static void checkTimeout(List<CompletableFuture<RaftClientReply>> list, List<CompletableFuture<WatchReplies>> list2, Logger logger) throws Exception {
        final int count = list.size();
        for (int index = 0; index < count; index++) {
            final CompletableFuture<RaftClientReply> replyFuture = list.get(index);
            final RaftClientReply reply = replyFuture.get(10L, TimeUnit.SECONDS);
            logger.info("checkTimeout {}: receive {}", index, reply);
            final long logIndex = reply.getLogIndex();
            Assert.assertTrue(reply.isSuccess());

            final CompletableFuture<WatchReplies> watchFuture = list2.get(index);
            final WatchReplies watchReplies = watchFuture.get(10L, TimeUnit.SECONDS);
            Assert.assertEquals(logIndex, watchReplies.logIndex);

            assertNotReplicatedException(
                    logIndex, RaftProtos.ReplicationLevel.ALL, watchReplies::getAll);
            assertNotReplicatedException(
                    logIndex, RaftProtos.ReplicationLevel.ALL_COMMITTED, watchReplies::getAllCommitted);
        }
    }

    /**
     * Invokes the supplier and expects it to throw an ExecutionException whose cause is a
     * NotReplicatedException for the given log index and replication level.
     *
     * @param j expected log index of the NotReplicatedException
     * @param replicationLevel expected required replication level
     * @param checkedSupplier the call that is expected to fail
     * @throws Exception any exception from the supplier other than ExecutionException
     */
    static void assertNotReplicatedException(long j, RaftProtos.ReplicationLevel replicationLevel, CheckedSupplier<RaftClientReply, Exception> checkedSupplier) throws Exception {
        try {
            final RaftClientReply unexpectedReply = checkedSupplier.get();
            // Say what was expected and what arrived instead of failing with an empty message.
            Assert.fail("Expected NotReplicatedException for logIndex=" + j
                    + ", replication=" + replicationLevel + " but got reply " + unexpectedReply);
        } catch (ExecutionException e) {
            assertNotReplicatedException(j, replicationLevel, e.getCause());
        }
    }

    /**
     * Asserts that the throwable is exactly a NotReplicatedException (not a subclass) reporting
     * the given log index and required replication level.
     *
     * @param j expected log index
     * @param replicationLevel expected required replication level
     * @param th the throwable to inspect; a null value fails the assertion
     */
    static void assertNotReplicatedException(long j, RaftProtos.ReplicationLevel replicationLevel, Throwable th) {
        // Check for null before the first dereference; after the class check and cast the
        // null check could never fire.
        Assert.assertNotNull("Expected a NotReplicatedException but the throwable was null", th);
        Assert.assertSame(NotReplicatedException.class, th.getClass());
        final NotReplicatedException notReplicated = (NotReplicatedException) th;
        Assert.assertEquals(j, notReplicated.getLogIndex());
        Assert.assertEquals(replicationLevel, notReplicated.getRequiredReplication());
    }
}
