/*
 * Decompiled with CFR 0.152.
 */
package com.datastax.driver.core;

import com.datastax.driver.core.CCMTestsSupport;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.TestUtils;
import com.datastax.driver.core.utils.SocketChannelMonitor;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

/**
 * Stress test that repeatedly creates a batch of {@link Cluster} instances concurrently and then
 * closes them concurrently, asserting after every step that the number of sessions, the
 * open-connections metric and the channels reported by a {@link SocketChannelMonitor} have the
 * expected values.
 */
public class ClusterStressTest
extends CCMTestsSupport {
    private static final Logger logger = LoggerFactory.getLogger(ClusterStressTest.class);

    /** Pool that runs the create/close tasks; shut down and cleared by {@link #shutdown()}. */
    private ExecutorService executorService = Executors.newFixedThreadPool(8);

    /**
     * Runs 500 iterations of "create 10 clusters concurrently, then close them concurrently".
     * Any assertion failure raised inside a task is rethrown from here. Whatever the outcome,
     * the method sleeps 60 seconds before returning.
     */
    @Test(groups={"long"})
    public void clusters_should_not_leak_connections() {
        int numberOfClusters = 10;
        int numberOfIterations = 500;
        try {
            // Inclusive upper bound: iterations are numbered 1..numberOfIterations so that the
            // "i/total" log line below is accurate and all iterations actually run.
            for (int i = 1; i <= numberOfIterations; ++i) {
                logger.info("On iteration {}/{}.", i, numberOfIterations);
                logger.info("Creating {} clusters", numberOfClusters);
                List<CreateClusterAndCheckConnections> actions = this.waitForCreates(this.createClustersConcurrently(numberOfClusters));
                this.waitForCloses(this.closeClustersConcurrently(actions));
                if (logger.isDebugEnabled()) {
                    logger.debug("# {} threads currently running", Thread.getAllStackTraces().keySet().size());
                }
            }
        }
        finally {
            logger.info("Sleeping 60 seconds to free TCP resources");
            Uninterruptibles.sleepUninterruptibly(60L, TimeUnit.SECONDS);
        }
    }

    /**
     * Shuts the executor down and waits up to 30 seconds for its tasks to finish.
     * Fails the test if the executor does not terminate in time or if the wait is interrupted.
     *
     * @throws Exception declared for the test framework; not thrown directly by this method
     */
    @AfterMethod(groups={"long"}, alwaysRun=true)
    public void shutdown() throws Exception {
        this.executorService.shutdown();
        try {
            boolean terminated = this.executorService.awaitTermination(30L, TimeUnit.SECONDS);
            if (!terminated) {
                // Do not leave pool threads running once the test is reported as failed.
                this.executorService.shutdownNow();
                Assert.fail("executor ran for longer than expected");
            }
        }
        catch (InterruptedException e) {
            this.executorService.shutdownNow();
            // Preserve the interrupt status, as the waitFor* helpers in this class do.
            Thread.currentThread().interrupt();
            Assert.fail("Interrupted while waiting for executor to shutdown");
        }
        finally {
            this.executorService = null;
            System.gc();
        }
    }

    /**
     * Submits {@code numberOfClusters} creation tasks that are released together by a fresh latch.
     *
     * @param numberOfClusters number of creation tasks to submit
     * @return one future per submitted task
     */
    private List<Future<CreateClusterAndCheckConnections>> createClustersConcurrently(int numberOfClusters) {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        return this.createClustersConcurrently(numberOfClusters, countDownLatch);
    }

    /**
     * Submits {@code numberOfClusters} creation tasks that all wait on {@code countDownLatch},
     * then counts the latch down once so that the submitted tasks may proceed.
     *
     * @param numberOfClusters number of creation tasks to submit
     * @param countDownLatch start signal shared by all tasks
     * @return one future per submitted task
     */
    private List<Future<CreateClusterAndCheckConnections>> createClustersConcurrently(int numberOfClusters, CountDownLatch countDownLatch) {
        List<Future<CreateClusterAndCheckConnections>> clusterFutures = Lists.newArrayListWithCapacity(numberOfClusters);
        for (int i = 0; i < numberOfClusters; ++i) {
            clusterFutures.add(this.executorService.submit(new CreateClusterAndCheckConnections(countDownLatch)));
        }
        countDownLatch.countDown();
        return clusterFutures;
    }

    /**
     * Submits one close task per completed creation task, released together by a fresh latch.
     *
     * @param actions completed creation tasks whose clusters must be closed
     * @return one future per submitted close task
     */
    private List<Future<Void>> closeClustersConcurrently(List<CreateClusterAndCheckConnections> actions) {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        return this.closeClustersConcurrently(actions, countDownLatch);
    }

    /**
     * Submits one close task per completed creation task; every task waits on {@code startSignal},
     * which is counted down once after all tasks have been submitted.
     *
     * @param actions completed creation tasks whose clusters must be closed
     * @param startSignal start signal shared by all close tasks
     * @return one future per submitted close task
     */
    private List<Future<Void>> closeClustersConcurrently(List<CreateClusterAndCheckConnections> actions, CountDownLatch startSignal) {
        List<Future<Void>> closeFutures = Lists.newArrayListWithCapacity(actions.size());
        for (CreateClusterAndCheckConnections action : actions) {
            closeFutures.add(this.executorService.submit(new CloseCluster(action.cluster, action.channelMonitor, startSignal)));
        }
        startSignal.countDown();
        return closeFutures;
    }

    /**
     * Waits for every creation future. Only the first failure is kept; if there is one, the
     * clusters of the tasks that did complete are closed and the failure is thrown.
     *
     * @param futures futures returned by {@code createClustersConcurrently}
     * @return the completed creation tasks, in the order of {@code futures}
     * @throws AssertionError the first failure observed, wrapped unless it already is an
     *         {@code AssertionError}
     */
    private List<CreateClusterAndCheckConnections> waitForCreates(List<Future<CreateClusterAndCheckConnections>> futures) {
        List<CreateClusterAndCheckConnections> actions = Lists.newArrayListWithCapacity(futures.size());
        AssertionError error = null;
        for (Future<CreateClusterAndCheckConnections> future : futures) {
            try {
                actions.add(future.get());
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                if (error == null) {
                    error = ClusterStressTest.assertionError("Interrupted while waiting for future creation", e);
                }
            }
            catch (ExecutionException e) {
                if (error == null) {
                    Throwable cause = e.getCause();
                    error = cause instanceof AssertionError
                            ? (AssertionError) cause
                            : ClusterStressTest.assertionError("Error while creating a cluster", cause);
                }
            }
        }
        if (error != null) {
            // No close tasks will be submitted on this path, so close the successful clusters here.
            for (CreateClusterAndCheckConnections action : actions) {
                action.cluster.close();
            }
            throw error;
        }
        return actions;
    }

    /**
     * Waits for every close future. Only the first failure is kept and thrown after all futures
     * have been visited.
     *
     * @param futures futures returned by {@code closeClustersConcurrently}
     * @return the results of the close tasks
     * @throws AssertionError the first failure observed, wrapped unless it already is an
     *         {@code AssertionError}
     */
    private List<Void> waitForCloses(List<Future<Void>> futures) {
        List<Void> result = new ArrayList<Void>(futures.size());
        AssertionError error = null;
        for (Future<Void> future : futures) {
            try {
                result.add(future.get());
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                if (error == null) {
                    error = ClusterStressTest.assertionError("Interrupted while waiting for future", e);
                }
            }
            catch (ExecutionException e) {
                if (error == null) {
                    Throwable cause = e.getCause();
                    error = cause instanceof AssertionError
                            ? (AssertionError) cause
                            : ClusterStressTest.assertionError("Error while closing a cluster", cause);
                }
            }
        }
        if (error != null) {
            throw error;
        }
        return result;
    }

    /**
     * Builds an {@link AssertionError} with the given message and cause.
     *
     * @param message detail message of the error
     * @param cause throwable recorded as the cause
     * @return the new error
     */
    private static AssertionError assertionError(String message, Throwable cause) {
        AssertionError error = new AssertionError(message);
        error.initCause(cause);
        return error;
    }

    /**
     * Task that closes a cluster and asserts that no session and no monitored channel remain.
     */
    private class CloseCluster
    implements Callable<Void> {
        private Cluster cluster;
        private SocketChannelMonitor channelMonitor;
        private final CountDownLatch startSignal;

        CloseCluster(Cluster cluster, SocketChannelMonitor channelMonitor, CountDownLatch startSignal) {
            this.cluster = cluster;
            this.channelMonitor = channelMonitor;
            this.startSignal = startSignal;
        }

        /**
         * Waits for the start signal, closes the cluster and checks the session and channel counts.
         * The monitor is stopped and both references are cleared whether or not the checks pass.
         *
         * @return always {@code null}
         * @throws Exception if the wait is interrupted or closing fails
         */
        @Override
        public Void call() throws Exception {
            this.startSignal.await();
            try {
                this.cluster.close();
                Assert.assertEquals(this.cluster.manager.sessions.size(), 0);
                Assert.assertEquals(this.channelMonitor.openChannels(ClusterStressTest.this.getContactPointsWithPorts()).size(), 0);
            }
            finally {
                this.channelMonitor.stop();
                this.cluster = null;
                this.channelMonitor = null;
            }
            return null;
        }
    }

    /**
     * Task that initializes a cluster, opens and closes one session, and asserts the session,
     * open-connection and channel counts after each step. The cluster is built in the
     * constructor and initialized in {@link #call()}.
     */
    private class CreateClusterAndCheckConnections
    implements Callable<CreateClusterAndCheckConnections> {
        private final CountDownLatch startSignal;
        private Cluster cluster;
        private final SocketChannelMonitor channelMonitor = new SocketChannelMonitor();

        CreateClusterAndCheckConnections(CountDownLatch startSignal) {
            this.startSignal = startSignal;
            this.cluster = Cluster.builder().addContactPoints(ClusterStressTest.this.getContactPoints()).withPort(ClusterStressTest.this.ccm().getBinaryPort()).withPoolingOptions(new PoolingOptions().setCoreConnectionsPerHost(HostDistance.LOCAL, 1)).withNettyOptions(this.channelMonitor.nettyOptions()).build();
        }

        /**
         * Waits for the start signal, then runs the init / connect / close-session sequence with
         * its assertions.
         *
         * @return this task, so the caller can reach the cluster and the monitor to close them
         * @throws Exception if the wait is interrupted or a driver call fails; in every failure
         *         case the cluster is closed before the exception propagates
         */
        @Override
        public CreateClusterAndCheckConnections call() throws Exception {
            boolean succeeded = false;
            try {
                this.startSignal.await();
                // After init: no session, 1 open connection and 1 monitored channel expected.
                this.cluster.init();
                Assert.assertEquals(this.cluster.manager.sessions.size(), 0);
                Assert.assertEquals((int) ((Integer) this.cluster.getMetrics().getOpenConnections().getValue()), 1);
                Assert.assertEquals(this.channelMonitor.openChannels(ClusterStressTest.this.getContactPointsWithPorts()).size(), 1);
                // After connect: one session, plus the local core connections on top of the first one.
                Session session = this.cluster.connect();
                Assert.assertEquals(this.cluster.manager.sessions.size(), 1);
                Assert.assertEquals((int) ((Integer) this.cluster.getMetrics().getOpenConnections().getValue()), 1 + TestUtils.numberOfLocalCoreConnections(this.cluster));
                Assert.assertEquals(this.channelMonitor.openChannels(ClusterStressTest.this.getContactPointsWithPorts()).size(), 1 + TestUtils.numberOfLocalCoreConnections(this.cluster));
                // After closing the session: back to the post-init counts.
                session.close();
                Assert.assertEquals(this.cluster.manager.sessions.size(), 0);
                Assert.assertEquals((int) ((Integer) this.cluster.getMetrics().getOpenConnections().getValue()), 1);
                Assert.assertEquals(this.channelMonitor.openChannels(ClusterStressTest.this.getContactPointsWithPorts()).size(), 1);
                succeeded = true;
                return this;
            }
            finally {
                try {
                    if (!succeeded) {
                        // On any failure (assertion, driver exception or interrupt) the caller never
                        // receives this task, so this is the last chance to close the cluster.
                        this.cluster.close();
                        this.cluster = null;
                    }
                }
                finally {
                    this.channelMonitor.stop();
                }
            }
        }
    }
}

