package org.apache.ratis.grpc;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.ratis.BaseTest;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.RetryCacheTests;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.MiniRaftClusterWithGrpc;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RetryCache;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.server.impl.RetryCacheTestUtil;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Slf4jUtils;
import org.apache.ratis.util.TimeDuration;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.event.Level;

/* loaded from: input_file:org/apache/ratis/grpc/TestRetryCacheWithGrpc.class */
public class TestRetryCacheWithGrpc extends RetryCacheTests<MiniRaftClusterWithGrpc> implements MiniRaftClusterWithGrpc.FactoryGet {

    /* loaded from: input_file:org/apache/ratis/grpc/TestRetryCacheWithGrpc$InvalidateRepliedCallsTest.class */
    class InvalidateRepliedCallsTest {
        private final MiniRaftCluster cluster;
        private final RaftServer.Division leader;
        private final AtomicInteger count = new AtomicInteger();

        InvalidateRepliedCallsTest(MiniRaftCluster miniRaftCluster) throws Exception {
            this.cluster = miniRaftCluster;
            this.leader = RaftTestUtil.waitForLeader(miniRaftCluster);
        }

        RaftTestUtil.SimpleMessage nextMessage() {
            return new RaftTestUtil.SimpleMessage("m" + this.count.incrementAndGet());
        }

        void assertRetryCacheEntry(RaftClient raftClient, long j, boolean z) throws InterruptedException {
            assertRetryCacheEntry(raftClient, j, z, false);
        }

        void assertRetryCacheEntry(RaftClient raftClient, long j, boolean z, boolean z2) throws InterruptedException {
            Supplier supplier = () -> {
                return RetryCacheTestUtil.get(this.leader, raftClient.getId(), j);
            };
            Consumer consumer = z ? (v0) -> {
                Assert.assertNotNull(v0);
            } : (v0) -> {
                Assert.assertNull(v0);
            };
            if (z2) {
                JavaUtils.attempt(() -> {
                    consumer.accept(supplier.get());
                }, 100, TimeDuration.ONE_MILLISECOND, "retry cache entry", (Logger) null);
            } else {
                consumer.accept(supplier.get());
            }
        }

        long send(RaftClient raftClient, Long l) throws Exception {
            long assertReply = TestRetryCacheWithGrpc.assertReply(raftClient.io().send(nextMessage()));
            if (l != null) {
                assertRetryCacheEntry(raftClient, l.longValue(), false);
            }
            assertRetryCacheEntry(raftClient, assertReply, true);
            return assertReply;
        }

        CompletableFuture<Long> sendAsync(RaftClient raftClient) {
            return raftClient.async().send(nextMessage()).thenApply(TestRetryCacheWithGrpc::assertReply);
        }

        CompletableFuture<Long> watch(long j, RaftClient raftClient) {
            return raftClient.async().watch(j, RaftProtos.ReplicationLevel.MAJORITY).thenApply(TestRetryCacheWithGrpc::assertReply);
        }

        void run() throws Exception {
            RaftClient createClient = this.cluster.createClient();
            Throwable th = null;
            try {
                Long l = null;
                for (int i = 0; i < 5; i++) {
                    l = Long.valueOf(send(createClient, l));
                }
                long longValue = l.longValue();
                SimpleStateMachine4Testing simpleStateMachine4Testing = SimpleStateMachine4Testing.get(this.leader);
                simpleStateMachine4Testing.blockApplyTransaction();
                ArrayList arrayList = new ArrayList();
                for (int i2 = 0; i2 < 5; i2++) {
                    arrayList.add(sendAsync(createClient));
                }
                assertRetryCacheEntry(createClient, longValue, false, true);
                BaseTest.ONE_SECOND.sleep();
                Iterator it = arrayList.iterator();
                while (it.hasNext()) {
                    Assert.assertFalse(((CompletableFuture) it.next()).isDone());
                }
                simpleStateMachine4Testing.unblockApplyTransaction();
                Iterator it2 = arrayList.iterator();
                while (it2.hasNext()) {
                    assertRetryCacheEntry(createClient, ((Long) ((CompletableFuture) it2.next()).join()).longValue(), true);
                }
                long send = send(createClient, null);
                TestRetryCacheWithGrpc.this.LOG.info("oneMoreBlockingCall callId={}", Long.valueOf(send));
                assertRetryCacheEntry(createClient, send, true);
                Iterator it3 = arrayList.iterator();
                while (it3.hasNext()) {
                    assertRetryCacheEntry(createClient, ((Long) ((CompletableFuture) it3.next()).join()).longValue(), false);
                }
                long longValue2 = watch(1L, createClient).get().longValue();
                TestRetryCacheWithGrpc.this.LOG.info("watchAsyncCall callId={}", Long.valueOf(longValue2));
                assertRetryCacheEntry(createClient, send, false);
                assertRetryCacheEntry(createClient, longValue2, false);
                if (createClient != null) {
                    if (0 == 0) {
                        createClient.close();
                        return;
                    }
                    try {
                        createClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                if (createClient != null) {
                    if (0 != 0) {
                        try {
                            createClient.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        createClient.close();
                    }
                }
                throw th3;
            }
        }
    }

    public TestRetryCacheWithGrpc() {
        Slf4jUtils.setLogLevel(RetryCache.LOG, Level.TRACE);
        getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, SimpleStateMachine4Testing.class, StateMachine.class);
    }

    @Test
    public void testInvalidateRepliedCalls() throws Exception {
        runWithNewCluster(3, miniRaftClusterWithGrpc -> {
            new InvalidateRepliedCallsTest(miniRaftClusterWithGrpc).run();
        });
    }

    static long assertReply(RaftClientReply raftClientReply) {
        Assert.assertTrue(raftClientReply.isSuccess());
        return raftClientReply.getCallId();
    }

    /* JADX WARN: Type inference failed for: r3v1, types: [org.apache.ratis.protocol.RaftPeerId, long] */
    @Test(timeout = 10000)
    public void testRetryOnResourceUnavailableException() throws InterruptedException, IOException {
        RaftProperties raftProperties = new RaftProperties();
        raftProperties.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, SimpleStateMachine4Testing.class, StateMachine.class);
        RaftServerConfigKeys.Write.setElementLimit(raftProperties, 1);
        MiniRaftClusterWithGrpc newCluster = getFactory().newCluster(3, raftProperties);
        newCluster.start();
        RaftServer.Division waitForLeader = RaftTestUtil.waitForLeader(newCluster);
        RaftServer raftServer = waitForLeader.getRaftServer();
        Iterator it = newCluster.getFollowers().iterator();
        while (it.hasNext()) {
            ((RaftServer.Division) it.next()).getStateMachine().blockWriteStateMachineData();
        }
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        long j = 1;
        ClientId randomId = ClientId.randomId();
        RaftClientRequest raftClientRequest = null;
        while (!atomicBoolean.get()) {
            long j2 = j;
            raftServer.getId();
            ?? r3 = j;
            j = r3 + 1;
            raftClientRequest = newCluster.newRaftClientRequest(randomId, (RaftPeerId) r3, (long) r3, new RaftTestUtil.SimpleMessage("message"));
            raftServer.submitClientRequestAsync(raftClientRequest).exceptionally(th -> {
                if (!(th.getCause() instanceof ResourceUnavailableException)) {
                    return null;
                }
                RetryCacheTestUtil.isFailed(RetryCacheTestUtil.get(waitForLeader, randomId, j2));
                atomicBoolean.set(true);
                return null;
            });
        }
        Iterator it2 = newCluster.getFollowers().iterator();
        while (it2.hasNext()) {
            ((RaftServer.Division) it2.next()).getStateMachine().unblockWriteStateMachineData();
        }
        while (atomicBoolean.get()) {
            try {
                if (((RaftClientReply) raftServer.submitClientRequestAsync(raftClientRequest).get()).isSuccess()) {
                    atomicBoolean.set(false);
                }
            } catch (Exception e) {
            }
        }
        newCluster.shutdown();
    }
}
