package org.apache.ratis.datastream;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.impl.DataStreamClientImpl;
import org.apache.ratis.datastream.DataStreamTestUtils;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.TimeDuration;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/ratis/datastream/DataStreamAsyncClusterTests.class */
public abstract class DataStreamAsyncClusterTests<CLUSTER extends MiniRaftCluster> extends DataStreamClusterTests<CLUSTER> {
    final Executor executor = Executors.newFixedThreadPool(16);

    @Test
    public void testMultipleStreamsSingleServer() throws Exception {
        runWithNewCluster(1, this::runTestDataStream);
    }

    @Test
    public void testMultipleStreamsMultipleServers() throws Exception {
        TimeDuration timeoutMin = RaftServerConfigKeys.Rpc.timeoutMin(getProperties());
        RaftServerConfigKeys.Rpc.setTimeoutMin(getProperties(), TimeDuration.valueOf(2L, TimeUnit.SECONDS));
        TimeDuration timeoutMax = RaftServerConfigKeys.Rpc.timeoutMax(getProperties());
        RaftServerConfigKeys.Rpc.setTimeoutMax(getProperties(), TimeDuration.valueOf(3L, TimeUnit.SECONDS));
        runWithNewCluster(3, this::runTestDataStream);
        RaftServerConfigKeys.Rpc.setTimeoutMin(getProperties(), timeoutMin);
        RaftServerConfigKeys.Rpc.setTimeoutMax(getProperties(), timeoutMax);
    }

    @Test
    public void testMultipleStreamsMultipleServersStepDownLeader() throws Exception {
        runWithNewCluster(3, this::runTestDataStreamStepDownLeader);
    }

    void runTestDataStreamStepDownLeader(CLUSTER cluster) throws Exception {
        runTestDataStream(cluster, true);
    }

    void runTestDataStream(CLUSTER cluster) throws Exception {
        runTestDataStream(cluster, false);
    }

    void runTestDataStream(CLUSTER cluster, boolean z) throws Exception {
        RaftTestUtil.waitForLeader(cluster);
        ArrayList arrayList = new ArrayList();
        arrayList.add(CompletableFuture.supplyAsync(() -> {
            return runTestDataStream(cluster, 5, 10, 1000000, 10, z);
        }, this.executor));
        arrayList.add(CompletableFuture.supplyAsync(() -> {
            return runTestDataStream(cluster, 2, 20, 1000, 10000, z);
        }, this.executor));
        long longValue = ((Long) arrayList.stream().map((v0) -> {
            return v0.join();
        }).max((v0, v1) -> {
            return v0.compareTo(v1);
        }).orElseThrow(IllegalStateException::new)).longValue();
        if (z) {
            RaftPeerId id = cluster.getLeader().getId();
            this.LOG.info("Changed leader from {} to {}", id, ((CompletableFuture) arrayList.get(0)).thenApplyAsync(l -> {
                try {
                    return RaftTestUtil.changeLeader(cluster, id);
                } catch (Exception e) {
                    throw new CompletionException("Failed to change leader from " + id, e);
                }
            }).join());
        }
        RaftClient createClient = cluster.createClient();
        Throwable th = null;
        try {
            try {
                Assert.assertTrue(((RaftClientReply) createClient.async().watch(longValue, RaftProtos.ReplicationLevel.ALL).join()).isSuccess());
                if (createClient != null) {
                    if (0 != 0) {
                        try {
                            createClient.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createClient.close();
                    }
                }
                Iterator it = cluster.getServers().iterator();
                while (it.hasNext()) {
                    RaftServer.Division division = ((RaftServer) it.next()).getDivision(cluster.getGroupId());
                    for (DataStreamTestUtils.SingleDataStream singleDataStream : division.getStateMachine().getStreams()) {
                        Assert.assertFalse(singleDataStream.m3getDataChannel().isOpen());
                        DataStreamTestUtils.assertLogEntry(division, singleDataStream);
                    }
                }
            } finally {
            }
        } catch (Throwable th3) {
            if (createClient != null) {
                if (th != null) {
                    try {
                        createClient.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createClient.close();
                }
            }
            throw th3;
        }
    }

    Long runTestDataStream(CLUSTER cluster, int i, int i2, int i3, int i4, boolean z) {
        ArrayList arrayList = new ArrayList();
        for (int i5 = 0; i5 < i; i5++) {
            arrayList.add(CompletableFuture.supplyAsync(() -> {
                return Long.valueOf(runTestDataStream(cluster, i2, i3, i4, z));
            }, this.executor));
        }
        Assert.assertEquals(i, arrayList.size());
        return (Long) arrayList.stream().map((v0) -> {
            return v0.join();
        }).max((v0, v1) -> {
            return v0.compareTo(v1);
        }).orElseThrow(IllegalStateException::new);
    }

    ClientId getPrimaryClientId(CLUSTER cluster, RaftPeer raftPeer) {
        return cluster.getDivision(raftPeer.getId()).getRaftClient().getId();
    }

    long runTestDataStream(CLUSTER cluster, int i, int i2, int i3, boolean z) {
        Iterable as = CollectionUtils.as(cluster.getServers(), raftServer -> {
            return raftServer;
        });
        RaftPeerId id = cluster.getLeader().getId();
        ArrayList arrayList = new ArrayList();
        RaftPeer raftPeer = (RaftPeer) CollectionUtils.random(cluster.getGroup().getPeers());
        try {
            RaftClient createClient = cluster.createClient(raftPeer);
            Throwable th = null;
            try {
                try {
                    ClientId primaryClientId = getPrimaryClientId(cluster, raftPeer);
                    for (int i4 = 0; i4 < i; i4++) {
                        DataStreamClientImpl.DataStreamOutputImpl stream = createClient.getDataStreamApi().stream((ByteBuffer) null, getRoutingTable(cluster.getGroup().getPeers(), raftPeer));
                        arrayList.add(CompletableFuture.supplyAsync(() -> {
                            return DataStreamTestUtils.writeAndCloseAndAssertReplies(as, id, stream, i2, i3, primaryClientId, createClient.getId(), z).join();
                        }, this.executor));
                    }
                    Assert.assertEquals(i, arrayList.size());
                    long longValue = ((Long) arrayList.stream().map((v0) -> {
                        return v0.join();
                    }).map((v0) -> {
                        return v0.getLogIndex();
                    }).max((v0, v1) -> {
                        return v0.compareTo(v1);
                    }).orElseThrow(IllegalStateException::new)).longValue();
                    if (createClient != null) {
                        if (0 != 0) {
                            try {
                                createClient.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            createClient.close();
                        }
                    }
                    return longValue;
                } finally {
                }
            } finally {
            }
        } catch (IOException e) {
            throw new CompletionException(e);
        }
    }
}
