package org.apache.fluo.integration.impl;

import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.oracle.OracleClient;
import org.apache.fluo.core.oracle.OracleServer;
import org.apache.fluo.core.util.HostUtil;
import org.apache.fluo.core.util.PortUtils;
import org.apache.fluo.integration.ITBaseImpl;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

/* loaded from: input_file:org/apache/fluo/integration/impl/OracleIT.class */
/**
 * Integration tests for the oracle server: timestamp ordering across restarts, tolerance of
 * garbage sent to the server port, concurrent timestamp allocation, and fail-over between
 * several oracle instances.
 *
 * <p>The {@code env} and {@code oserver} fields as well as {@code createExtraOracle} and
 * {@code getTestTimeout} are inherited from {@link ITBaseImpl}.
 */
public class OracleIT extends ITBaseImpl {
    private static final String THRIFT_SERVER_LOGGER_NAME = "org.apache.fluo.core.shaded.thrift.server";

    /** Number of concurrent fetcher tasks (and pool threads) used by the threaded tests. */
    private static final int NUM_THREADS = 20;

    /** Number of timestamps each fetcher task requests. */
    private static final int NUM_TIMES = 100;

    /** Polling interval, in milliseconds, used while waiting for a condition to change. */
    private static final long POLL_INTERVAL_MS = 100L;

    @Rule
    public Timeout globalTimeout = Timeout.seconds(getTestTimeout());

    /** Level of the thrift server logger before the test lowered it; restored after the test. */
    private Level curLevel;

    /**
     * Task that requests a fixed number of timestamps from the shared oracle client, appends
     * them to a shared list, and signals a latch once it is done.
     */
    private static class TimestampFetcher implements Runnable {
        private final int numToGet;
        private final Environment env;
        private final List<Long> output;
        private final CountDownLatch cdl;

        /**
         * @param i number of timestamps to request
         * @param environment environment providing the shared oracle client
         * @param list destination for the fetched timestamps; written from several threads, so
         *     the caller must pass a thread-safe list
         * @param countDownLatch latch counted down exactly once when this task finishes
         */
        TimestampFetcher(int i, Environment environment, List<Long> list, CountDownLatch countDownLatch) {
            this.numToGet = i;
            this.env = environment;
            this.output = list;
            this.cdl = countDownLatch;
        }

        @Override
        public void run() {
            try {
                OracleClient oracleClient = env.getSharedResources().getOracleClient();
                for (int i = 0; i < numToGet; i++) {
                    try {
                        output.add(oracleClient.getStamp().getTxTimestamp());
                    } catch (Exception e) {
                        // Best effort: a failed request is reported here and later surfaces as a
                        // size mismatch in the assertions of the calling test.
                        e.printStackTrace();
                    }
                }
            } finally {
                // Always release the latch, even on an unexpected failure, so the waiting test
                // fails on its assertions instead of blocking until the global timeout.
                cdl.countDown();
            }
        }
    }

    /** Timestamps must keep strictly increasing across a stop/start of the oracle server. */
    @Test
    public void testRestart() throws Exception {
        OracleClient oracleClient = this.env.getSharedResources().getOracleClient();
        long ts1 = oracleClient.getStamp().getTxTimestamp();
        long ts2 = oracleClient.getStamp().getTxTimestamp();

        this.oserver.stop();
        this.oserver.start();

        long ts3 = oracleClient.getStamp().getTxTimestamp();
        long ts4 = oracleClient.getStamp().getTxTimestamp();

        Assert.assertTrue(ts1 + " " + ts2, ts1 < ts2);
        Assert.assertTrue(ts2 + " " + ts3, ts2 < ts3);
        Assert.assertTrue(ts3 + " " + ts4, ts3 < ts4);
    }

    /** Raises the thrift server logger to FATAL for the test, remembering the previous level. */
    @Before
    public void disableLogger() {
        Logger thriftLogger = Logger.getLogger(THRIFT_SERVER_LOGGER_NAME);
        this.curLevel = thriftLogger.getLevel();
        thriftLogger.setLevel(Level.FATAL);
    }

    /** Restores the thrift server logger level saved by {@link #disableLogger()}. */
    @After
    public void enableLogger() {
        Logger.getLogger(THRIFT_SERVER_LOGGER_NAME).setLevel(this.curLevel);
    }

    /**
     * Sends non-protocol bytes to the oracle port and then verifies that a regular client still
     * receives the expected timestamp.
     */
    @Test
    public void bogusDataTest() throws Exception {
        // Both resources are closed even if connect() or getOutputStream() throws.
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(HostUtil.getHostName(), this.oserver.getPort()));
            try (PrintWriter writer = new PrintWriter(socket.getOutputStream())) {
                writer.print("abcd");
                writer.flush();
            }
        }
        Assert.assertEquals(2L, this.env.getSharedResources().getOracleClient().getStamp().getTxTimestamp());
    }

    /**
     * Fetches timestamps from many threads in two rounds and checks that all of them are unique
     * and that every timestamp of the second round is larger than every one of the first.
     */
    @Test
    public void threadTest() throws Exception {
        List<Long> output = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(NUM_THREADS);
        try {
            CountDownLatch firstRound = new CountDownLatch(NUM_THREADS);
            for (int i = 0; i < NUM_THREADS; i++) {
                pool.execute(new TimestampFetcher(NUM_TIMES, this.env, output, firstRound));
            }
            firstRound.await();
            // A set drops duplicates, so its size proves uniqueness of the timestamps.
            TreeSet<Long> firstStamps = new TreeSet<>(output);
            Assert.assertEquals(NUM_THREADS * NUM_TIMES, firstStamps.size());

            CountDownLatch secondRound = new CountDownLatch(NUM_THREADS);
            output.clear();
            for (int i = 0; i < NUM_THREADS; i++) {
                pool.execute(new TimestampFetcher(NUM_TIMES, this.env, output, secondRound));
            }
            secondRound.await();
            TreeSet<Long> secondStamps = new TreeSet<>(output);
            Assert.assertEquals(NUM_THREADS * NUM_TIMES, secondStamps.size());

            Assert.assertTrue(firstStamps.last() < secondStamps.first());
        } finally {
            // Release the pool threads even when an assertion above fails.
            pool.shutdown();
        }
    }

    /**
     * Starts two additional oracles, then stops the current leader twice in a row and checks
     * after each stop that the client is served by the next oracle and receives the expected
     * timestamp.
     */
    @Test
    public void failover_newTimestampRequested() throws Exception {
        sleepUntil(this.oserver::isConnected);

        int port2 = PortUtils.getRandomFreePort();
        int port3 = PortUtils.getRandomFreePort();
        ITBaseImpl.TestOracle oserver2 = createExtraOracle(port2);
        ITBaseImpl.TestOracle oserver3 = createExtraOracle(port3);

        oserver2.start();
        sleepUntil(oserver2::isConnected);
        oserver3.start();
        sleepUntil(oserver3::isConnected);

        OracleClient oracleClient = this.env.getSharedResources().getOracleClient();
        for (long expected = 2; expected <= 7; expected++) {
            Assert.assertEquals(expected, oracleClient.getStamp().getTxTimestamp());
        }
        Assert.assertTrue(oracleClient.getOracle().endsWith(Integer.toString(this.oserver.getPort())));

        // Take down the first oracle and wait for the second one to take over.
        this.oserver.stop();
        sleepWhile(this.oserver::isConnected);
        sleepUntil(oserver2::isLeader);
        Assert.assertEquals(1002L, oracleClient.getStamp().getTxTimestamp());
        Assert.assertTrue(oracleClient.getOracle().endsWith(Integer.toString(port2)));

        // Take down the second oracle and wait for the third one to take over.
        oserver2.stop();
        sleepWhile(oserver2::isConnected);
        oserver2.close();
        sleepUntil(oserver3::isLeader);
        Assert.assertEquals(2002L, oracleClient.getStamp().getTxTimestamp());
        Assert.assertTrue(oracleClient.getOracle().endsWith(Integer.toString(port3)));

        oserver3.stop();
        oserver3.close();
    }

    /**
     * With a single oracle: stops it, waits until the client reports no oracle, restarts it and
     * checks that the client reconnects to it and receives the expected timestamp.
     */
    @Test
    public void singleOracle_goesAwayAndComesBack() throws Exception {
        sleepUntil(this.oserver::isConnected);

        OracleClient oracleClient = this.env.getSharedResources().getOracleClient();
        for (long expected = 2; expected <= 7; expected++) {
            Assert.assertEquals(expected, oracleClient.getStamp().getTxTimestamp());
        }

        this.oserver.stop();
        sleepWhile(this.oserver::isConnected);

        // Wait for the client to notice that the oracle is gone.
        while (oracleClient.getOracle() != null) {
            Thread.sleep(POLL_INTERVAL_MS);
        }
        Assert.assertNull(oracleClient.getOracle());

        this.oserver.start();
        sleepUntil(this.oserver::isConnected);

        Assert.assertEquals(1002L, oracleClient.getStamp().getTxTimestamp());
        Assert.assertTrue(oracleClient.getOracle().endsWith(Integer.toString(this.oserver.getPort())));

        this.oserver.stop();
    }

    /**
     * Like {@link #threadTest()}, but an oracle is stopped while the fetcher tasks of each round
     * are being submitted, so timestamps must stay unique and ordered across fail-over.
     */
    @Test
    public void threadFailoverTest() throws Exception {
        List<Long> output = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(NUM_THREADS);
        try {
            CountDownLatch firstRound = new CountDownLatch(NUM_THREADS);

            int port2 = PortUtils.getRandomFreePort();
            int port3 = PortUtils.getRandomFreePort();

            ITBaseImpl.TestOracle oserver2 = createExtraOracle(port2);
            oserver2.start();
            sleepUntil(oserver2::isConnected);

            ITBaseImpl.TestOracle oserver3 = createExtraOracle(port3);
            oserver3.start();
            sleepUntil(oserver3::isConnected);

            for (int i = 0; i < NUM_THREADS; i++) {
                pool.execute(new TimestampFetcher(NUM_TIMES, this.env, output, firstRound));
                if (i == 10) {
                    // Stop the first oracle while fetchers are already running.
                    this.oserver.stop();
                }
            }
            firstRound.await();
            TreeSet<Long> firstStamps = new TreeSet<>(output);
            Assert.assertEquals(NUM_THREADS * NUM_TIMES, firstStamps.size());

            CountDownLatch secondRound = new CountDownLatch(NUM_THREADS);
            output.clear();
            for (int i = 0; i < NUM_THREADS; i++) {
                pool.execute(new TimestampFetcher(NUM_TIMES, this.env, output, secondRound));
                if (i == 5) {
                    // Stop the second oracle while fetchers are already running.
                    oserver2.stop();
                }
            }
            oserver2.close();
            secondRound.await();
            TreeSet<Long> secondStamps = new TreeSet<>(output);
            Assert.assertEquals(NUM_THREADS * NUM_TIMES, secondStamps.size());

            Assert.assertTrue(firstStamps.last() < secondStamps.first());

            oserver3.stop();
            oserver3.close();
        } finally {
            // Release the pool threads even when an assertion above fails.
            pool.shutdown();
        }
    }

    /**
     * Blocks, polling periodically, until the supplier returns {@code true}.
     *
     * @param supplier condition to wait for
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private void sleepUntil(Supplier<Boolean> supplier) throws InterruptedException {
        sleepWhile(() -> !supplier.get());
    }

    /**
     * Blocks, polling periodically, for as long as the supplier returns {@code true}.
     *
     * @param supplier condition that must become {@code false} for this method to return
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private void sleepWhile(Supplier<Boolean> supplier) throws InterruptedException {
        while (supplier.get()) {
            Thread.sleep(POLL_INTERVAL_MS);
        }
    }
}
