package com.datastax.driver.core;

import com.datastax.driver.core.utils.SocketChannelMonitor;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Stack;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

@CCMConfig(dirtiesContext = {true})
/* loaded from: input_file:com/datastax/driver/core/SessionStressTest.class */
public class SessionStressTest extends CCMTestsSupport {
    private static final Logger logger = LoggerFactory.getLogger(SessionStressTest.class);
    private Cluster stressCluster;
    private final SocketChannelMonitor channelMonitor = new SocketChannelMonitor();
    private ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(8));

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/datastax/driver/core/SessionStressTest$CloseSession.class */
    public static class CloseSession implements Callable<CloseFuture> {
        private Session session;
        private final CountDownLatch startSignal;

        CloseSession(Session session, CountDownLatch countDownLatch) {
            this.session = session;
            this.startSignal = countDownLatch;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public CloseFuture call() throws Exception {
            this.startSignal.await();
            try {
                return this.session.closeAsync();
            } finally {
                this.session = null;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/datastax/driver/core/SessionStressTest$OpenSession.class */
    public class OpenSession implements Callable<Session> {
        private final CountDownLatch startSignal;

        OpenSession(CountDownLatch countDownLatch) {
            this.startSignal = countDownLatch;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Session call() throws Exception {
            this.startSignal.await();
            return SessionStressTest.this.stressCluster.connect();
        }
    }

    @AfterMethod(groups = {"long"}, alwaysRun = true)
    public void shutdown() throws Exception {
        this.executorService.shutdown();
        try {
            if (!this.executorService.awaitTermination(30L, TimeUnit.SECONDS)) {
                Assert.fail("executor ran for longer than expected");
            }
        } catch (InterruptedException e) {
            Assert.fail("Interrupted while waiting for executor to shutdown");
        } finally {
            this.executorService = null;
            System.gc();
        }
    }

    @Test(groups = {"long"})
    public void sessions_should_not_leak_connections() {
        this.channelMonitor.reportAtFixedInterval(1, TimeUnit.SECONDS);
        this.stressCluster = Cluster.builder().addContactPoints(getContactPoints()).withPort(ccm().getBinaryPort()).withPoolingOptions(new PoolingOptions().setCoreConnectionsPerHost(HostDistance.LOCAL, 1)).withNettyOptions(this.channelMonitor.nettyOptions()).build();
        try {
            this.stressCluster.init();
            Assert.assertEquals(this.stressCluster.manager.sessions.size(), 0);
            Assert.assertEquals(((Integer) this.stressCluster.getMetrics().getOpenConnections().getValue()).intValue(), 1);
            Session connect = this.stressCluster.connect();
            Assert.assertEquals(this.stressCluster.manager.sessions.size(), 1);
            int numberOfLocalCoreConnections = TestUtils.numberOfLocalCoreConnections(this.stressCluster);
            Assert.assertEquals(((Integer) this.stressCluster.getMetrics().getOpenConnections().getValue()).intValue(), 1 + numberOfLocalCoreConnections);
            Assert.assertEquals(this.channelMonitor.openChannels(getContactPointsWithPorts()).size(), 1 + numberOfLocalCoreConnections);
            connect.close();
            Assert.assertEquals(this.stressCluster.manager.sessions.size(), 0);
            Assert.assertEquals(((Integer) this.stressCluster.getMetrics().getOpenConnections().getValue()).intValue(), 1);
            Assert.assertEquals(this.channelMonitor.openChannels(getContactPointsWithPorts()).size(), 1);
            int i = 2000 / 2;
            for (int i2 = 1; i2 <= 5; i2++) {
                logger.info("On iteration {}/{}.", Integer.valueOf(i2), 5);
                logger.info("Creating {} sessions.", 2000);
                waitFor(openSessionsConcurrently(2000));
                Assert.assertEquals(this.stressCluster.manager.sessions.size(), 2000);
                Assert.assertEquals(((Integer) this.stressCluster.getMetrics().getOpenConnections().getValue()).intValue(), (numberOfLocalCoreConnections * 2000) + 1);
                Assert.assertEquals(this.channelMonitor.openChannels(getContactPointsWithPorts()).size(), (numberOfLocalCoreConnections * 2000) + 1);
                logger.info("Closing {}/{} sessions.", Integer.valueOf(i), 2000);
                waitFor(closeSessionsConcurrently(i));
                Assert.assertEquals(this.stressCluster.manager.sessions.size(), i);
                Assert.assertEquals(((Integer) this.stressCluster.getMetrics().getOpenConnections().getValue()).intValue(), (numberOfLocalCoreConnections * (2000 / 2)) + 1);
                Assert.assertEquals(this.channelMonitor.openChannels(getContactPointsWithPorts()).size(), (numberOfLocalCoreConnections * (2000 / 2)) + 1);
                logger.info("Closing and Opening {} sessions concurrently.", Integer.valueOf(i));
                CountDownLatch countDownLatch = new CountDownLatch(2);
                List<ListenableFuture<Session>> openSessionsConcurrently = openSessionsConcurrently(i, countDownLatch);
                List<ListenableFuture<Void>> closeSessionsConcurrently = closeSessionsConcurrently(i, countDownLatch);
                countDownLatch.countDown();
                waitFor(openSessionsConcurrently);
                waitFor(closeSessionsConcurrently);
                Assert.assertEquals(this.stressCluster.manager.sessions.size(), i);
                Assert.assertEquals(((Integer) this.stressCluster.getMetrics().getOpenConnections().getValue()).intValue(), (numberOfLocalCoreConnections * (2000 / 2)) + 1);
                Assert.assertEquals(this.channelMonitor.openChannels(getContactPointsWithPorts()).size(), (numberOfLocalCoreConnections * (2000 / 2)) + 1);
                logger.info("Closing remaining {} sessions.", Integer.valueOf(i));
                waitFor(closeSessionsConcurrently(i));
                Assert.assertEquals(this.stressCluster.manager.sessions.size(), 0);
                Assert.assertEquals(((Integer) this.stressCluster.getMetrics().getOpenConnections().getValue()).intValue(), 1);
                Assert.assertEquals(this.channelMonitor.openChannels(getContactPointsWithPorts()).size(), 1);
                logger.info("Sleeping {} seconds so that TCP connections are released by the OS", 20);
                Uninterruptibles.sleepUninterruptibly(20, TimeUnit.SECONDS);
            }
        } finally {
            this.stressCluster.close();
            this.stressCluster = null;
            Assert.assertEquals(this.channelMonitor.openChannels(getContactPointsWithPorts()).size(), 0);
            this.channelMonitor.stop();
            this.channelMonitor.report();
            logger.info("Sleeping 60 extra seconds");
            Uninterruptibles.sleepUninterruptibly(60L, TimeUnit.SECONDS);
        }
    }

    private List<ListenableFuture<Session>> openSessionsConcurrently(int i) {
        return openSessionsConcurrently(i, new CountDownLatch(1));
    }

    private List<ListenableFuture<Session>> openSessionsConcurrently(int i, CountDownLatch countDownLatch) {
        ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(i);
        for (int i2 = 0; i2 < i; i2++) {
            newArrayListWithCapacity.add(this.executorService.submit(new OpenSession(countDownLatch)));
        }
        countDownLatch.countDown();
        return newArrayListWithCapacity;
    }

    private List<ListenableFuture<Void>> closeSessionsConcurrently(int i) {
        return closeSessionsConcurrently(i, new CountDownLatch(1));
    }

    private List<ListenableFuture<Void>> closeSessionsConcurrently(int i, CountDownLatch countDownLatch) {
        Stack stack = new Stack();
        Iterator it = this.stressCluster.manager.sessions.iterator();
        for (int i2 = 0; i2 < i; i2++) {
            stack.push(it.next());
        }
        ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(i);
        for (int i3 = 0; i3 < i; i3++) {
            newArrayListWithCapacity.add(this.executorService.submit(new CloseSession((Session) stack.pop(), countDownLatch)));
        }
        countDownLatch.countDown();
        ArrayList newArrayListWithCapacity2 = Lists.newArrayListWithCapacity(i);
        Iterator it2 = newArrayListWithCapacity.iterator();
        while (it2.hasNext()) {
            try {
                newArrayListWithCapacity2.add(((ListenableFuture) it2.next()).get());
            } catch (Exception e) {
                logger.error("Got interrupted exception while waiting on closeFuture.", e);
            }
        }
        return newArrayListWithCapacity2;
    }

    private <E> void waitFor(List<ListenableFuture<E>> list) {
        Iterator<ListenableFuture<E>> it = list.iterator();
        while (it.hasNext()) {
            try {
                it.next().get();
            } catch (InterruptedException e) {
                throw new RuntimeException("Interrupted while waiting for future", e);
            } catch (ExecutionException e2) {
                e2.printStackTrace();
                Assert.fail(e2.getMessage());
            }
        }
    }
}
