package org.apache.omid.tso.client;

import com.google.common.collect.Sets;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.ibm.icu.text.BreakIterator;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.omid.TestUtils;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.proto.TSOProto;
import org.apache.omid.tso.PausableTimestampOracle;
import org.apache.omid.tso.TSOMockModule;
import org.apache.omid.tso.TSOServer;
import org.apache.omid.tso.TSOServerConfig;
import org.apache.omid.tso.TimestampOracle;
import org.apache.omid.tso.util.DummyCellIdImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/omid/tso/client/TestTSOClientRequestAndResponseBehaviours.class */
public class TestTSOClientRequestAndResponseBehaviours {
    private static final String TSO_SERVER_HOST = "localhost";
    private static final int TSO_SERVER_PORT = 1234;
    private OmidClientConfiguration tsoClientConf;
    private TSOServer tsoServer;
    private PausableTimestampOracle pausableTSOracle;
    private CommitTable commitTable;
    private static final Logger LOG = LoggerFactory.getLogger(TestTSOClientRequestAndResponseBehaviours.class);
    private static final CellId c1 = new DummyCellIdImpl(3735928559L);
    private static final CellId c2 = new DummyCellIdImpl(4276996862L);
    private static final Set<CellId> testWriteSet = Sets.newHashSet(new CellId[]{c1, c2});

    @BeforeMethod
    public void beforeMethod() throws Exception {
        TSOServerConfig tSOServerConfig = new TSOServerConfig();
        tSOServerConfig.setConflictMapSize(1000);
        tSOServerConfig.setPort(TSO_SERVER_PORT);
        tSOServerConfig.setNumConcurrentCTWriters(2);
        Injector createInjector = Guice.createInjector(new Module[]{new TSOMockModule(tSOServerConfig)});
        LOG.info("==================================================================================================");
        LOG.info("======================================= Init TSO Server ==========================================");
        LOG.info("==================================================================================================");
        this.tsoServer = (TSOServer) createInjector.getInstance(TSOServer.class);
        this.tsoServer.startAndWait();
        TestUtils.waitForSocketListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 100);
        LOG.info("==================================================================================================");
        LOG.info("===================================== TSO Server Initialized =====================================");
        LOG.info("==================================================================================================");
        this.pausableTSOracle = (PausableTimestampOracle) createInjector.getInstance(TimestampOracle.class);
        this.commitTable = (CommitTable) createInjector.getInstance(CommitTable.class);
        OmidClientConfiguration omidClientConfiguration = new OmidClientConfiguration();
        omidClientConfiguration.setConnectionString("localhost:1234");
        this.tsoClientConf = omidClientConfiguration;
    }

    @AfterMethod
    public void afterMethod() throws Exception {
        this.tsoServer.stopAndWait();
        this.tsoServer = null;
        TestUtils.waitForSocketNotListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 1000);
        this.pausableTSOracle.resume();
    }

    @Test(timeOut = 30000)
    public void testTimeoutsAreCancelled() throws Exception {
        TSOClient newInstance = TSOClient.newInstance(this.tsoClientConf);
        LOG.info("Request timeout {} ms; Max retries {}", Integer.valueOf(BreakIterator.WORD_IDEO_LIMIT), 5);
        TSOFuture<Long> tSOFuture = null;
        for (int i = 0; i < 5 * 10; i++) {
            tSOFuture = newInstance.getNewStartTimestamp();
        }
        if (tSOFuture != null) {
            tSOFuture.get();
        }
        this.pausableTSOracle.pause();
        long j = (long) (BreakIterator.WORD_IDEO_LIMIT * 0.75d);
        LOG.info("Sleeping for {} ms", Long.valueOf(j));
        TimeUnit.MILLISECONDS.sleep(j);
        TSOFuture<Long> newStartTimestamp = newInstance.getNewStartTimestamp();
        long j2 = (long) (BreakIterator.WORD_IDEO_LIMIT * 0.9d);
        LOG.info("Sleeping for {} ms", Long.valueOf(j2));
        TimeUnit.MILLISECONDS.sleep(j2);
        LOG.info("Resuming");
        this.pausableTSOracle.resume();
        newStartTimestamp.get();
    }

    @Test(timeOut = 30000)
    public void testCommitGetsServiceUnavailableExceptionWhenCommunicationFails() throws Exception {
        OmidClientConfiguration omidClientConfiguration = new OmidClientConfiguration();
        omidClientConfiguration.setConnectionString("localhost:1234");
        omidClientConfiguration.setRequestMaxRetries(0);
        TSOClient newInstance = TSOClient.newInstance(omidClientConfiguration);
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 10; i++) {
            arrayList.add(newInstance.getNewStartTimestamp().get());
        }
        this.pausableTSOracle.pause();
        ArrayList arrayList2 = new ArrayList();
        Iterator it2 = arrayList.iterator();
        while (it2.hasNext()) {
            arrayList2.add(newInstance.commit(((Long) it2.next()).longValue(), Sets.newHashSet()));
        }
        TSOClientAccessor.closeChannel(newInstance);
        Iterator it3 = arrayList2.iterator();
        while (it3.hasNext()) {
            try {
                ((Future) it3.next()).get();
                Assert.fail("Shouldn't be able to complete");
            } catch (ExecutionException e) {
                Assert.assertTrue(e.getCause() instanceof ServiceUnavailableException, "Should be a service unavailable exception");
            }
        }
    }

    @Test(timeOut = 30000)
    public void testHandshakeBetweenOldClientAndCurrentServer() throws Exception {
        TSOClientRaw tSOClientRaw = new TSOClientRaw(TSO_SERVER_HOST, TSO_SERVER_PORT);
        tSOClientRaw.write(TSOProto.Request.newBuilder().setTimestampRequest(TSOProto.TimestampRequest.newBuilder().build()).build());
        try {
            tSOClientRaw.getResponse().get();
            Assert.fail("Channel should be closed");
        } catch (ExecutionException e) {
            Assert.assertEquals(e.getCause().getClass(), ConnectionException.class, "Should be channel closed exception");
        }
        tSOClientRaw.close();
    }

    @Test(timeOut = 30000)
    public void testOutOfOrderMessages() throws Exception {
        TSOClient newInstance = TSOClient.newInstance(this.tsoClientConf);
        TSOClientOneShot tSOClientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
        long longValue = newInstance.getNewStartTimestamp().get().longValue();
        TSOProto.Response makeRequest = tSOClientOneShot.makeRequest(createCommitRequest(longValue, true, testWriteSet));
        TSOProto.Response makeRequest2 = tSOClientOneShot.makeRequest(createCommitRequest(longValue, false, testWriteSet));
        Assert.assertFalse(makeRequest.getCommitResponse().getAborted(), "Retry Transaction should commit");
        Assert.assertTrue(makeRequest2.getCommitResponse().getAborted(), "Transaction should abort");
    }

    @Test(timeOut = 30000)
    public void testDuplicateCommitAborting() throws Exception {
        TSOClient newInstance = TSOClient.newInstance(this.tsoClientConf);
        TSOClientOneShot tSOClientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
        long longValue = newInstance.getNewStartTimestamp().get().longValue();
        newInstance.commit(newInstance.getNewStartTimestamp().get().longValue(), testWriteSet).get();
        TSOProto.Response makeRequest = tSOClientOneShot.makeRequest(createCommitRequest(longValue, false, testWriteSet));
        TSOProto.Response makeRequest2 = tSOClientOneShot.makeRequest(createCommitRequest(longValue, true, testWriteSet));
        Assert.assertTrue(makeRequest.getCommitResponse().getAborted(), "Transaction should abort");
        Assert.assertTrue(makeRequest2.getCommitResponse().getAborted(), "Retry commit should abort");
    }

    @Test(timeOut = 30000)
    public void testDuplicateCommit() throws Exception {
        TSOClient newInstance = TSOClient.newInstance(this.tsoClientConf);
        TSOClientOneShot tSOClientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
        long longValue = newInstance.getNewStartTimestamp().get().longValue();
        TSOProto.Response makeRequest = tSOClientOneShot.makeRequest(createCommitRequest(longValue, false, testWriteSet));
        TSOProto.Response makeRequest2 = tSOClientOneShot.makeRequest(createCommitRequest(longValue, true, testWriteSet));
        if (!newInstance.isLowLatency()) {
            Assert.assertEquals(makeRequest2.getCommitResponse().getCommitTimestamp(), makeRequest.getCommitResponse().getCommitTimestamp(), "Commit timestamp should be the same");
        } else {
            Assert.assertTrue(makeRequest.hasCommitResponse());
            Assert.assertTrue(makeRequest2.getCommitResponse().getAborted());
        }
    }

    @Test(timeOut = 30000)
    public void testCommitCanSucceedWhenChannelDisconnected() throws Exception {
        TSOClient newInstance = TSOClient.newInstance(this.tsoClientConf);
        long longValue = newInstance.getNewStartTimestamp().get().longValue();
        if (newInstance.isLowLatency()) {
            return;
        }
        this.pausableTSOracle.pause();
        TSOFuture<Long> commit = newInstance.commit(longValue, testWriteSet);
        TSOClientAccessor.closeChannel(newInstance);
        this.pausableTSOracle.resume();
        commit.get();
    }

    @Test(timeOut = 30000)
    public void testCommitCanSucceedWithMultipleTimeouts() throws Exception {
        OmidClientConfiguration omidClientConfiguration = new OmidClientConfiguration();
        omidClientConfiguration.setConnectionString("localhost:1234");
        omidClientConfiguration.setRequestTimeoutInMs(100);
        omidClientConfiguration.setRequestMaxRetries(10000);
        TSOClient newInstance = TSOClient.newInstance(omidClientConfiguration);
        long longValue = newInstance.getNewStartTimestamp().get().longValue();
        this.pausableTSOracle.pause();
        TSOFuture<Long> commit = newInstance.commit(longValue, testWriteSet);
        TimeUnit.SECONDS.sleep(1L);
        this.pausableTSOracle.resume();
        commit.get();
    }

    @Test(timeOut = 30000)
    public void testCommitFailWhenTSOIsDown() throws Exception {
        OmidClientConfiguration omidClientConfiguration = new OmidClientConfiguration();
        omidClientConfiguration.setConnectionString("localhost:1234");
        omidClientConfiguration.setRequestTimeoutInMs(100);
        omidClientConfiguration.setRequestMaxRetries(10);
        TSOClient newInstance = TSOClient.newInstance(omidClientConfiguration);
        long longValue = newInstance.getNewStartTimestamp().get().longValue();
        this.pausableTSOracle.pause();
        try {
            newInstance.commit(longValue, testWriteSet).get();
        } catch (ExecutionException e) {
            Assert.assertEquals(e.getCause().getClass(), ServiceUnavailableException.class, "Should be a ServiceUnavailableExeption");
        }
    }

    @Test(timeOut = 30000)
    public void testTimestampRequestSucceedWithMultipleTimeouts() throws Exception {
        OmidClientConfiguration omidClientConfiguration = new OmidClientConfiguration();
        omidClientConfiguration.setConnectionString("localhost:1234");
        omidClientConfiguration.setRequestTimeoutInMs(100);
        omidClientConfiguration.setRequestMaxRetries(10000);
        TSOClient newInstance = TSOClient.newInstance(omidClientConfiguration);
        this.pausableTSOracle.pause();
        TSOFuture<Long> newStartTimestamp = newInstance.getNewStartTimestamp();
        TimeUnit.SECONDS.sleep(1L);
        this.pausableTSOracle.resume();
        newStartTimestamp.get();
    }

    @Test(timeOut = 30000)
    public void testCommitTimestampPresentInCommitTableReturnsCommit() throws Exception {
        TSOClient newInstance = TSOClient.newInstance(this.tsoClientConf);
        TSOClientOneShot tSOClientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
        long longValue = newInstance.getNewStartTimestamp().get().longValue();
        tSOClientOneShot.makeRequest(createRetryCommitRequest(longValue));
        TSOProto.Response makeRequest = tSOClientOneShot.makeRequest(createRetryCommitRequest(longValue));
        if (newInstance.isLowLatency()) {
            Assert.assertTrue(makeRequest.getCommitResponse().getAborted(), "Transaction should be aborted");
        } else {
            Assert.assertFalse(makeRequest.getCommitResponse().getAborted(), "Transaction should be committed");
            Assert.assertEquals(makeRequest.getCommitResponse().getCommitTimestamp(), longValue + 50);
        }
    }

    @Test(timeOut = 30000)
    public void testInvalidCommitTimestampPresentInCommitTableReturnsAbort() throws Exception {
        TSOClient newInstance = TSOClient.newInstance(this.tsoClientConf);
        TSOClientOneShot tSOClientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
        long longValue = newInstance.getNewStartTimestamp().get().longValue();
        this.commitTable.getClient().tryInvalidateTransaction(longValue);
        tSOClientOneShot.makeRequest(createRetryCommitRequest(longValue));
        TSOProto.Response makeRequest = tSOClientOneShot.makeRequest(createRetryCommitRequest(longValue));
        Assert.assertTrue(makeRequest.getCommitResponse().getAborted(), "Transaction should be aborted");
        Assert.assertEquals(makeRequest.getCommitResponse().getCommitTimestamp(), 0L);
    }

    @Test(timeOut = 30000)
    public void testCommitTimestampNotPresentInCommitTableReturnsAnAbort() throws Exception {
        TSOClient newInstance = TSOClient.newInstance(this.tsoClientConf);
        TSOClientOneShot tSOClientOneShot = new TSOClientOneShot(TSO_SERVER_HOST, TSO_SERVER_PORT);
        long longValue = newInstance.getNewStartTimestamp().get().longValue();
        tSOClientOneShot.makeRequest(createRetryCommitRequest(longValue));
        this.commitTable.getClient().deleteCommitEntry(longValue);
        TSOProto.Response makeRequest = tSOClientOneShot.makeRequest(createRetryCommitRequest(longValue));
        Assert.assertTrue(makeRequest.getCommitResponse().getAborted(), "Transaction should abort");
        Assert.assertEquals(makeRequest.getCommitResponse().getCommitTimestamp(), 0L);
    }

    private TSOProto.Request createRetryCommitRequest(long j) {
        return createCommitRequest(j, true, testWriteSet);
    }

    private TSOProto.Request createCommitRequest(long j, boolean z, Set<CellId> set) {
        TSOProto.Request.Builder newBuilder = TSOProto.Request.newBuilder();
        TSOProto.CommitRequest.Builder newBuilder2 = TSOProto.CommitRequest.newBuilder();
        newBuilder2.setStartTimestamp(j);
        newBuilder2.setIsRetry(z);
        Iterator<CellId> it2 = set.iterator();
        while (it2.hasNext()) {
            newBuilder2.addCellId(it2.next().getCellId());
        }
        return newBuilder.setCommitRequest(newBuilder2.m1901build()).build();
    }
}
