/*
 * Decompiled with CFR 0.152.
 */
package com.datastax.driver.core;

import com.datastax.driver.core.Assertions;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Connection;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.HostConnectionPool;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.MemoryAppender;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.ScassandraTestBase;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SessionManager;
import com.datastax.driver.core.TestUtils;
import com.google.common.util.concurrent.Uninterruptibles;
import java.net.InetAddress;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.log4j.Appender;
import org.apache.log4j.Level;
import org.scassandra.http.client.PrimingRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class HeartbeatTest
extends ScassandraTestBase {
    static Logger logger = LoggerFactory.getLogger(HeartbeatTest.class);
    org.apache.log4j.Logger connectionLogger = org.apache.log4j.Logger.getLogger(Connection.class);
    MemoryAppender logs;
    Level originalLevel;

    @BeforeMethod(groups={"long"})
    public void startCapturingLogs() {
        this.originalLevel = this.connectionLogger.getLevel();
        this.connectionLogger.setLevel(Level.DEBUG);
        this.logs = new MemoryAppender("%t - %m%n");
        this.connectionLogger.addAppender((Appender)this.logs);
    }

    @AfterMethod(groups={"long"}, alwaysRun=true)
    public void stopCapturingLogs() {
        this.connectionLogger.setLevel(this.originalLevel);
        this.connectionLogger.removeAppender((Appender)this.logs);
    }

    private String getLog(Cluster cluster) {
        String[] lines = this.logs.getNext().split("\\r?\\n");
        StringBuilder filtered = new StringBuilder();
        for (String line : lines) {
            if (!line.startsWith(cluster.getClusterName() + "-nio-worker")) continue;
            filtered.append(line);
            filtered.append("\n");
        }
        return filtered.toString();
    }

    @Test(groups={"long"})
    public void should_send_heartbeat_when_connection_is_inactive() throws InterruptedException {
        Cluster cluster = Cluster.builder().addContactPoints(new InetAddress[]{this.hostAddress.getAddress()}).withPort(this.scassandra.getBinaryPort()).withPoolingOptions(new PoolingOptions().setHeartbeatIntervalSeconds(3)).build();
        try {
            int i;
            cluster.init();
            for (i = 0; i < 5; ++i) {
                this.triggerRequestOnControlConnection(cluster);
                TimeUnit.SECONDS.sleep(1L);
            }
            Assertions.assertThat((String)this.getLog(cluster)).doesNotContain((CharSequence)"sending heartbeat");
            TimeUnit.SECONDS.sleep(4L);
            Assertions.assertThat((String)this.getLog(cluster)).contains(new CharSequence[]{"sending heartbeat"}).contains(new CharSequence[]{"heartbeat query succeeded"});
            TimeUnit.SECONDS.sleep(4L);
            Assertions.assertThat((String)this.getLog(cluster)).contains(new CharSequence[]{"sending heartbeat"}).contains(new CharSequence[]{"heartbeat query succeeded"});
            this.getLog(cluster);
            for (i = 0; i < 5; ++i) {
                this.triggerRequestOnControlConnection(cluster);
                TimeUnit.SECONDS.sleep(1L);
            }
            Assertions.assertThat((String)this.getLog(cluster)).doesNotContain((CharSequence)"sending heartbeat");
            TimeUnit.SECONDS.sleep(4L);
            Assertions.assertThat((String)this.getLog(cluster)).contains(new CharSequence[]{"sending heartbeat"}).contains(new CharSequence[]{"heartbeat query succeeded"});
        }
        finally {
            cluster.close();
        }
    }

    private void assertLineMatches(String logs, Pattern pattern) {
        String[] lines;
        for (String line : lines = logs.split("\\r?\\n")) {
            if (!pattern.matcher(line).matches()) continue;
            return;
        }
        org.assertj.core.api.Assertions.fail((String)("Expecting: [" + logs + "] to contain " + pattern));
    }

    private void assertNoLineMatches(String logs, Pattern pattern) {
        String[] lines;
        for (String line : lines = logs.split("\\r?\\n")) {
            if (!pattern.matcher(line).matches()) continue;
            org.assertj.core.api.Assertions.fail((String)("Expecting: [" + logs + "] not to contain " + pattern));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(groups={"long"})
    public void should_send_heartbeat_when_requests_being_written_but_nothing_received() throws Exception {
        Cluster cluster = Cluster.builder().addContactPoints(new InetAddress[]{this.hostAddress.getAddress()}).withPort(this.scassandra.getBinaryPort()).withPoolingOptions(new PoolingOptions().setHeartbeatIntervalSeconds(3).setConnectionsPerHost(HostDistance.LOCAL, 1, 1)).build();
        this.scassandra.primingClient().prime(PrimingRequest.queryBuilder().withQuery("ping").withThen(PrimingRequest.then().withFixedDelay(Long.valueOf(8675309999L))));
        Thread submitter = null;
        try {
            cluster.init();
            SessionManager session = (SessionManager)cluster.connect();
            Host host = TestUtils.findHost(cluster, 1);
            Connection connection = (Connection)((HostConnectionPool)session.pools.get((Object)host)).connections.get(0);
            String connectionName = connection.toString().replaceAll("\\-", "\\\\-").replaceAll("Connection\\[\\/", "").replaceAll("\\, inFlight.*", "");
            Pattern heartbeatSentPattern = Pattern.compile(".*" + connectionName + ".*sending heartbeat");
            Pattern heartbeatReceivedPattern = Pattern.compile(".*" + connectionName + ".*heartbeat query succeeded");
            logger.debug("Heartbeat pattern is {}", (Object)heartbeatSentPattern);
            submitter = new Thread(new QuerySubmitter((Session)session));
            submitter.start();
            for (int i = 0; i < 5; ++i) {
                session.execute("bar");
                TimeUnit.SECONDS.sleep(1L);
            }
            String log = this.getLog(cluster);
            this.assertNoLineMatches(log, heartbeatSentPattern);
            int inFlight = connection.inFlight.get();
            Assertions.assertThat((int)inFlight).isGreaterThan(0);
            TimeUnit.SECONDS.sleep(4L);
            Assertions.assertThat((int)connection.inFlight.get()).isGreaterThan(inFlight);
            log = this.getLog(cluster);
            this.assertLineMatches(log, heartbeatSentPattern);
            this.assertLineMatches(log, heartbeatReceivedPattern);
        }
        finally {
            if (submitter != null) {
                submitter.interrupt();
            }
            cluster.close();
        }
    }

    @Test(groups={"long"})
    public void should_not_send_heartbeat_when_disabled() throws InterruptedException {
        Cluster cluster = Cluster.builder().addContactPoints(new InetAddress[]{this.hostAddress.getAddress()}).withPort(this.scassandra.getBinaryPort()).withPoolingOptions(new PoolingOptions().setHeartbeatIntervalSeconds(0)).build();
        try {
            cluster.init();
            for (int i = 0; i < 5; ++i) {
                this.triggerRequestOnControlConnection(cluster);
                TimeUnit.SECONDS.sleep(1L);
            }
            Assertions.assertThat((String)this.getLog(cluster)).doesNotContain((CharSequence)"sending heartbeat");
            TimeUnit.SECONDS.sleep(32L);
            Assertions.assertThat((String)this.getLog(cluster)).doesNotContain((CharSequence)"sending heartbeat");
        }
        finally {
            cluster.close();
        }
    }

    private void triggerRequestOnControlConnection(Cluster cluster) {
        cluster.manager.controlConnection.refreshNodeInfo(TestUtils.findHost(cluster, 1));
    }

    private static class QuerySubmitter
    implements Runnable {
        private final Session session;

        QuerySubmitter(Session session) {
            this.session = session;
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                logger.debug("Sending ping, for which we expect no response");
                this.session.executeAsync("ping");
                Uninterruptibles.sleepUninterruptibly((long)1L, (TimeUnit)TimeUnit.SECONDS);
            }
        }
    }
}

