package org.apache.iotdb.consensus.multileader;

import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.ConsensusGroup;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import org.apache.iotdb.consensus.config.ConsensusConfig;
import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.ratis.util.FileUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.class */
public class MultiLeaderConsensusTest {
    /** Shared logger for this test class. */
    private static final Logger logger = LoggerFactory.getLogger(MultiLeaderConsensusTest.class);

    /** Identifier of the single data region (id 1) used as the consensus group in every test. */
    private final ConsensusGroupId gid = new DataRegionId(1);

    /** The three group members, all on the loopback address with ports 6000, 6001 and 6002. */
    private final List<Peer> peers = Arrays.asList(
            new Peer(this.gid, new TEndPoint("127.0.0.1", 6000)),
            new Peer(this.gid, new TEndPoint("127.0.0.1", 6001)),
            new Peer(this.gid, new TEndPoint("127.0.0.1", 6002)));

    /** Storage directory of each peer, index-aligned with {@link #peers}. */
    private final List<File> peersStorage = Arrays.asList(
            new File("target" + File.separator + "1"),
            new File("target" + File.separator + "2"),
            new File("target" + File.separator + "3"));

    /** The consensus group formed by {@link #gid} and {@link #peers}. */
    private final ConsensusGroup group = new ConsensusGroup(this.gid, this.peers);

    /** Running consensus instances, index-aligned with {@link #peers}. */
    private final List<MultiLeaderConsensus> servers = new ArrayList<>();

    /** State machine backing each server, index-aligned with {@link #peers}. */
    private final List<TestStateMachine> stateMachines = new ArrayList<>();

    /* loaded from: input_file:org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest$FakeConsensusReqReader.class */
    /**
     * Minimal {@link ConsensusReqReader} stand-in that answers single-index lookups from a set of
     * already applied requests. It also implements {@link DataSet} so it can be handed back as the
     * result of a state machine read.
     */
    public static class FakeConsensusReqReader implements ConsensusReqReader, DataSet {
        /** Requests to search; also used as the monitor guarding lookups. */
        private final Set<IndexedConsensusRequest> requestSet;

        /**
         * @param set the requests this reader searches; the reference is kept, not copied
         */
        public FakeConsensusReqReader(Set<IndexedConsensusRequest> set) {
            this.requestSet = set;
        }

        /**
         * Finds a request whose search index equals {@code j}.
         *
         * @param j the search index to look for
         * @return the first matching request encountered, or {@code null} when none matches
         */
        public IConsensusRequest getReq(long j) {
            synchronized (this.requestSet) {
                return this.requestSet.stream()
                        .filter(candidate -> candidate.getSearchIndex() == j)
                        .findFirst()
                        .orElse(null);
            }
        }

        /** Batch lookup is not provided by this fake; always returns {@code null}. */
        public List<IConsensusRequest> getReqs(long j, int i) {
            return null;
        }

        /** Iteration is not provided by this fake; always returns {@code null}. */
        public ConsensusReqReader.ReqIterator getReqIterator(long j) {
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest$TestEntry.class */
    /**
     * Simple consensus request used by the tests: a sequence number tagged with the peer that
     * issued it. Two entries are equal when both the number and the peer are equal.
     */
    public static class TestEntry implements IConsensusRequest {
        private final int num;
        private final Peer peer;

        /**
         * @param i the sequence number carried by this entry
         * @param peer the peer associated with this entry
         */
        public TestEntry(int i, Peer peer) {
            this.num = i;
            this.peer = peer;
        }

        /**
         * Serializes this entry as the {@code int} number followed by the peer's own serialized
         * form.
         *
         * @return a buffer wrapping the bytes written, positioned at the start
         * @throws RuntimeException wrapping any {@link IOException} raised while writing
         */
        public ByteBuffer serializeToByteBuffer() {
            // try-with-resources closes both streams (in reverse order) on every path.
            try (PublicBAOS publicBAOS = new PublicBAOS();
                    DataOutputStream dataOutputStream = new DataOutputStream(publicBAOS)) {
                dataOutputStream.writeInt(this.num);
                this.peer.serialize(dataOutputStream);
                // Wrap only the bytes actually written, not the whole backing array.
                return ByteBuffer.wrap(publicBAOS.getBuf(), 0, publicBAOS.size());
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            TestEntry other = (TestEntry) obj;
            return this.num == other.num && Objects.equals(this.peer, other.peer);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.num, this.peer);
        }

        @Override
        public String toString() {
            return "TestEntry{num=" + this.num + ", peer=" + this.peer + '}';
        }
    }

    /* loaded from: input_file:org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest$TestStateMachine.class */
    /**
     * In-memory state machine that records every request written to it, so tests can compare
     * what each replica has applied.
     */
    private static class TestStateMachine implements IStateMachine, IStateMachine.EventApi {
        /** Every request applied so far; a concurrent set that also serves as the write monitor. */
        private final Set<IndexedConsensusRequest> requestSet;

        private TestStateMachine() {
            this.requestSet = ConcurrentHashMap.newKeySet();
        }

        /**
         * @return the live set of applied requests (not a copy)
         */
        public Set<IndexedConsensusRequest> getRequestSet() {
            return this.requestSet;
        }

        /**
         * Collects the payload of every applied request.
         *
         * @return a new set holding the {@link TestEntry} wrapped by each recorded request
         * @throws ClassCastException if a recorded request does not wrap a {@link TestEntry}
         */
        public Set<TestEntry> getData() {
            Set<TestEntry> entries = new HashSet<>();
            for (IndexedConsensusRequest indexed : this.requestSet) {
                entries.add((TestEntry) indexed.getRequest());
            }
            return entries;
        }

        public void start() {
        }

        public void stop() {
        }

        /**
         * Records the given request. When its payload is a {@link ByteBufferConsensusRequest},
         * the bytes are decoded back into a {@link TestEntry} (an {@code int} followed by a
         * serialized {@link Peer}) and stored under the same search index; otherwise the request
         * is stored as received.
         *
         * @param iConsensusRequest the request to apply; must be an {@link IndexedConsensusRequest}
         * @return a freshly created {@link TSStatus}
         */
        public TSStatus write(IConsensusRequest iConsensusRequest) {
            synchronized (this.requestSet) {
                IndexedConsensusRequest indexed = (IndexedConsensusRequest) iConsensusRequest;
                IConsensusRequest payload = indexed.getRequest();
                if (payload instanceof ByteBufferConsensusRequest) {
                    ByteBuffer buffer = payload.serializeToByteBuffer();
                    // Read order mirrors TestEntry's write order: number first, then the peer.
                    int num = buffer.getInt();
                    Peer peer = Peer.deserialize(buffer);
                    this.requestSet.add(
                            new IndexedConsensusRequest(indexed.getSearchIndex(), -1L, new TestEntry(num, peer)));
                } else {
                    this.requestSet.add(indexed);
                }
                return new TSStatus();
            }
        }

        /**
         * Serves read requests. Only {@link GetConsensusReqReaderPlan} is handled, for which a
         * {@link FakeConsensusReqReader} over the recorded requests is returned.
         *
         * @param iConsensusRequest the read request
         * @return a reader over the recorded requests, or {@code null} for any other request
         */
        public synchronized DataSet read(IConsensusRequest iConsensusRequest) {
            if (iConsensusRequest instanceof GetConsensusReqReaderPlan) {
                return new FakeConsensusReqReader(this.requestSet);
            }
            return null;
        }

        /** Snapshots are not supported by this test state machine; always returns {@code false}. */
        public boolean takeSnapshot(File file) {
            return false;
        }

        /** No-op: this test state machine has no snapshot to load. */
        public void loadSnapshot(File file) {
        }
    }

    /**
     * Creates the storage directory and a fresh state machine for each of the three peers, then
     * starts the servers.
     */
    @Before
    public void setUp() throws Exception {
        // peersStorage holds exactly one directory per peer (three in total).
        for (File storageDir : this.peersStorage) {
            storageDir.mkdirs();
            this.stateMachines.add(new TestStateMachine());
        }
        initServer();
    }

    /** Stops all servers and then removes every peer's storage directory. */
    @After
    public void tearDown() throws Exception {
        stopServer();
        for (File storageDir : this.peersStorage) {
            FileUtils.deleteFully(storageDir);
        }
    }

    /**
     * Builds and starts one consensus instance per peer, wiring each to the endpoint, storage
     * directory and state machine at the same index. Each new instance is appended to
     * {@code servers}.
     *
     * @throws IOException if a server fails to start
     * @throws IllegalArgumentException if the consensus implementation cannot be constructed
     */
    private void initServer() throws IOException {
        final String consensusClassName = "org.apache.iotdb.consensus.multileader.MultiLeaderConsensus";
        for (int index = 0; index < 3; index++) {
            // Lambdas need an effectively final copy of the loop index.
            final int nodeIndex = index;
            ConsensusConfig config = ConsensusConfig.newBuilder()
                    .setThisNode(this.peers.get(index).getEndpoint())
                    .setStorageDir(this.peersStorage.get(index).getAbsolutePath())
                    .build();
            MultiLeaderConsensus server = (MultiLeaderConsensus) ConsensusFactory
                    .getConsensusImpl(consensusClassName, config, groupId -> this.stateMachines.get(nodeIndex))
                    .orElseThrow(() -> new IllegalArgumentException(String.format(
                            "Construct consensusImpl failed, Please check your consensus className %s",
                            consensusClassName)));
            this.servers.add(server);
            server.start();
        }
    }

    /** Stops every running server in parallel and empties the server list. */
    private void stopServer() {
        this.servers.parallelStream().forEach(MultiLeaderConsensus::stop);
        this.servers.clear();
    }

    /**
     * All three peers join the group up front and each accepts 500 writes. The test then checks
     * that every replica ends up with the same 1500 entries, restarts all servers, and checks the
     * recovered configuration and indexes on every server.
     *
     * @throws IOException if a server fails to restart
     * @throws InterruptedException if interrupted while polling for replication progress
     */
    @Test
    public void ReplicateUsingQueueTest() throws IOException, InterruptedException {
        this.logger.info("Start ReplicateUsingQueueTest");

        for (MultiLeaderConsensus server : this.servers) {
            server.addConsensusGroup(this.group.getGroupId(), this.group.getPeers());
        }
        for (MultiLeaderConsensus server : this.servers) {
            Assert.assertEquals(0L, server.getImpl(this.gid).getController().getCurrentIndex());
        }

        // Every server takes one write per round, tagged with its own peer.
        for (int round = 0; round < 500; round++) {
            for (int node = 0; node < 3; node++) {
                this.servers.get(node).write(this.gid, new TestEntry(round, this.peers.get(node)));
            }
            for (MultiLeaderConsensus server : this.servers) {
                Assert.assertEquals(round + 1, server.getImpl(this.gid).getController().getCurrentIndex());
            }
        }
        for (MultiLeaderConsensus server : this.servers) {
            Assert.assertEquals(500L, server.getImpl(this.gid).getController().getLastFlushedIndex());
        }

        // Poll each server (up to 20 s apiece) until its safely-deleted index reaches 500.
        for (int node = 0; node < 3; node++) {
            long startedAt = System.currentTimeMillis();
            while (this.servers.get(node).getImpl(this.gid).getCurrentSafelyDeletedSearchIndex() < 500) {
                if (System.currentTimeMillis() - startedAt > 20000) {
                    Assert.fail("Unable to replicate entries");
                }
                Thread.sleep(100L);
            }
        }
        for (MultiLeaderConsensus server : this.servers) {
            Assert.assertEquals(500L, server.getImpl(this.gid).getCurrentSafelyDeletedSearchIndex());
        }

        // Each replica must hold its own 500 entries plus 500 from each of the other two.
        for (TestStateMachine stateMachine : this.stateMachines) {
            Assert.assertEquals(1500L, stateMachine.getRequestSet().size());
        }
        Assert.assertEquals(this.stateMachines.get(0).getData(), this.stateMachines.get(1).getData());
        Assert.assertEquals(this.stateMachines.get(2).getData(), this.stateMachines.get(1).getData());

        stopServer();
        initServer();

        for (MultiLeaderConsensus server : this.servers) {
            Assert.assertEquals(this.peers, server.getImpl(this.gid).getConfiguration());
        }
        for (MultiLeaderConsensus server : this.servers) {
            Assert.assertEquals(1000L, server.getImpl(this.gid).getController().getCurrentIndex());
        }

        for (int node = 0; node < 3; node++) {
            long startedAt = System.currentTimeMillis();
            while (this.servers.get(node).getImpl(this.gid).getCurrentSafelyDeletedSearchIndex() < 500) {
                if (System.currentTimeMillis() - startedAt > 20000) {
                    Assert.fail("Unable to recover entries");
                }
                Thread.sleep(100L);
            }
        }
        // Verify all three servers; the earlier version checked server 1 twice and skipped server 0.
        for (MultiLeaderConsensus server : this.servers) {
            Assert.assertEquals(500L, server.getImpl(this.gid).getCurrentSafelyDeletedSearchIndex());
        }
    }

    /**
     * Only the first two peers join the group before writing, so nothing can be marked safely
     * deleted. After a restart the third peer joins, and the test waits for the first two servers
     * to report progress before checking that all three state machines hold the same 1000
     * entries.
     *
     * @throws IOException if a server fails to restart
     * @throws InterruptedException if interrupted while polling for replication progress
     */
    @Test
    public void ReplicateUsingWALTest() throws IOException, InterruptedException {
        this.logger.info("Start ReplicateUsingWALTest");

        // Servers 0 and 1 join the group; server 2 stays out for now.
        for (int node = 0; node < 2; node++) {
            this.servers.get(node).addConsensusGroup(this.group.getGroupId(), this.group.getPeers());
        }
        for (int node = 0; node < 2; node++) {
            Assert.assertEquals(0L, this.servers.get(node).getImpl(this.gid).getController().getCurrentIndex());
        }

        for (int round = 0; round < 500; round++) {
            for (int node = 0; node < 2; node++) {
                this.servers.get(node).write(this.gid, new TestEntry(round, this.peers.get(node)));
            }
            for (int node = 0; node < 2; node++) {
                Assert.assertEquals(round + 1, this.servers.get(node).getImpl(this.gid).getController().getCurrentIndex());
            }
        }
        for (int node = 0; node < 2; node++) {
            Assert.assertEquals(500L, this.servers.get(node).getImpl(this.gid).getController().getLastFlushedIndex());
        }
        for (int node = 0; node < 2; node++) {
            Assert.assertEquals(0L, this.servers.get(node).getImpl(this.gid).getCurrentSafelyDeletedSearchIndex());
        }

        stopServer();
        initServer();
        // The third peer joins only after the restart.
        this.servers.get(2).addConsensusGroup(this.group.getGroupId(), this.group.getPeers());

        for (int node = 0; node < 3; node++) {
            Assert.assertEquals(this.peers, this.servers.get(node).getImpl(this.gid).getConfiguration());
        }
        for (int node = 0; node < 2; node++) {
            Assert.assertEquals(1000L, this.servers.get(node).getImpl(this.gid).getController().getCurrentIndex());
        }
        Assert.assertEquals(0L, this.servers.get(2).getImpl(this.gid).getController().getCurrentIndex());

        // Poll the two writers (up to 20 s apiece) until their safely-deleted index reaches 500.
        for (int node = 0; node < 2; node++) {
            long startedAt = System.currentTimeMillis();
            while (this.servers.get(node).getImpl(this.gid).getCurrentSafelyDeletedSearchIndex() < 500) {
                long elapsed = System.currentTimeMillis() - startedAt;
                if (elapsed > 20000) {
                    this.logger.error("{}", Long.valueOf(this.servers.get(node).getImpl(this.gid).getCurrentSafelyDeletedSearchIndex()));
                    Assert.fail("Unable to replicate entries");
                }
                Thread.sleep(100L);
            }
        }

        for (int node = 0; node < 3; node++) {
            Assert.assertEquals(1000L, this.stateMachines.get(node).getRequestSet().size());
        }
        Assert.assertEquals(this.stateMachines.get(0).getData(), this.stateMachines.get(1).getData());
        Assert.assertEquals(this.stateMachines.get(2).getData(), this.stateMachines.get(1).getData());
    }
}
