package com.datastax.oss.driver.api.core.heartbeat;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.NodeState;
import com.datastax.oss.driver.api.testinfra.session.SessionUtils;
import com.datastax.oss.driver.api.testinfra.simulacron.SimulacronRule;
import com.datastax.oss.driver.api.testinfra.utils.ConditionChecker;
import com.datastax.oss.driver.api.testinfra.utils.NodeUtils;
import com.datastax.oss.driver.categories.ParallelizableTests;
import com.datastax.oss.protocol.internal.request.Register;
import com.datastax.oss.simulacron.common.cluster.ClusterSpec;
import com.datastax.oss.simulacron.common.cluster.QueryLog;
import com.datastax.oss.simulacron.common.request.Options;
import com.datastax.oss.simulacron.common.stubbing.CloseType;
import com.datastax.oss.simulacron.common.stubbing.DisconnectAction;
import com.datastax.oss.simulacron.common.stubbing.PrimeDsl;
import com.datastax.oss.simulacron.server.BoundNode;
import com.datastax.oss.simulacron.server.RejectScope;
import java.net.SocketAddress;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({ParallelizableTests.class})
/* loaded from: input_file:com/datastax/oss/driver/api/core/heartbeat/HeartbeatIT.class */
public class HeartbeatIT {

    @Rule
    public SimulacronRule simulacron = new SimulacronRule(ClusterSpec.builder().withNodes(new int[]{1}));
    private static final String QUERY = "select * from foo";
    private static final Predicate<QueryLog> IS_OPTION_REQUEST = queryLog -> {
        return queryLog.getQuery().equals("OPTIONS");
    };
    private BoundNode simulacronNode;

    @Before
    public void setUp() {
        this.simulacron.cluster().clearLogs();
        this.simulacron.cluster().clearPrimes(true);
        this.simulacronNode = (BoundNode) this.simulacron.cluster().getNodes().iterator().next();
    }

    @Test
    public void node_should_go_down_gracefully_when_connection_closed_during_heartbeat() {
        CqlSession newSession = newSession(new String[0]);
        Throwable th = null;
        try {
            try {
                Node node = (Node) newSession.getMetadata().getNodes().values().iterator().next();
                Assertions.assertThat(node.getState()).isEqualTo(NodeState.UP);
                this.simulacronNode.rejectConnections(0, RejectScope.UNBIND);
                int size = getHeartbeatsForNode().size();
                this.simulacronNode.prime(PrimeDsl.when(Options.INSTANCE).then(PrimeDsl.closeConnection(DisconnectAction.Scope.CONNECTION, CloseType.DISCONNECT)));
                NodeUtils.waitForDown(node);
                Assertions.assertThat(getHeartbeatsForNode().size()).isGreaterThan(size);
                if (newSession != null) {
                    $closeResource(null, newSession);
                }
            } catch (Throwable th2) {
                th = th2;
                throw th2;
            }
        } catch (Throwable th3) {
            if (newSession != null) {
                $closeResource(th, newSession);
            }
            throw th3;
        }
    }

    @Test
    public void should_not_send_heartbeat_during_protocol_initialization() {
        this.simulacronNode.rejectConnections(0, RejectScope.REJECT_STARTUP);
        try {
            CqlSession newSession = newSession(new String[0]);
            try {
                Assertions.fail("Expected session creation to fail");
                if (newSession != null) {
                    $closeResource(null, newSession);
                }
            } catch (Throwable th) {
                if (newSession != null) {
                    $closeResource(null, newSession);
                }
                throw th;
            }
        } catch (Exception e) {
            Assertions.assertThat(getHeartbeatsForNode()).isEmpty();
        }
    }

    @Test
    public void should_send_heartbeat_on_control_connection() {
        CqlSession newSession = newSession("connection.pool.local.size = 0");
        try {
            AtomicInteger countHeartbeatsOnControlConnection = countHeartbeatsOnControlConnection();
            ConditionChecker.checkThat(() -> {
                return countHeartbeatsOnControlConnection.get() > 0;
            }).becomesTrue();
            if (newSession != null) {
                $closeResource(null, newSession);
            }
        } catch (Throwable th) {
            if (newSession != null) {
                $closeResource(null, newSession);
            }
            throw th;
        }
    }

    @Test
    public void should_send_heartbeat_on_regular_connection() throws InterruptedException {
        this.simulacronNode.prime(PrimeDsl.when(QUERY).then(PrimeDsl.rows().row(new Object[]{"column1", "1", "column2", "2"})));
        CqlSession newSession = newSession(new String[0]);
        try {
            Assertions.assertThat(newSession.execute(QUERY)).hasSize(1);
            AtomicInteger countHeartbeatsOnRegularConnection = countHeartbeatsOnRegularConnection();
            for (int i = 0; i < 20; i++) {
                Assertions.assertThat(newSession.execute(QUERY)).hasSize(1);
                TimeUnit.MILLISECONDS.sleep(100L);
            }
            Assertions.assertThat(countHeartbeatsOnRegularConnection.get()).isZero();
            ConditionChecker.checkThat(() -> {
                return countHeartbeatsOnRegularConnection.get() >= 1;
            }).becomesTrue();
            if (newSession != null) {
                $closeResource(null, newSession);
            }
        } catch (Throwable th) {
            if (newSession != null) {
                $closeResource(null, newSession);
            }
            throw th;
        }
    }

    @Test
    public void should_send_heartbeat_when_requests_being_written_but_nothing_received() throws InterruptedException {
        this.simulacron.cluster().prime(PrimeDsl.when("delay").then(PrimeDsl.noResult()));
        CqlSession newSession = newSession(new String[0]);
        Throwable th = null;
        try {
            try {
                AtomicInteger countHeartbeatsOnRegularConnection = countHeartbeatsOnRegularConnection();
                for (int i = 0; i < 25; i++) {
                    newSession.executeAsync("delay");
                    newSession.executeAsync("delay");
                    TimeUnit.MILLISECONDS.sleep(100L);
                }
                Assertions.assertThat(countHeartbeatsOnRegularConnection.get()).isGreaterThanOrEqualTo(2);
                if (newSession != null) {
                    $closeResource(null, newSession);
                }
            } catch (Throwable th2) {
                th = th2;
                throw th2;
            }
        } catch (Throwable th3) {
            if (newSession != null) {
                $closeResource(th, newSession);
            }
            throw th3;
        }
    }

    @Test
    public void should_close_connection_when_heartbeat_times_out() {
        CqlSession newSession = newSession(new String[0]);
        try {
            Node node = (Node) newSession.getMetadata().getNodes().values().iterator().next();
            Assertions.assertThat(node.getState()).isEqualTo(NodeState.UP);
            AtomicInteger atomicInteger = new AtomicInteger();
            this.simulacronNode.registerQueryListener((boundNode, queryLog) -> {
                atomicInteger.incrementAndGet();
            }, true, IS_OPTION_REQUEST);
            ConditionChecker.checkThat(() -> {
                return atomicInteger.get() >= 2;
            }).becomesTrue();
            Assertions.assertThat(node.getState()).isEqualTo(NodeState.UP);
            this.simulacronNode.prime(PrimeDsl.when(Options.INSTANCE).then(PrimeDsl.noResult()));
            atomicInteger.set(0);
            ConditionChecker.checkThat(() -> {
                return atomicInteger.get() >= 1;
            }).becomesTrue();
            atomicInteger.set(0);
            NodeUtils.waitForDown(node);
            this.simulacronNode.clearPrimes();
            NodeUtils.waitForUp(node);
            ConditionChecker.checkThat(() -> {
                return atomicInteger.get() >= 2;
            }).becomesTrue();
            Assertions.assertThat(node.getState()).isEqualTo(NodeState.UP);
            if (newSession != null) {
                $closeResource(null, newSession);
            }
        } catch (Throwable th) {
            if (newSession != null) {
                $closeResource(null, newSession);
            }
            throw th;
        }
    }

    private CqlSession newSession(String... strArr) {
        String[] strArr2 = {"connection.heartbeat.interval = 1 second", "connection.heartbeat.timeout = 500 milliseconds", "connection.init-query-timeout = 2 seconds", "connection.reconnection-policy.max-delay = 1 second"};
        String[] strArr3 = (String[]) Arrays.copyOf(strArr2, strArr2.length + strArr.length);
        System.arraycopy(strArr, 0, strArr3, strArr2.length, strArr.length);
        return SessionUtils.newSession(this.simulacron, strArr3);
    }

    private AtomicInteger countHeartbeatsOnRegularConnection() {
        return countHeartbeats(true);
    }

    private AtomicInteger countHeartbeatsOnControlConnection() {
        return countHeartbeats(false);
    }

    private AtomicInteger countHeartbeats(boolean z) {
        SocketAddress findControlConnectionAddress = findControlConnectionAddress();
        AtomicInteger atomicInteger = new AtomicInteger();
        this.simulacron.cluster().registerQueryListener((boundNode, queryLog) -> {
            atomicInteger.incrementAndGet();
        }, false, queryLog2 -> {
            return IS_OPTION_REQUEST.test(queryLog2) && (z ^ queryLog2.getConnection().equals(findControlConnectionAddress));
        });
        return atomicInteger;
    }

    private SocketAddress findControlConnectionAddress() {
        for (QueryLog queryLog : this.simulacronNode.getLogs().getQueryLogs()) {
            if (queryLog.getFrame().message instanceof Register) {
                return queryLog.getConnection();
            }
        }
        throw new AssertionError("Could not find address of control connection");
    }

    private List<QueryLog> getHeartbeatsForNode() {
        return (List) this.simulacronNode.getLogs().getQueryLogs().stream().filter(queryLog -> {
            return queryLog.getQuery().equals("OPTIONS");
        }).collect(Collectors.toList());
    }

    private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) {
        if (th == null) {
            autoCloseable.close();
            return;
        }
        try {
            autoCloseable.close();
        } catch (Throwable th2) {
            th.addSuppressed(th2);
        }
    }
}
