package co.cask.tephra.distributed;

import co.cask.tephra.ThriftTransactionSystemTest;
import co.cask.tephra.TransactionSystemClient;
import co.cask.tephra.persist.InMemoryTransactionStateStorage;
import co.cask.tephra.persist.TransactionEdit;
import co.cask.tephra.persist.TransactionLog;
import co.cask.tephra.persist.TransactionStateStorage;
import co.cask.tephra.runtime.ConfigModule;
import co.cask.tephra.runtime.DiscoveryModules;
import co.cask.tephra.runtime.TransactionClientModule;
import co.cask.tephra.runtime.TransactionModules;
import co.cask.tephra.runtime.ZKModule;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.Scopes;
import com.google.inject.util.Modules;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.twill.internal.zookeeper.InMemoryZKServer;
import org.apache.twill.zookeeper.ZKClientService;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/tephra/distributed/ThriftTransactionServerTest.class */
public class ThriftTransactionServerTest {
    private static InMemoryZKServer zkServer;
    private static ZKClientService zkClientService;
    private static TransactionService txService;
    private static TransactionStateStorage storage;
    static Injector injector;
    private static final Logger LOG = LoggerFactory.getLogger(ThriftTransactionSystemTest.class);
    private static final CountDownLatch STORAGE_WAIT_LATCH = new CountDownLatch(1);
    private static final int NUM_CLIENTS = 17;
    private static final CountDownLatch CLIENTS_DONE_LATCH = new CountDownLatch(NUM_CLIENTS);

    @ClassRule
    public static TemporaryFolder tmpFolder = new TemporaryFolder();

    /* loaded from: input_file:co/cask/tephra/distributed/ThriftTransactionServerTest$SlowTransactionLog.class */
    private static class SlowTransactionLog extends InMemoryTransactionStateStorage.InMemoryTransactionLog {
        public SlowTransactionLog(long j) {
            super(j);
        }

        @Override // co.cask.tephra.persist.InMemoryTransactionStateStorage.InMemoryTransactionLog
        public void append(TransactionEdit transactionEdit) throws IOException {
            try {
                ThriftTransactionServerTest.STORAGE_WAIT_LATCH.await();
            } catch (InterruptedException e) {
                ThriftTransactionServerTest.LOG.error("Got exception: ", e);
            }
            super.append(transactionEdit);
        }

        @Override // co.cask.tephra.persist.InMemoryTransactionStateStorage.InMemoryTransactionLog
        public void append(List<TransactionEdit> list) throws IOException {
            try {
                ThriftTransactionServerTest.STORAGE_WAIT_LATCH.await();
            } catch (InterruptedException e) {
                ThriftTransactionServerTest.LOG.error("Got exception: ", e);
            }
            super.append(list);
        }
    }

    /* loaded from: input_file:co/cask/tephra/distributed/ThriftTransactionServerTest$SlowTransactionStorage.class */
    private static class SlowTransactionStorage extends InMemoryTransactionStateStorage {
        private SlowTransactionStorage() {
        }

        @Override // co.cask.tephra.persist.InMemoryTransactionStateStorage
        public TransactionLog createLog(long j) throws IOException {
            return new SlowTransactionLog(j);
        }
    }

    @BeforeClass
    public static void start() throws Exception {
        zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build();
        zkServer.startAndWait();
        Configuration configuration = new Configuration();
        configuration.setBoolean("tx.persist", false);
        configuration.set("data.tx.zookeeper.quorum", zkServer.getConnectionStr());
        configuration.set("data.tx.client.retry.strategy", "n-times");
        configuration.setInt("data.tx.client.retry.attempts", 1);
        configuration.setInt("data.tx.client.count", NUM_CLIENTS);
        configuration.setLong("data.tx.client.timeout", TimeUnit.HOURS.toMillis(1L));
        configuration.setInt("data.tx.server.io.threads", 2);
        configuration.setInt("data.tx.server.threads", 4);
        injector = Guice.createInjector(new Module[]{new ConfigModule(configuration), new ZKModule(), new DiscoveryModules().getDistributedModules(), Modules.override(new Module[]{new TransactionModules().getDistributedModules()}).with(new Module[]{new AbstractModule() { // from class: co.cask.tephra.distributed.ThriftTransactionServerTest.1
            protected void configure() {
                bind(TransactionStateStorage.class).to(SlowTransactionStorage.class).in(Scopes.SINGLETON);
            }
        }}), new TransactionClientModule()});
        zkClientService = (ZKClientService) injector.getInstance(ZKClientService.class);
        zkClientService.startAndWait();
        txService = (TransactionService) injector.getInstance(TransactionService.class);
        storage = (TransactionStateStorage) injector.getInstance(TransactionStateStorage.class);
        try {
            LOG.info("Starting transaction service");
            txService.startAndWait();
        } catch (Exception e) {
            LOG.error("Failed to start service: ", e);
        }
    }

    @Before
    public void reset() throws Exception {
        getClient().resetState();
    }

    @AfterClass
    public static void stop() throws Exception {
        txService.stopAndWait();
        storage.stopAndWait();
        zkClientService.stopAndWait();
        zkServer.stopAndWait();
    }

    public TransactionSystemClient getClient() throws Exception {
        return (TransactionSystemClient) injector.getInstance(TransactionSystemClient.class);
    }

    @Test
    public void testThriftServerStop() throws Exception {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(NUM_CLIENTS);
        for (int i = 0; i < NUM_CLIENTS; i++) {
            newFixedThreadPool.submit(new Runnable() { // from class: co.cask.tephra.distributed.ThriftTransactionServerTest.2
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        TransactionSystemClient client = ThriftTransactionServerTest.this.getClient();
                        ThriftTransactionServerTest.CLIENTS_DONE_LATCH.countDown();
                        client.startShort();
                    } catch (Exception e) {
                    }
                }
            });
        }
        CLIENTS_DONE_LATCH.await();
        TimeUnit.SECONDS.sleep(1L);
        expireZkSession(zkClientService);
        waitForThriftTermination();
        zkClientService.stopAndWait();
        STORAGE_WAIT_LATCH.countDown();
        TimeUnit.SECONDS.sleep(1L);
        Assert.assertEquals(Service.State.TERMINATED, txService.thriftRPCServerState());
    }

    private void expireZkSession(ZKClientService zKClientService) throws Exception {
        ZooKeeper zooKeeper = (ZooKeeper) zKClientService.getZooKeeperSupplier().get();
        final SettableFuture create = SettableFuture.create();
        ZooKeeper zooKeeper2 = new ZooKeeper(zKClientService.getConnectString(), zooKeeper.getSessionTimeout(), new Watcher() { // from class: co.cask.tephra.distributed.ThriftTransactionServerTest.3
            public void process(WatchedEvent watchedEvent) {
                if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
                    create.set((Object) null);
                }
            }
        }, zooKeeper.getSessionId(), zooKeeper.getSessionPasswd());
        create.get(10L, TimeUnit.SECONDS);
        Assert.assertEquals("Failed to re-create current session", zooKeeper2.getState(), ZooKeeper.States.CONNECTED);
        zooKeeper2.close();
    }

    private void waitForThriftTermination() throws InterruptedException {
        int i = 0;
        while (txService.thriftRPCServerState() != Service.State.TERMINATED) {
            int i2 = i;
            i++;
            if (i2 >= 200) {
                return;
            } else {
                TimeUnit.MILLISECONDS.sleep(50L);
            }
        }
    }
}
