package org.apache.omid.tso.client;

import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import java.util.concurrent.ExecutionException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;
import org.apache.omid.TestUtils;
import org.apache.omid.tso.HALeaseManagementModule;
import org.apache.omid.tso.TSOMockModule;
import org.apache.omid.tso.TSOServer;
import org.apache.omid.tso.TSOServerConfig;
import org.apache.omid.tso.VoidLeaseManagementModule;
import org.apache.omid.tso.client.OmidClientConfiguration;
import org.apache.omid.tso.client.TSOClient;
import org.apache.statemachine.StateMachine;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/omid/tso/client/TestTSOClientConnectionToTSO.class */
/**
 * Tests covering how a {@link TSOClient} connects to a {@link TSOServer}: directly through a
 * host:port connection string, through the TSO address published in ZooKeeper, and across a
 * restart of the TSO.
 *
 * <p>Every test method gets its own ZooKeeper testing server and its own free local port for the
 * TSO. {@link #afterMethod()} releases whatever a test left behind, including a TSO server that
 * is still referenced because the test failed before stopping it.
 */
public class TestTSOClientConnectionToTSO {
    private static final Logger LOG = LoggerFactory.getLogger(TestTSOClientConnectionToTSO.class);
    private static final String TSO_HOST = "localhost";
    private static final String CURRENT_TSO_PATH = "/current_tso_path";
    private static final String TSO_LEASE_PATH = "/tso_lease_path";
    /** Value the tests expect for the first start timestamp served by a freshly started TSO. */
    private static final long EXPECTED_FIRST_START_TIMESTAMP = 50L;
    /** Pause between two polls of ZooKeeper while waiting for the TSO to register. */
    private static final long TSO_REGISTRATION_POLL_MS = 500L;
    /** Pause between two timestamp requests while waiting for the client to reconnect. */
    private static final long RECONNECTION_RETRY_MS = 100L;

    private int tsoPortForTest;
    private String zkClusterForTest;
    private Injector injector = null;
    private TestingServer zkServer;
    private CuratorFramework zkClient;
    private TSOServer tsoServer;

    /**
     * Picks free ports for the TSO and ZooKeeper, starts the ZooKeeper testing server, connects a
     * client to it and makes sure {@code CURRENT_TSO_PATH} does not exist.
     *
     * @throws Exception if the ZooKeeper server or client cannot be set up
     */
    @BeforeMethod
    public void beforeMethod() throws Exception {
        this.tsoPortForTest = TestUtils.getFreeLocalPort();
        int zkPort = TestUtils.getFreeLocalPort();
        this.zkClusterForTest = "localhost:" + zkPort;
        LOG.info("Starting ZK Server in port {}", zkPort);
        this.zkServer = TestUtils.provideTestingZKServer(zkPort);
        LOG.info("ZK Server Started @ {}", this.zkServer.getConnectString());
        this.zkClient = TestUtils.provideConnectedZKClient(this.zkClusterForTest);
        try {
            this.zkClient.delete().forPath(CURRENT_TSO_PATH);
            Assert.assertNull(this.zkClient.checkExists().forPath(CURRENT_TSO_PATH),
                    CURRENT_TSO_PATH + " should not exist");
        } catch (KeeperException.NoNodeException e) {
            // Nothing to delete: the znode was never created on this fresh server.
            LOG.info("{} ZNode did not exist", CURRENT_TSO_PATH);
        }
    }

    /**
     * Releases everything the test method may have left open. Each step is null-safe so that the
     * method also works when {@link #beforeMethod()} failed part-way through.
     */
    @AfterMethod(alwaysRun = true)
    public void afterMethod() {
        // A test that failed before reaching its own shutdown code still references its server.
        if (this.tsoServer != null) {
            try {
                this.tsoServer.stopAndWait();
            } catch (Exception e) {
                // Best-effort cleanup: a failure here must not hide the original test result.
                LOG.warn("Could not stop the TSO Server left running by the test", e);
            } finally {
                this.tsoServer = null;
            }
        }
        try {
            if (this.zkClient != null) {
                this.zkClient.close();
            }
        } finally {
            this.zkClient = null;
            CloseableUtils.closeQuietly(this.zkServer);
            this.zkServer = null;
            LOG.info("ZK Server Stopped");
        }
    }

    /**
     * Creates a client from a default {@link OmidClientConfiguration} and tolerates an
     * {@link IllegalArgumentException} from the factory.
     *
     * <p>NOTE(review): the test does not fail when no exception is thrown, so it currently only
     * verifies that no other exception type escapes. Confirm the intended contract before
     * tightening it.
     *
     * @throws Exception if client creation fails with anything but {@link IllegalArgumentException}
     */
    @Test(timeOut = 30000)
    public void testUnsuccessfulConnectionToTSO() throws Exception {
        try {
            TSOClient.newInstance(new OmidClientConfiguration());
        } catch (IllegalArgumentException expected) {
            // Tolerated outcome, see the method documentation.
        }
    }

    /**
     * Starts a TSO without lease management and checks that a client configured with the TSO's
     * host:port obtains the expected first start timestamp.
     *
     * @throws Exception if the server or the client fail unexpectedly
     */
    @Test(timeOut = 30000)
    public void testSuccessfulConnectionToTSOWithHostAndPort() throws Exception {
        TSOServerConfig tsoConfig = newTsoServerConfig();
        tsoConfig.setLeaseModule(new VoidLeaseManagementModule());
        LOG.info("Starting TSO");
        this.injector = startTsoServer(tsoConfig);
        LOG.info("Finished loading TSO");

        OmidClientConfiguration clientConfig = new OmidClientConfiguration();
        clientConfig.setConnectionString("localhost:" + this.tsoPortForTest);
        clientConfig.setZkCurrentTsoPath(CURRENT_TSO_PATH);
        TSOClient client = TSOClient.newInstance(clientConfig);

        Long startTs = client.getNewStartTimestamp().get();
        LOG.info("Start TS {} ", startTs);
        Assert.assertEquals(startTs.longValue(), EXPECTED_FIRST_START_TIMESTAMP);

        client.close().get();
        stopTsoServer();
        LOG.info("TSO Server Stopped");
    }

    /**
     * Starts a TSO with HA lease management, waits until it appears under
     * {@code CURRENT_TSO_PATH} and checks that a client configured in HA mode with the ZooKeeper
     * cluster obtains the expected first start timestamp.
     *
     * @throws Exception if the server or the client fail unexpectedly
     */
    @Test(timeOut = 30000)
    public void testSuccessfulConnectionToTSOThroughZK() throws Exception {
        LOG.info("Starting TSO");
        this.injector = startTsoServer(newHaTsoServerConfig());
        LOG.info("Finished loading TSO");
        waitTillTsoRegisters(this.injector.getInstance(CuratorFramework.class));

        TSOClient client = TSOClient.newInstance(newHaClientConfig());

        Long startTs = client.getNewStartTimestamp().get();
        LOG.info("Start TS {} ", startTs);
        Assert.assertEquals(startTs.longValue(), EXPECTED_FIRST_START_TIMESTAMP);

        client.close().get();
        stopTsoServer();
        LOG.info("TSO Server Stopped");
    }

    /**
     * Checks that an HA-mode client fails with a {@link ConnectionException} while the TSO is
     * down and serves timestamps again, without being recreated, once a new TSO is started on the
     * same port.
     *
     * @throws Exception if the servers or the client fail unexpectedly
     */
    @Test(timeOut = 30000)
    public void testSuccessOfTSOClientReconnectionsToARestartedTSOWithZKPublishing() throws Exception {
        TSOServerConfig tsoConfig = newHaTsoServerConfig();
        LOG.info("Starting Initial TSO");
        this.injector = startTsoServer(tsoConfig);
        LOG.info("Finished loading TSO");
        waitTillTsoRegisters(this.injector.getInstance(CuratorFramework.class));

        TSOClient client = TSOClient.newInstance(newHaClientConfig());

        Long startTs = client.getNewStartTimestamp().get();
        LOG.info("Start TS {} ", startTs);
        Assert.assertEquals(startTs.longValue(), EXPECTED_FIRST_START_TIMESTAMP);

        stopTsoServer();
        LOG.info("Initial TSO Server Stopped");
        // NOTE(review): presumably gives the 1000 ms lease configured above time to lapse - confirm.
        Thread.sleep(1500L);

        // With no TSO running the request has to fail and leave the client disconnected.
        try {
            client.getNewStartTimestamp().get();
            Assert.fail();
        } catch (ExecutionException e) {
            LOG.info("Exception expected");
            StateMachine.FsmImpl fsm = (StateMachine.FsmImpl) client.fsm;
            Assert.assertEquals(e.getCause().getClass(), ConnectionException.class);
            Class<?> stateClass = fsm.getState().getClass();
            Assert.assertTrue(stateClass.equals(TSOClient.ConnectionFailedState.class)
                    || stateClass.equals(TSOClient.DisconnectedState.class));
        }

        LOG.info("Re-Starting again the TSO");
        startTsoServer(tsoConfig);
        LOG.info("Finished loading restarted TSO");

        // Keep asking until the client has reconnected; the test timeout bounds this loop.
        Long restartedTs = null;
        boolean served = false;
        while (!served) {
            try {
                restartedTs = client.getNewStartTimestamp().get();
                served = true;
            } catch (ExecutionException e) {
                LOG.debug("TSO client has not reconnected yet, retrying...", e);
                Thread.sleep(RECONNECTION_RETRY_MS);
            }
        }
        Assert.assertNotNull(restartedTs);

        // Release the client like the other tests do, while the TSO is still reachable.
        client.close().get();
        stopTsoServer();
        LOG.info("Restarted TSO Server Stopped");
    }

    /**
     * Builds the TSO configuration shared by all tests: conflict map size of 1000 and the port
     * reserved for the current test. The caller still has to set the lease module.
     */
    private TSOServerConfig newTsoServerConfig() {
        TSOServerConfig tsoConfig = new TSOServerConfig();
        tsoConfig.setConflictMapSize(1000);
        tsoConfig.setPort(this.tsoPortForTest);
        return tsoConfig;
    }

    /** Builds a TSO configuration whose lease module points at this test's ZooKeeper cluster. */
    private TSOServerConfig newHaTsoServerConfig() {
        TSOServerConfig tsoConfig = newTsoServerConfig();
        tsoConfig.setLeaseModule(new HALeaseManagementModule(
                1000L, TSO_LEASE_PATH, CURRENT_TSO_PATH, this.zkClusterForTest, "omid"));
        return tsoConfig;
    }

    /** Builds an HA-mode client configuration pointing at this test's ZooKeeper cluster. */
    private OmidClientConfiguration newHaClientConfig() {
        OmidClientConfiguration clientConfig = new OmidClientConfiguration();
        clientConfig.setConnectionType(OmidClientConfiguration.ConnType.HA);
        clientConfig.setConnectionString(this.zkClusterForTest);
        clientConfig.setZkCurrentTsoPath(CURRENT_TSO_PATH);
        return clientConfig;
    }

    /**
     * Creates a TSO server from the given configuration, stores it in {@link #tsoServer}, starts
     * it and waits until its port accepts connections.
     *
     * @param tsoConfig configuration handed to the {@link TSOMockModule}
     * @return the injector the server was created from
     * @throws Exception if the server does not start listening
     */
    private Injector startTsoServer(TSOServerConfig tsoConfig) throws Exception {
        Injector tsoInjector = Guice.createInjector(new TSOMockModule(tsoConfig));
        this.tsoServer = tsoInjector.getInstance(TSOServer.class);
        this.tsoServer.startAndWait();
        TestUtils.waitForSocketListening(TSO_HOST, this.tsoPortForTest, 100);
        return tsoInjector;
    }

    /**
     * Stops the server held in {@link #tsoServer}, clears the field so that
     * {@link #afterMethod()} does not stop it again, and waits until the port is released.
     *
     * @throws Exception if the port does not stop listening
     */
    private void stopTsoServer() throws Exception {
        this.tsoServer.stopAndWait();
        this.tsoServer = null;
        TestUtils.waitForSocketNotListening(TSO_HOST, this.tsoPortForTest, 1000);
    }

    /**
     * Blocks until a znode exists at {@code CURRENT_TSO_PATH}, polling ZooKeeper every
     * {@code TSO_REGISTRATION_POLL_MS} milliseconds. There is no deadline of its own; callers rely
     * on the timeout of the enclosing test.
     *
     * @param curatorFramework ZooKeeper client used for the existence checks
     * @throws Exception if the waiting thread is interrupted
     */
    private void waitTillTsoRegisters(CuratorFramework curatorFramework) throws Exception {
        while (true) {
            try {
                Stat stat = curatorFramework.checkExists().forPath(CURRENT_TSO_PATH);
                if (stat != null) {
                    LOG.info("TSO registered in HA with path {}={}", CURRENT_TSO_PATH, stat.toString());
                    if (stat.toString().length() != 0) {
                        return;
                    }
                }
            } catch (InterruptedException e) {
                // Do not keep polling once the test has been interrupted (e.g. on timeout).
                throw e;
            } catch (Exception e) {
                LOG.debug("TSO still has not registered yet, sleeping...", e);
            }
            // Pause on every unsuccessful attempt, not only after an exception, to avoid
            // hammering ZooKeeper while the znode is simply absent.
            Thread.sleep(TSO_REGISTRATION_POLL_MS);
        }
    }
}
