package com.datastax.driver.core;

import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.utils.SocketChannelMonitor;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.Uninterruptibles;
import io.netty.channel.socket.SocketChannel;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:com/datastax/driver/core/TimeoutStressTest.class */
public class TimeoutStressTest {
    static final int CONCURRENT_QUERIES = 25;
    static final long DURATION = 60000;
    static final int READ_TIMEOUT_IN_MS = 50;
    static final int CONNECTION_TIMEOUT_IN_MS = 20;
    static final String KEYSPACE = "testks";
    static Cluster cluster;
    static CCMBridge ccmBridge;
    static SocketChannelMonitor channelMonitor;
    static List<InetSocketAddress> nodes;
    static PreparedStatement statement;
    static final Logger logger = LoggerFactory.getLogger(TimeoutStressTest.class);
    static AtomicInteger executedQueries = new AtomicInteger(0);

    /* loaded from: input_file:com/datastax/driver/core/TimeoutStressTest$TimeoutStressWorker.class */
    public static class TimeoutStressWorker implements Runnable {
        private final Semaphore concurrentQueries;
        private final AtomicBoolean stopped;
        private final Session session;

        public TimeoutStressWorker(Session session, Semaphore semaphore, AtomicBoolean atomicBoolean) {
            this.session = session;
            this.concurrentQueries = semaphore;
            this.stopped = atomicBoolean;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (!this.stopped.get()) {
                try {
                    this.concurrentQueries.acquire();
                    Futures.addCallback(this.session.executeAsync(TimeoutStressTest.statement.bind(new Object[]{"0"})), new FutureCallback<ResultSet>() { // from class: com.datastax.driver.core.TimeoutStressTest.TimeoutStressWorker.1
                        public void onSuccess(ResultSet resultSet) {
                            TimeoutStressWorker.this.concurrentQueries.release();
                            if (TimeoutStressTest.executedQueries.incrementAndGet() % 1000 == 0) {
                                TimeoutStressTest.logger.debug("Successfully executed {}.  rows: {}", Integer.valueOf(TimeoutStressTest.executedQueries.get()), Integer.valueOf(resultSet.getAvailableWithoutFetching()));
                            }
                        }

                        public void onFailure(Throwable th) {
                            TimeoutStressWorker.this.concurrentQueries.release();
                            if (th instanceof NoHostAvailableException) {
                            }
                        }
                    });
                } catch (Exception e) {
                    TimeoutStressTest.logger.error("Failure while submitting query.", e);
                }
            }
        }
    }

    @BeforeClass(groups = {"long"})
    public void beforeClass() throws Exception {
        ccmBridge = CCMBridge.create(TestUtils.SIMPLE_TABLE, 3, new String[0]);
        channelMonitor = new SocketChannelMonitor();
        nodes = Lists.newArrayList(new InetSocketAddress[]{new InetSocketAddress(CCMBridge.IP_PREFIX + '1', 9042), new InetSocketAddress(CCMBridge.IP_PREFIX + '2', 9042), new InetSocketAddress(CCMBridge.IP_PREFIX + '3', 9042)});
        cluster = Cluster.builder().addContactPointsWithPorts(nodes).withPoolingOptions(new PoolingOptions().setCoreConnectionsPerHost(HostDistance.LOCAL, 8)).withNettyOptions(channelMonitor.nettyOptions()).build();
        Session connect = cluster.connect();
        setupSchema(connect);
        connect.close();
    }

    @AfterClass(groups = {"long"})
    public void afterClass() {
        if (ccmBridge != null) {
            ccmBridge.stop();
        }
        if (channelMonitor != null) {
            channelMonitor.stop();
        }
        if (cluster != null) {
            cluster.close();
        }
    }

    @Test(groups = {"long"})
    public void host_state_should_be_maintained_with_timeouts() {
        cluster.getConfiguration().getSocketOptions().setConnectTimeoutMillis(CONNECTION_TIMEOUT_IN_MS);
        cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(READ_TIMEOUT_IN_MS);
        Session connect = cluster.connect();
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(availableProcessors, new ThreadFactoryBuilder().setNameFormat("timeout-stress-test-worker-%d").setDaemon(true).build());
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        int maxConnectionsPerHost = (cluster.getConfiguration().getPoolingOptions().getMaxConnectionsPerHost(HostDistance.LOCAL) * nodes.size()) + 1;
        try {
            Semaphore semaphore = new Semaphore(CONCURRENT_QUERIES);
            for (int i = 0; i < availableProcessors; i++) {
                newFixedThreadPool.submit(new TimeoutStressWorker(connect, semaphore, atomicBoolean));
            }
            long currentTimeMillis = System.currentTimeMillis();
            while (System.currentTimeMillis() - currentTimeMillis < DURATION) {
                Uninterruptibles.sleepUninterruptibly(1L, TimeUnit.SECONDS);
                channelMonitor.report();
                Collection<SocketChannel> openChannels = channelMonitor.openChannels(nodes);
                if (openChannels.size() > maxConnectionsPerHost) {
                    logger.warn("{} of open channels: {} exceeds maximum expected: {}.  This could be because there are connections to be cleaned up in the reaper.", new Object[]{Integer.valueOf(openChannels.size()), Integer.valueOf(maxConnectionsPerHost), openChannels});
                }
            }
            atomicBoolean.set(true);
            cluster.getConfiguration().getSocketOptions().setConnectTimeoutMillis(5000);
            cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(12000);
            logger.debug("Sleeping 20 seconds to allow connection reaper to clean up connections and for the pools to recover.");
            Uninterruptibles.sleepUninterruptibly(20L, TimeUnit.SECONDS);
            Collection<SocketChannel> openChannels2 = channelMonitor.openChannels(nodes);
            org.assertj.core.api.Assertions.assertThat(openChannels2.size()).as("Number of open connections does not meet expected: %s", new Object[]{openChannels2}).isEqualTo(maxConnectionsPerHost);
            connect.close();
            Collection<SocketChannel> openChannels3 = channelMonitor.openChannels(nodes);
            org.assertj.core.api.Assertions.assertThat(openChannels3.size()).as("Number of open connections does not meet expected: %s", new Object[]{openChannels3}).isEqualTo(1);
            newFixedThreadPool.shutdown();
        } catch (Throwable th) {
            atomicBoolean.set(true);
            cluster.getConfiguration().getSocketOptions().setConnectTimeoutMillis(5000);
            cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(12000);
            logger.debug("Sleeping 20 seconds to allow connection reaper to clean up connections and for the pools to recover.");
            Uninterruptibles.sleepUninterruptibly(20L, TimeUnit.SECONDS);
            Collection<SocketChannel> openChannels4 = channelMonitor.openChannels(nodes);
            org.assertj.core.api.Assertions.assertThat(openChannels4.size()).as("Number of open connections does not meet expected: %s", new Object[]{openChannels4}).isEqualTo(maxConnectionsPerHost);
            connect.close();
            Collection<SocketChannel> openChannels5 = channelMonitor.openChannels(nodes);
            org.assertj.core.api.Assertions.assertThat(openChannels5.size()).as("Number of open connections does not meet expected: %s", new Object[]{openChannels5}).isEqualTo(1);
            newFixedThreadPool.shutdown();
            throw th;
        }
    }

    private static void setupSchema(Session session) throws InterruptedException, ExecutionException, TimeoutException {
        logger.debug("Creating keyspace");
        session.execute("create KEYSPACE testks WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 3}");
        session.execute("use testks");
        logger.debug("Creating table");
        session.execute("create table record (\n  name text,\n  phone text,\n  value text,\n  PRIMARY KEY (name, phone)\n);");
        ArrayList newArrayListWithExpectedSize = Lists.newArrayListWithExpectedSize(30000);
        PreparedStatement prepare = session.prepare("insert into record (name, phone, value) values (?, ?, ?)");
        for (int i = 0; i < 30000; i++) {
            if (i % 1000 == 0) {
                logger.debug("Inserting record {}.", Integer.valueOf(i));
            }
            newArrayListWithExpectedSize.add(session.executeAsync(prepare.bind(new Object[]{"0", Integer.toString(i), TestUtils.SIMPLE_TABLE})));
        }
        statement = session.prepare("select * from testks.record where name=? limit 1000;");
        logger.debug("Completed inserting data.  Waiting up to 30 for inserts to complete seconds before proceeding.");
        Futures.allAsList(newArrayListWithExpectedSize).get(30L, TimeUnit.SECONDS);
        logger.debug("Inserts complete.");
    }
}
