package com.datastax.driver.core;

import com.codahale.metrics.Gauge;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Connection;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.Message;
import com.datastax.driver.core.ScassandraTestBase;
import com.datastax.driver.core.policies.ConstantReconnectionPolicy;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Uninterruptibles;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.mockito.Mockito;
import org.scassandra.cql.CqlType;
import org.scassandra.cql.PrimitiveType;
import org.scassandra.http.client.ClosedConnectionReport;
import org.scassandra.http.client.PrimingRequest;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:com/datastax/driver/core/HostConnectionPoolTest.class */
public class HostConnectionPoolTest extends ScassandraTestBase.PerClassCluster {

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/datastax/driver/core/HostConnectionPoolTest$MockRequest.class */
    /**
     * Test stand-in for a request in flight on a pooled connection.
     *
     * <p>Creating an instance borrows a connection from the given pool. The {@code simulate*}
     * methods drive the callback methods directly; the state only ever leaves {@link State#START}
     * once, so whichever outcome is applied first wins and later ones are no-ops.
     */
    public static class MockRequest implements Connection.ResponseCallback {

        /** Outcome of a mock request; every request begins in {@link #START}. */
        public enum State {
            START,
            COMPLETED,
            FAILED,
            TIMED_OUT
        }

        /** Connection borrowed from the pool when this request was created. */
        final Connection connection;
        /** Handler registered with the connection's dispatcher; assigned in {@code send}. */
        private Connection.ResponseHandler responseHandler;
        private final AtomicReference<State> state = new AtomicReference<>(State.START);

        /**
         * Borrows a connection from the pool, waiting at most 500 milliseconds.
         */
        private MockRequest(HostConnectionPool hostConnectionPool) throws ConnectionException, TimeoutException, BusyConnectionException {
            this.connection = hostConnectionPool.borrowConnection(500L, TimeUnit.MILLISECONDS);
        }

        /**
         * Creates a Mockito spy of a new request, builds a response handler for it and adds that
         * handler to the dispatcher of the borrowed connection.
         *
         * @return the spied request, so tests can verify which callbacks were invoked on it
         */
        static MockRequest send(HostConnectionPool hostConnectionPool) throws ConnectionException, BusyConnectionException, TimeoutException {
            MockRequest spied = Mockito.spy(new MockRequest(hostConnectionPool));
            Connection.ResponseHandler handler = new Connection.ResponseHandler(spied.connection, spied);
            spied.responseHandler = handler;
            spied.connection.dispatcher.add(handler);
            return spied;
        }

        /** Completes the request by invoking {@code onSet} with a null response. */
        void simulateSuccessResponse() {
            onSet(this.connection, null, 0L, 0);
        }

        /** Fails the request by invoking {@code onException} with a null exception. */
        void simulateErrorResponse() {
            onException(this.connection, null, 0L, 0);
        }

        /** Times the request out; the handler is cancelled only if the timeout was accepted. */
        void simulateTimeout() {
            boolean accepted = onTimeout(this.connection, 0L, 0);
            if (accepted) {
                this.responseHandler.cancelHandler();
            }
        }

        /** Moves the request from START to COMPLETED and, if that succeeded, frees the connection. */
        public void onSet(Connection connection, Message.Response response, long j, int i) {
            finishWith(State.COMPLETED, connection);
        }

        /** Moves the request from START to FAILED and, if that succeeded, frees the connection. */
        public void onException(Connection connection, Exception exc, long j, int i) {
            finishWith(State.FAILED, connection);
        }

        /**
         * Moves the request from START to TIMED_OUT.
         *
         * @return {@code true} if the request was still in START, {@code false} otherwise
         */
        public boolean onTimeout(Connection connection, long j, int i) {
            return this.state.compareAndSet(State.START, State.TIMED_OUT);
        }

        /** This mock carries no actual request message. */
        public Message.Request request() {
            return null;
        }

        public int retryCount() {
            return 0;
        }

        /**
         * Shared tail of {@code onSet} and {@code onException}: only the first transition out of
         * START releases the connection and removes the handler from its dispatcher.
         */
        private void finishWith(State outcome, Connection connection) {
            if (!this.state.compareAndSet(State.START, outcome)) {
                return;
            }
            connection.release();
            connection.dispatcher.removeHandler(this.responseHandler, true);
        }
    }

    /**
     * Closes the {@code cluster} field before the tests of this class run. The test methods visible
     * in this class build their own clusters through {@code createClusterBuilder()}.
     */
    @BeforeClass(groups = {"short", "long"})
    public void reinitializeCluster() {
        cluster.close();
    }

    /**
     * Sends 101 requests to the pool: 100 through {@code sendRequests}, which checks each borrowed
     * connection against {@code list}, followed by one extra request that is not checked.
     *
     * @return every request that was sent, in send order
     */
    private List<MockRequest> fillConnectionToThreshold(HostConnectionPool hostConnectionPool, List<Connection> list) throws ConnectionException, BusyConnectionException, TimeoutException {
        List<MockRequest> requests = sendRequests(100, hostConnectionPool, list);
        MockRequest extraRequest = MockRequest.send(hostConnectionPool);
        requests.add(extraRequest);
        return requests;
    }

    /**
     * Sends {@code i} requests to the pool without checking which connections they were sent on.
     *
     * @return the requests that were sent, in send order
     */
    private List<MockRequest> sendRequests(int i, HostConnectionPool hostConnectionPool) throws ConnectionException, BusyConnectionException, TimeoutException {
        // A null list of expected connections disables the per-request connection check.
        final List<Connection> noExpectedConnections = null;
        return sendRequests(i, hostConnectionPool, noExpectedConnections);
    }

    /**
     * Sends {@code i} requests to the pool, one at a time.
     *
     * @param i number of requests to send
     * @param hostConnectionPool pool the requests borrow their connections from
     * @param list connections the requests are allowed to use; when non-null, each request's
     *     connection is asserted to be one of them right after it is sent. May be {@code null} to
     *     skip the check.
     * @return the requests that were sent, in send order
     */
    private List<MockRequest> sendRequests(int i, HostConnectionPool hostConnectionPool, List<Connection> list) throws ConnectionException, BusyConnectionException, TimeoutException {
        List<MockRequest> sent = new ArrayList<>();
        for (int n = 0; n < i; n++) {
            MockRequest request = MockRequest.send(hostConnectionPool);
            sent.add(request);
            if (list != null) {
                Assertions.assertThat(list).contains(request.connection);
            }
        }
        return sent;
    }

    /**
     * Applies to the builder the protocol version returned by
     * {@code TestUtils.getDesiredProtocolVersion(ProtocolVersion.V2)}.
     */
    @Override // com.datastax.driver.core.ScassandraTestBase
    protected Cluster.Builder configure(Cluster.Builder builder) {
        return builder.withProtocolVersion(
                TestUtils.getDesiredProtocolVersion(ProtocolVersion.V2));
    }

    /**
     * Simulates a success response for at most {@code i} requests, taken from the front of
     * {@code list}, and removes each completed request from the list.
     */
    private void completeRequests(int i, List<MockRequest> list) {
        Iterator<MockRequest> pending = list.iterator();
        int completed = 0;
        while (completed < i && pending.hasNext()) {
            MockRequest request = pending.next();
            request.simulateSuccessResponse();
            pending.remove();
            completed++;
        }
    }

    /**
     * Simulates a success response for every request in {@code list}. Unlike the counted overload,
     * the list itself is left untouched.
     */
    private void completeRequests(List<MockRequest> list) {
        for (MockRequest request : list) {
            request.simulateSuccessResponse();
        }
    }

    /**
     * Builds a pool with {@code createPool(cluster, 2, 2)}, checks it starts with 2 connections,
     * sends 256 requests that must all land on those connections, and then expects the next
     * request to fail with a {@link TimeoutException}.
     */
    @Test(groups = {"short"})
    public void fixed_size_pool_should_fill_its_core_connections_and_then_timeout() throws ConnectionException, TimeoutException, BusyConnectionException {
        Cluster testCluster = createClusterBuilder().build();
        List<MockRequest> allRequests = Lists.newArrayList();
        try {
            HostConnectionPool pool = createPool(testCluster, 2, 2);
            Assertions.assertThat(pool.connections.size()).isEqualTo(2);
            // Snapshot of the connections present before any request is sent.
            List<Connection> initialConnections = Lists.newArrayList(pool.connections);
            allRequests.addAll(sendRequests(256, pool, initialConnections));
            try {
                MockRequest.send(pool);
                Assertions.fail("Expected a TimeoutException");
            } catch (TimeoutException expected) {
                // Intentionally ignored: the timeout is the outcome this test is looking for.
            }
        } finally {
            completeRequests(allRequests);
            testCluster.close();
        }
    }

    /**
     * Builds a pool with {@code createPool(cluster, 1, 2)}, fills its single initial connection
     * with 101 requests, checks that exactly one new connection is opened, sends 100 requests that
     * must avoid the initial connection, sends 55 more, and finally expects the next request to
     * fail with a {@link TimeoutException}.
     */
    @Test(groups = {"short"})
    public void variable_size_pool_should_fill_its_connections_and_then_timeout() throws Exception {
        Cluster testCluster = createClusterBuilder().build();
        List<MockRequest> allRequests = Lists.newArrayList();
        try {
            HostConnectionPool pool = createPool(testCluster, 1, 2);
            // Spy on the factory so connection creation can be verified.
            Connection.Factory factory = (Connection.Factory) Mockito.spy(testCluster.manager.connectionFactory);
            testCluster.manager.connectionFactory = factory;
            Assertions.assertThat(pool.connections.size()).isEqualTo(1);
            List<Connection> initialConnections = Lists.newArrayList(pool.connections);

            allRequests.addAll(fillConnectionToThreshold(pool, initialConnections));
            // After waiting a full second, open() must have been called exactly once.
            Mockito.verify(factory, Mockito.after(1000).times(1)).open(Mockito.any(HostConnectionPool.class));

            // The next 100 requests must not use any of the initial connections.
            for (int n = 0; n < 100; n++) {
                MockRequest request = MockRequest.send(pool);
                Assertions.assertThat(initialConnections).doesNotContain(request.connection);
                allRequests.add(request);
            }
            allRequests.addAll(sendRequests(55, pool));

            boolean timedOut = false;
            try {
                MockRequest.send(pool);
            } catch (TimeoutException e) {
                timedOut = true;
            }
            Assertions.assertThat(timedOut).isTrue();
        } finally {
            // Single cleanup path: runs once whether the body passed or threw.
            completeRequests(allRequests);
            testCluster.close();
        }
    }

    /**
     * Fills the first connection of a {@code createPool(cluster, 1, 2)} pool with 101 requests and
     * checks that exactly one connection is opened and that the pool then holds 2 connections.
     */
    @Test(groups = {"short"})
    public void should_add_extra_connection_when_core_full() throws Exception {
        Cluster testCluster = createClusterBuilder().build();
        List<MockRequest> allRequests = Lists.newArrayList();
        try {
            HostConnectionPool pool = createPool(testCluster, 1, 2);
            // Spy on the factory so connection creation can be verified.
            Connection.Factory factory = (Connection.Factory) Mockito.spy(testCluster.manager.connectionFactory);
            testCluster.manager.connectionFactory = factory;
            Connection firstConnection = (Connection) pool.connections.get(0);

            allRequests.addAll(fillConnectionToThreshold(pool, Collections.singletonList(firstConnection)));

            // After waiting a full second, open() must have been called exactly once.
            Mockito.verify(factory, Mockito.after(1000).times(1)).open(Mockito.any(HostConnectionPool.class));
            Assertions.assertThat(pool.connections).hasSize(2);
        } finally {
            // Single cleanup path: runs once whether the body passed or threw.
            completeRequests(allRequests);
            testCluster.close();
        }
    }

    /**
     * With an idle timeout of 20 seconds: grows the pool to 2 connections, lets the first one end
     * up in the trash while it still has 50 requests in flight, and checks that it is moved back
     * into the pool (without opening a new connection) once the second connection has taken 51
     * requests.
     */
    @Test(groups = {"long"})
    public void should_resurrect_trashed_connection_within_idle_timeout() throws Exception {
        Cluster testCluster = createClusterBuilder().withPoolingOptions(new PoolingOptions().setIdleTimeoutSeconds(20)).build();
        List<MockRequest> allRequests = Lists.newArrayList();
        try {
            HostConnectionPool pool = createPool(testCluster, 1, 2);
            // Spy on the factory so connection creation can be verified.
            Connection.Factory factory = (Connection.Factory) Mockito.spy(testCluster.manager.connectionFactory);
            testCluster.manager.connectionFactory = factory;
            Connection firstConnection = (Connection) pool.connections.get(0);

            // 101 requests on the first connection; exactly one extra connection gets opened.
            allRequests.addAll(fillConnectionToThreshold(pool, Collections.singletonList(firstConnection)));
            Mockito.verify(factory, Mockito.after(1000).times(1)).open(Mockito.any(HostConnectionPool.class));
            Assertions.assertThat(pool.connections).hasSize(2);
            Connection secondConnection = (Connection) pool.connections.get(1);
            Assertions.assertThat(firstConnection.inFlight.get()).isEqualTo(101);
            Assertions.assertThat(secondConnection.inFlight.get()).isEqualTo(0);

            // Complete 51 requests, leaving 50 in flight on the first connection.
            completeRequests(51, allRequests);
            Assertions.assertThat(firstConnection.inFlight.get()).isEqualTo(50);
            Assertions.assertThat(secondConnection.inFlight.get()).isEqualTo(0);

            // After 20 seconds the first connection is expected in the trash, the second in the pool.
            Uninterruptibles.sleepUninterruptibly(20L, TimeUnit.SECONDS);
            Assertions.assertThat(pool.connections).containsExactly(secondConnection);
            Assertions.assertThat(pool.trash).containsExactly(firstConnection);

            // 50 more requests all go to the second connection; the trash is unchanged.
            allRequests.addAll(sendRequests(50, pool));
            Assertions.assertThat(pool.connections).containsExactly(secondConnection);
            Assertions.assertThat(pool.trash).containsExactly(firstConnection);
            Assertions.assertThat(firstConnection.inFlight.get()).isEqualTo(50);
            Assertions.assertThat(secondConnection.inFlight.get()).isEqualTo(50);

            // One more request: the trashed connection returns to the pool. The factory spy was
            // never reset, so the total number of open() calls must still be exactly one.
            allRequests.addAll(sendRequests(1, pool));
            Mockito.verify(factory, Mockito.after(1000).times(1)).open(Mockito.any(HostConnectionPool.class));
            Assertions.assertThat(pool.connections).containsExactly(secondConnection, firstConnection);
            Assertions.assertThat(pool.trash).isEmpty();
            Assertions.assertThat(firstConnection.inFlight.get()).isEqualTo(50);
            Assertions.assertThat(secondConnection.inFlight.get()).isEqualTo(51);
        } finally {
            // Single cleanup path: runs once whether the body passed or threw.
            completeRequests(allRequests);
            testCluster.close();
        }
    }

    /**
     * With an idle timeout of 20 seconds: grows the pool to 2 connections, lets the first one end
     * up in the trash, drains it completely, and checks that it is then closed and dropped from
     * the trash, so that filling the second connection opens a brand new connection instead.
     */
    @Test(groups = {"long"})
    public void should_not_resurrect_trashed_connection_after_idle_timeout() throws Exception {
        Cluster testCluster = createClusterBuilder().withPoolingOptions(new PoolingOptions().setIdleTimeoutSeconds(20)).build();
        List<MockRequest> allRequests = Lists.newArrayList();
        try {
            HostConnectionPool pool = createPool(testCluster, 1, 2);
            // Spy on the factory so connection creation can be verified.
            Connection.Factory factory = (Connection.Factory) Mockito.spy(testCluster.manager.connectionFactory);
            testCluster.manager.connectionFactory = factory;
            Connection firstConnection = (Connection) pool.connections.get(0);

            // 101 requests on the first connection; exactly one extra connection gets opened.
            allRequests.addAll(fillConnectionToThreshold(pool, Collections.singletonList(firstConnection)));
            Mockito.verify(factory, Mockito.after(1000).times(1)).open(Mockito.any(HostConnectionPool.class));
            // Forget that call so the later verification only counts calls made from here on.
            Mockito.reset(factory);
            Assertions.assertThat(pool.connections).hasSize(2);
            Connection secondConnection = (Connection) pool.connections.get(1);
            Assertions.assertThat(firstConnection.inFlight.get()).isEqualTo(101);
            Assertions.assertThat(secondConnection.inFlight.get()).isEqualTo(0);

            // Complete 51 requests, leaving 50 in flight on the first connection.
            completeRequests(51, allRequests);
            Assertions.assertThat(firstConnection.inFlight.get()).isEqualTo(50);
            Assertions.assertThat(secondConnection.inFlight.get()).isEqualTo(0);

            // After 20 seconds the first connection is expected in the trash, the second in the pool.
            Uninterruptibles.sleepUninterruptibly(20L, TimeUnit.SECONDS);
            Assertions.assertThat(pool.connections).containsExactly(secondConnection);
            Assertions.assertThat(pool.trash).containsExactly(firstConnection);

            // Drain the trashed connection, then wait 30 seconds: it must be closed and gone.
            completeRequests(50, allRequests);
            Assertions.assertThat(firstConnection.inFlight.get()).isEqualTo(0);
            Uninterruptibles.sleepUninterruptibly(30L, TimeUnit.SECONDS);
            Assertions.assertThat(pool.connections).containsExactly(secondConnection);
            Assertions.assertThat(pool.trash).isEmpty();
            Assertions.assertThat(firstConnection.isClosed()).isTrue();

            // Filling the second connection must open exactly one connection (counted since reset).
            allRequests.addAll(fillConnectionToThreshold(pool, Collections.singletonList(secondConnection)));
            Assertions.assertThat(secondConnection.inFlight.get()).isEqualTo(101);
            Mockito.verify(factory, Mockito.after(1000).times(1)).open(Mockito.any(HostConnectionPool.class));

            // The next request must use a connection that is neither of the two seen so far.
            MockRequest request = MockRequest.send(pool);
            allRequests.add(request);
            Assertions.assertThat(request.connection).isNotEqualTo(secondConnection).isNotEqualTo(firstConnection);
        } finally {
            // Single cleanup path: runs once whether the body passed or threw.
            completeRequests(allRequests);
            testCluster.close();
        }
    }

    /**
     * With an idle timeout of 20 seconds: checks that a trashed connection stays open while it
     * still has requests in flight, and is closed and removed from both the pool and the trash
     * once those requests have completed.
     */
    @Test(groups = {"long"})
    public void should_not_close_trashed_connection_until_no_in_flight() throws Exception {
        Cluster testCluster = createClusterBuilder().withPoolingOptions(new PoolingOptions().setIdleTimeoutSeconds(20)).build();
        List<MockRequest> allRequests = Lists.newArrayList();
        try {
            HostConnectionPool pool = createPool(testCluster, 1, 2);
            // Spy on the factory so connection creation can be verified.
            Connection.Factory factory = (Connection.Factory) Mockito.spy(testCluster.manager.connectionFactory);
            testCluster.manager.connectionFactory = factory;
            Connection firstConnection = (Connection) pool.connections.get(0);

            // 101 requests on the first connection; exactly one extra connection gets opened.
            allRequests.addAll(fillConnectionToThreshold(pool, Collections.singletonList(firstConnection)));
            Mockito.verify(factory, Mockito.after(1000).times(1)).open(Mockito.any(HostConnectionPool.class));
            Assertions.assertThat(pool.connections).hasSize(2);

            // Complete 50 requests (51 remain in flight) and wait 30 seconds.
            completeRequests(50, allRequests);
            Uninterruptibles.sleepUninterruptibly(30L, TimeUnit.SECONDS);

            // Trashed, but kept open because requests are still in flight.
            Assertions.assertThat(pool.trash).containsExactly(firstConnection);
            Assertions.assertThat(firstConnection.inFlight.get()).isEqualTo(51);
            Assertions.assertThat(firstConnection.isClosed()).isFalse();

            // Complete the remaining 51 requests and wait another 30 seconds.
            completeRequests(51, allRequests);
            Uninterruptibles.sleepUninterruptibly(30L, TimeUnit.SECONDS);

            Assertions.assertThat(firstConnection.isClosed()).isTrue();
            Assertions.assertThat(pool.connections).doesNotContain(firstConnection);
            Assertions.assertThat(pool.trash).doesNotContain(firstConnection);
        } finally {
            // Single cleanup path: runs once whether the body passed or threw.
            completeRequests(allRequests);
            testCluster.close();
        }
    }

    /**
     * Grows the pool to 2 connections, replaces the second one with a spy that reports 0 from
     * {@code maxAvailableStreams()}, returns it to the pool, and checks that it moves from the
     * pool's connections to its trash.
     */
    @Test(groups = {"short"})
    public void should_trash_on_returning_connection_with_insufficient_streams() throws Exception {
        Cluster testCluster = createClusterBuilder().build();
        List<MockRequest> allRequests = Lists.newArrayList();
        try {
            HostConnectionPool pool = createPool(testCluster, 1, 2);
            // Spy on the factory so connection creation can be verified.
            Connection.Factory factory = (Connection.Factory) Mockito.spy(testCluster.manager.connectionFactory);
            testCluster.manager.connectionFactory = factory;
            Connection firstConnection = (Connection) pool.connections.get(0);

            // 101 requests on the first connection; exactly one extra connection gets opened.
            allRequests.addAll(fillConnectionToThreshold(pool, Collections.singletonList(firstConnection)));
            Mockito.verify(factory, Mockito.after(1000).times(1)).open(Mockito.any(HostConnectionPool.class));
            Assertions.assertThat(pool.connections).hasSize(2);

            // Swap the second connection for a spy so its reported stream count can be stubbed.
            Connection spiedConnection = (Connection) Mockito.spy(pool.connections.get(1));
            pool.connections.set(1, spiedConnection);
            allRequests.addAll(sendRequests(10, pool));
            Assertions.assertThat(pool.connections).hasSize(2);

            // Stub the stream count to 0, then return the connection to the pool.
            Mockito.doReturn(0).when(spiedConnection).maxAvailableStreams();
            Assertions.assertThat(pool.trash).hasSize(0);
            pool.returnConnection(spiedConnection);

            Assertions.assertThat(pool.connections).hasSize(1);
            Assertions.assertThat(pool.trash).hasSize(1);
        } finally {
            // Single cleanup path: runs once whether the body passed or threw.
            completeRequests(allRequests);
            testCluster.close();
        }
    }

    /**
     * Closes one of the two connections of a {@code createPool(cluster, 2, 2)} pool through
     * {@code currentClient} and checks that only that connection is closed and removed from the
     * pool, while host 1 stays UP and the control connection stays open.
     */
    @Test(groups = {"short"})
    public void should_keep_host_up_when_one_connection_lost() throws Exception {
        Cluster testCluster = createClusterBuilder().build();
        try {
            HostConnectionPool pool = createPool(testCluster, 2, 2);
            Connection lostConnection = (Connection) pool.connections.get(0);
            Connection survivingConnection = (Connection) pool.connections.get(1);

            // Disable the listener first, then close the first connection, identified by its local address.
            this.currentClient.disableListener();
            this.currentClient.closeConnection(ClosedConnectionReport.CloseType.CLOSE, (InetSocketAddress) lostConnection.channel.localAddress());
            // Give the driver a moment to notice the closed connection.
            Uninterruptibles.sleepUninterruptibly(100L, TimeUnit.MILLISECONDS);

            Assertions.assertThat(lostConnection.isClosed()).isTrue();
            Assertions.assertThat(survivingConnection.isClosed()).isFalse();
            Assertions.assertThat(pool.connections).doesNotContain(lostConnection);
            Assertions.assertThat(testCluster).host(1).hasState(Host.State.UP);
            Assertions.assertThat(testCluster).hasOpenControlConnection();
        } finally {
            // Single cleanup path: runs once whether the body passed or threw.
            testCluster.close();
        }
    }

    /**
     * Closes every connection through {@code currentClient} while the listener is disabled and
     * checks that host 1 goes down, the control connection and all pooled connections are closed,
     * and {@code open(host)} keeps being attempted. Once the listener is enabled again, the control
     * connection must reopen and the host must come back UP.
     */
    @Test(groups = {"short"})
    public void should_mark_host_down_when_no_connections_remaining() throws Exception {
        Cluster testCluster = createClusterBuilder().withSocketOptions(new SocketOptions().setConnectTimeoutMillis(1000).setReadTimeoutMillis(1000)).withReconnectionPolicy(new ConstantReconnectionPolicy(1000L)).build();
        try {
            testCluster.init();
            // Spy on the factory so connection creation can be verified.
            Connection.Factory factory = (Connection.Factory) Mockito.spy(testCluster.manager.connectionFactory);
            testCluster.manager.connectionFactory = factory;
            // Snapshot the pool's connections so they can be checked after they are closed.
            List<Connection> pooledConnections = Lists.newArrayList(createPool(testCluster, 8, 8).connections);
            // Forget the calls made so far; verifications below only count later ones.
            Mockito.reset(factory);

            // Disable the listener, then close all connections.
            this.currentClient.disableListener();
            this.currentClient.closeConnections(ClosedConnectionReport.CloseType.CLOSE);

            Assertions.assertThat(testCluster).host(1).goesDownWithin(10L, TimeUnit.SECONDS);
            Assertions.assertThat(testCluster).hasClosedControlConnection();
            for (Connection pooledConnection : pooledConnections) {
                Assertions.assertThat(pooledConnection.isClosed()).isTrue();
            }

            // open(host) must be attempted at least once within 2 seconds.
            Mockito.verify(factory, Mockito.timeout(1000 * 2).atLeastOnce()).open(this.host);
            // 2 seconds later the control connection must still be closed (listener is disabled).
            Uninterruptibles.sleepUninterruptibly(1000 * 2, TimeUnit.MILLISECONDS);
            Assertions.assertThat(testCluster).hasClosedControlConnection();

            this.currentClient.enableListener();

            // After a further 2 seconds, open(host) must have been called at least twice in total.
            Mockito.verify(factory, Mockito.after(1000 * 2).atLeast(2)).open(this.host);
            // NOTE(review): 7 rather than 8 newConnection() calls for an 8-connection pool;
            // presumably the remaining connection comes from the open(host) attempt - confirm.
            Mockito.verify(factory, Mockito.timeout(1000 * 2).times(7)).newConnection(Mockito.any(HostConnectionPool.class));

            Uninterruptibles.sleepUninterruptibly(1000, TimeUnit.MILLISECONDS);
            Assertions.assertThat(testCluster).hasOpenControlConnection();
            Assertions.assertThat(testCluster).host(1).hasState(Host.State.UP);
        } finally {
            // Single cleanup path: runs once whether the body passed or threw.
            testCluster.close();
        }
    }

    /**
     * Closes two of the three connections of a {@code createPool(cluster, 3, 3)} pool while the
     * listener is disabled, then checks how often {@code open(pool)} is called on the factory as
     * further requests are sent, both before and after the listener is enabled again.
     */
    @Test(groups = {"short"})
    public void should_create_new_connections_when_connection_lost_and_under_core_connections() throws Exception {
        Cluster testCluster = createClusterBuilder().withSocketOptions(new SocketOptions().setConnectTimeoutMillis(1000).setReadTimeoutMillis(1000)).withReconnectionPolicy(new ConstantReconnectionPolicy(1000L)).build();
        List<MockRequest> allRequests = Lists.newArrayList();
        try {
            testCluster.init();
            // Spy on the factory so connection creation can be verified.
            Connection.Factory factory = (Connection.Factory) Mockito.spy(testCluster.manager.connectionFactory);
            testCluster.manager.connectionFactory = factory;
            HostConnectionPool pool = createPool(testCluster, 3, 3);
            Connection firstConnection = (Connection) pool.connections.get(0);
            Connection secondConnection = (Connection) pool.connections.get(1);
            Connection thirdConnection = (Connection) pool.connections.get(2);

            // Disable the listener, then close the first and third connections.
            this.currentClient.disableListener();
            this.currentClient.closeConnection(ClosedConnectionReport.CloseType.CLOSE, (InetSocketAddress) firstConnection.channel.localAddress());
            this.currentClient.closeConnection(ClosedConnectionReport.CloseType.CLOSE, (InetSocketAddress) thirdConnection.channel.localAddress());
            Uninterruptibles.sleepUninterruptibly(100L, TimeUnit.MILLISECONDS);

            // The host stays UP with a single pooled connection left.
            Assertions.assertThat(testCluster).host(1).hasState(Host.State.UP);
            Assertions.assertThat(pool.connections).hasSize(1);

            // The first request uses the surviving connection and triggers no open() yet.
            MockRequest request = MockRequest.send(pool);
            allRequests.add(request);
            Assertions.assertThat(request.connection).isEqualTo(secondConnection);
            Mockito.verify(factory, Mockito.never()).open(Mockito.any(HostConnectionPool.class));

            // One second later, another request leads to exactly one open() within a second.
            Uninterruptibles.sleepUninterruptibly(1000, TimeUnit.MILLISECONDS);
            request = MockRequest.send(pool);
            allRequests.add(request);
            Assertions.assertThat(request.connection).isEqualTo(secondConnection);
            Mockito.verify(factory, Mockito.timeout(1000).times(1)).open(Mockito.any(HostConnectionPool.class));
            Mockito.reset(factory);

            // Wait 4 seconds, enable the listener again, then wait one more second.
            Uninterruptibles.sleepUninterruptibly((1000 + 1000) * 2, TimeUnit.MILLISECONDS);
            this.currentClient.enableListener();
            Uninterruptibles.sleepUninterruptibly(1000, TimeUnit.MILLISECONDS);

            // Each of the next two requests must lead to exactly one open() call.
            allRequests.add(MockRequest.send(pool));
            Mockito.verify(factory, Mockito.timeout((1000 + 1000) * 2).times(1)).open(Mockito.any(HostConnectionPool.class));
            Mockito.reset(factory);

            allRequests.add(MockRequest.send(pool));
            Mockito.verify(factory, Mockito.timeout((1000 + 1000) * 2).times(1)).open(Mockito.any(HostConnectionPool.class));
            Mockito.reset(factory);

            // A third request must not lead to any open() call, even after waiting 4 seconds.
            allRequests.add(MockRequest.send(pool));
            Mockito.verify(factory, Mockito.after((1000 + 1000) * 2).never()).open(Mockito.any(HostConnectionPool.class));
        } finally {
            // Single cleanup path: runs once whether the body passed or threw.
            completeRequests(allRequests);
            testCluster.close();
        }
    }

    /**
     * Grows a {@code createPool(cluster, 1, 2)} pool to 2 connections, closes the first one, and
     * checks that its requests fail, that no connection is opened until the second connection has
     * taken 101 requests, and that the pool only gets back to 2 connections after the listener is
     * enabled again.
     */
    @Test(groups = {"short"})
    public void should_not_schedule_reconnect_when_connection_lost_and_at_core_connections() throws Exception {
        Cluster testCluster = createClusterBuilder().withSocketOptions(new SocketOptions().setConnectTimeoutMillis(1000).setReadTimeoutMillis(1000)).withReconnectionPolicy(new ConstantReconnectionPolicy(1000L)).build();
        List<MockRequest> allRequests = Lists.newArrayList();
        try {
            testCluster.init();
            // Spy on the factory so connection creation can be verified.
            Connection.Factory factory = (Connection.Factory) Mockito.spy(testCluster.manager.connectionFactory);
            testCluster.manager.connectionFactory = factory;
            HostConnectionPool pool = createPool(testCluster, 1, 2);
            Connection firstConnection = (Connection) pool.connections.get(0);

            // 101 requests, all on the first connection. They are tracked separately from
            // allRequests and are not completed during cleanup.
            List<MockRequest> firstConnectionRequests = Lists.newArrayList();
            for (int n = 0; n < 101; n++) {
                MockRequest request = MockRequest.send(pool);
                Assertions.assertThat(request.connection).isEqualTo(firstConnection);
                firstConnectionRequests.add(request);
            }

            // Exactly one extra connection gets opened; the pool now holds 2.
            Mockito.verify(factory, Mockito.after(1000).times(1)).open(Mockito.any(HostConnectionPool.class));
            Assertions.assertThat(pool.connections).hasSize(2);
            Mockito.reset(factory);
            Connection secondConnection = (Connection) pool.connections.get(1);

            // Close the first connection, wait briefly, then disable the listener.
            this.currentClient.closeConnection(ClosedConnectionReport.CloseType.CLOSE, (InetSocketAddress) firstConnection.channel.localAddress());
            Uninterruptibles.sleepUninterruptibly(100L, TimeUnit.MILLISECONDS);
            this.currentClient.disableListener();

            // Every request sent on the first connection must have received onException exactly once.
            for (MockRequest failedRequest : firstConnectionRequests) {
                Mockito.verify(failedRequest, Mockito.times(1)).onException(Mockito.any(Connection.class), Mockito.any(Exception.class), Mockito.anyLong(), Mockito.anyInt());
            }
            Assertions.assertThat(testCluster).host(1).hasState(Host.State.UP);

            // 100 requests on the second connection: pool size stays 1 and nothing is opened.
            allRequests.addAll(sendRequests(100, pool, Collections.singletonList(secondConnection)));
            Assertions.assertThat(pool.connections).hasSize(1);
            Mockito.verify(factory, Mockito.after(1000).never()).open(Mockito.any(HostConnectionPool.class));

            // The 101st request on the second connection leads to an open() call within a second,
            // but the pool does not grow while the listener is disabled.
            MockRequest request = MockRequest.send(pool);
            allRequests.add(request);
            Assertions.assertThat(request.connection).isEqualTo(secondConnection);
            Mockito.verify(factory, Mockito.timeout(1000)).open(Mockito.any(HostConnectionPool.class));
            Assertions.assertThat(pool.connections).hasSize(1);
            Uninterruptibles.sleepUninterruptibly(1000 * 2, TimeUnit.MILLISECONDS);
            Assertions.assertThat(pool.connections).hasSize(1);

            // Enable the listener again and wait a second.
            this.currentClient.enableListener();
            Uninterruptibles.sleepUninterruptibly(1000, TimeUnit.MILLISECONDS);

            // The next request still uses the second connection; a second later the pool has grown.
            request = MockRequest.send(pool);
            allRequests.add(request);
            Assertions.assertThat(request.connection).isEqualTo(secondConnection);
            Uninterruptibles.sleepUninterruptibly(1000, TimeUnit.MILLISECONDS);
            Assertions.assertThat(pool.connections).hasSize(2);

            // A further request must use a connection that is neither of the two seen so far.
            request = MockRequest.send(pool);
            allRequests.add(request);
            Assertions.assertThat(request.connection).isNotEqualTo(firstConnection).isNotEqualTo(secondConnection);
        } finally {
            // Single cleanup path: runs once whether the body passed or threw.
            completeRequests(allRequests);
            testCluster.close();
        }
    }

    /**
     * Calls {@code currentClient.disableListener(4)} before creating a
     * {@code createPool(cluster, 8, 8)} pool and checks that the pool comes up with 4 connections
     * while host 1 stays UP. After the listener is enabled again, each further request must lead
     * to an {@code open()} call and grow the pool by one, up to 8 connections.
     */
    @Test(groups = {"short"})
    public void should_not_mark_host_down_if_some_connections_fail_on_init() throws Exception {
        Cluster testCluster = createClusterBuilder().withSocketOptions(new SocketOptions().setConnectTimeoutMillis(1000).setReadTimeoutMillis(1000)).withReconnectionPolicy(new ConstantReconnectionPolicy(1000L)).build();
        List<MockRequest> allRequests = Lists.newArrayList();
        try {
            testCluster.init();
            // Spy on the factory so connection creation can be verified.
            Connection.Factory factory = (Connection.Factory) Mockito.spy(testCluster.manager.connectionFactory);
            testCluster.manager.connectionFactory = factory;

            // NOTE(review): the argument 4 presumably lets 4 connections through before the
            // listener stops accepting, matching the size asserted below - confirm.
            this.currentClient.disableListener(4);
            HostConnectionPool pool = createPool(testCluster, 8, 8);
            Mockito.reset(factory);

            Assertions.assertThat(pool.connections).hasSize(4);
            Assertions.assertThat(testCluster).host(1).hasState(Host.State.UP);
            Assertions.assertThat(testCluster).hasOpenControlConnection();

            this.currentClient.enableListener();
            Uninterruptibles.sleepUninterruptibly(1000, TimeUnit.MILLISECONDS);

            // Each request must lead to one open() call and grow the pool to the next size.
            for (int expectedSize = 5; expectedSize <= 8; expectedSize++) {
                allRequests.add(MockRequest.send(pool));
                Mockito.verify(factory, Mockito.timeout(1000)).open(Mockito.any(HostConnectionPool.class));
                Mockito.reset(factory);
                Uninterruptibles.sleepUninterruptibly(1000, TimeUnit.MILLISECONDS);
                Assertions.assertThat(pool.connections).hasSize(expectedSize);
            }
        } finally {
            completeRequests(allRequests);
            testCluster.close();
        }
    }

    /**
     * Creates a {@code createPool(cluster, 8, 8)} pool while the listener is disabled, checks that
     * the pool has no connections although host 1 is UP and the control connection is open, and
     * then sends a request. TestNG expects that send to throw a {@link TimeoutException}.
     */
    @Test(groups = {"short"}, expectedExceptions = {TimeoutException.class})
    public void should_throw_exception_if_convicted_and_no_connections_available() throws Exception {
        Cluster testCluster = createClusterBuilder().withSocketOptions(new SocketOptions().setConnectTimeoutMillis(1000).setReadTimeoutMillis(1000)).withReconnectionPolicy(new ConstantReconnectionPolicy(1000L)).build();
        try {
            testCluster.init();
            Assertions.assertThat(testCluster).hasOpenControlConnection();
            // Spy on the factory and install it in place of the original.
            Connection.Factory factory = (Connection.Factory) Mockito.spy(testCluster.manager.connectionFactory);
            testCluster.manager.connectionFactory = factory;

            // Disable the listener before the pool is created.
            this.currentClient.disableListener();
            HostConnectionPool pool = createPool(testCluster, 8, 8);
            Mockito.reset(factory);

            Assertions.assertThat(pool.connections).hasSize(0);
            Assertions.assertThat(testCluster).host(1).hasState(Host.State.UP);
            Assertions.assertThat(testCluster).hasOpenControlConnection();

            // Expected to throw TimeoutException (see expectedExceptions on the annotation).
            MockRequest.send(pool);
        } finally {
            // Single cleanup path: runs once whether the body passed or threw.
            testCluster.close();
        }
    }

    /**
     * Creates a pool (8 core / 8 max) while the server-side listener is disabled, so that it
     * starts with no connections, then re-enables the listener and sends a single request.
     *
     * <p>The test expects the request to succeed, the connection factory to be asked to open 8
     * connections within 8 seconds, and the pool to end up holding 8 connections.
     *
     * @throws Exception on any unexpected failure
     */
    @Test(groups = {"short"})
    public void should_wait_on_connection_if_not_convicted_and_no_connections_available() throws Exception {
        Cluster testCluster = createClusterBuilder()
                .withSocketOptions(new SocketOptions().setConnectTimeoutMillis(1000).setReadTimeoutMillis(1000))
                .withReconnectionPolicy(new ConstantReconnectionPolicy(1000L))
                .build();
        try {
            testCluster.init();
            Assertions.assertThat(testCluster).hasOpenControlConnection();

            // Spy on the connection factory so open(...) invocations can be counted later.
            Connection.Factory factory = Mockito.spy(testCluster.manager.connectionFactory);
            testCluster.manager.connectionFactory = factory;

            this.currentClient.disableListener();
            HostConnectionPool pool = createPool(testCluster, 8, 8);

            Assertions.assertThat(pool.connections).hasSize(0);
            Assertions.assertThat(testCluster).host(1).hasState(Host.State.UP);
            Assertions.assertThat(testCluster).hasOpenControlConnection();

            this.currentClient.enableListener();
            Uninterruptibles.sleepUninterruptibly(1000, TimeUnit.MILLISECONDS);
            // Only open(...) calls made after this point count towards the verification below.
            Mockito.reset(factory);

            MockRequest.send(pool).simulateSuccessResponse();

            // Allow up to one second per expected connection for the 8 open(...) calls.
            Mockito.verify(factory, Mockito.timeout(1000 * 8).times(8)).open(Mockito.any(HostConnectionPool.class));
            Uninterruptibles.sleepUninterruptibly(1000, TimeUnit.MILLISECONDS);
            Assertions.assertThat(pool.connections).hasSize(8);
        } finally {
            // Single cleanup path: runs once whether the body completes or throws.
            testCluster.close();
        }
    }

    /**
     * Creates a pool with 0 core and 2 max connections, checks that it starts empty while the
     * host is UP and the control connection is open, then sends one request and expects the pool
     * to hold exactly one connection afterwards.
     *
     * @throws Exception on any unexpected failure
     */
    @Test(groups = {"short"})
    public void should_wait_on_connection_if_zero_core_connections() throws Exception {
        Cluster testCluster = createClusterBuilder()
                .withSocketOptions(new SocketOptions().setConnectTimeoutMillis(1000).setReadTimeoutMillis(1000))
                .withReconnectionPolicy(new ConstantReconnectionPolicy(1000L))
                .build();
        try {
            testCluster.init();
            Assertions.assertThat(testCluster).hasOpenControlConnection();

            HostConnectionPool pool = createPool(testCluster, 0, 2);

            Assertions.assertThat(pool.connections).hasSize(0);
            Assertions.assertThat(testCluster).host(1).hasState(Host.State.UP);
            Assertions.assertThat(testCluster).hasOpenControlConnection();

            MockRequest.send(pool).simulateSuccessResponse();

            Assertions.assertThat(pool.connections).hasSize(1);
        } finally {
            // Single cleanup path: runs once whether the body completes or throws.
            testCluster.close();
        }
    }

    /**
     * Applies the given LOCAL-distance pool sizing to the cluster, opens a session, and returns
     * the connection pool that session holds for host 1.
     *
     * @param cluster the cluster whose pooling options are modified and which is connected to
     * @param i the number of core connections per host for {@code HostDistance.LOCAL}
     * @param i2 the maximum number of connections per host for {@code HostDistance.LOCAL}
     * @return the pool registered in the new session for the host returned by
     *     {@code TestUtils.findHost(cluster, 1)}
     */
    private HostConnectionPool createPool(Cluster cluster, int i, int i2) {
        // Max is set before core, as in the original; presumably so that core never exceeds the
        // configured max while the options are being updated (verify against PoolingOptions).
        cluster.getConfiguration().getPoolingOptions()
                .setMaxConnectionsPerHost(HostDistance.LOCAL, i2)
                .setCoreConnectionsPerHost(HostDistance.LOCAL, i);
        // The public connect() API is declared to return Session; the pools map lives on the
        // SessionManager implementation, hence the explicit downcast.
        SessionManager session = (SessionManager) cluster.connect();
        Host firstHost = TestUtils.findHost(cluster, 1);
        return (HostConnectionPool) session.pools.get(firstHost);
    }

    /**
     * Fires 100000 asynchronous inserts while a background task samples the cluster's
     * open-connections gauge once per second, and fails if any sample was negative.
     *
     * <p>The test is currently disabled ({@code enabled = false}). It blocks without a timeout
     * until every request's listener has run.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for the requests
     */
    @Test(groups = {"long"}, enabled = false)
    public void open_connections_metric_should_always_be_positive() throws InterruptedException {
        final int requestCount = 100000;
        final CountDownLatch outstanding = new CountDownLatch(requestCount);
        final AtomicBoolean negativeSeen = new AtomicBoolean(false);
        final Gauge<Integer> openConnections = this.cluster.getMetrics().getOpenConnections();

        ExecutorService listenerExecutor = Executors.newSingleThreadExecutor();
        ScheduledExecutorService sampler = Executors.newScheduledThreadPool(1);
        try {
            // Counts down once per completed request.
            Runnable markDone = new Runnable() {
                @Override
                public void run() {
                    outstanding.countDown();
                }
            };

            // Samples the gauge every second, starting after one second.
            sampler.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    Integer current = openConnections.getValue();
                    if (current.intValue() < 0) {
                        System.err.println("Negative value spotted for openConnection metric: " + current);
                        negativeSeen.set(true);
                    }
                }
            }, 1L, 1L, TimeUnit.SECONDS);

            PreparedStatement insert = this.session.prepare(generateJava349InsertStatement());
            for (int key = 0; key < requestCount; key++) {
                this.session.executeAsync(insert.bind(key)).addListener(markDone, listenerExecutor);
            }
            outstanding.await();
        } finally {
            // Always stop the helper threads, even if preparing or executing a statement fails.
            sampler.shutdownNow();
            listenerExecutor.shutdownNow();
        }

        if (negativeSeen.get()) {
            Assert.fail("Negative value spotted for open connection count");
        }
    }

    /**
     * Builds the wide INSERT statement used by the open-connections metric test and primes it as
     * a prepared statement on the priming client.
     *
     * <p>The statement targets table {@code Java349} with the key column {@code mykey} followed
     * by {@code column0} .. {@code column999}. The key is a bind marker ({@code ?}) and the
     * remaining values are the literals {@code 0} .. {@code 999}. The priming declares a single
     * {@code INT} variable.
     *
     * @return the text of the INSERT statement that was primed
     */
    private String generateJava349InsertStatement() {
        // Grow the column list and the value list side by side; both start with the key entry.
        StringBuilder columnList = new StringBuilder("mykey");
        StringBuilder valueList = new StringBuilder("?");
        for (int n = 0; n < 1000; n++) {
            columnList.append(", column").append(n);
            valueList.append(", ").append(n);
        }
        String query = "INSERT INTO Java349 (" + columnList + ") VALUES (" + valueList + ");";

        this.primingClient.prime(
                PrimingRequest.preparedStatementBuilder()
                        .withQuery(query)
                        .withThen(PrimingRequest.then().withVariableTypes(new CqlType[]{PrimitiveType.INT}))
                        .build());
        return query;
    }
}
