/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.io.sinks;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import java.util.List;
import java.util.Map;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.tests.integration.containers.CassandraContainer;
import org.apache.pulsar.tests.integration.io.sinks.SinkTester;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;

public class CassandraSinkTester
extends SinkTester<CassandraContainer> {
    private static final Logger log = LoggerFactory.getLogger(CassandraSinkTester.class);
    private static final String NAME = "cassandra";
    private static final String ROOTS = "cassandra";
    private static final String KEY = "key";
    private static final String COLUMN = "col";
    private static final String ARCHIVE = "/pulsar/connectors/pulsar-io-cassandra-" + PulsarVersion.getVersion() + ".nar";
    private final String keySpace;
    private final String tableName;
    private Cluster cluster;
    private Session session;

    public static CassandraSinkTester createTester(boolean builtin) {
        if (builtin) {
            return new CassandraSinkTester(builtin);
        }
        return new CassandraSinkTester();
    }

    private CassandraSinkTester() {
        super("cassandra", ARCHIVE, "org.apache.pulsar.io.cassandra.CassandraStringSink");
        String suffix = PulsarClusterTestBase.randomName(8) + "_" + System.currentTimeMillis();
        this.keySpace = "keySpace_" + suffix;
        this.tableName = "tableName_" + suffix;
        this.sinkConfig.put("roots", "cassandra");
        this.sinkConfig.put("keyspace", this.keySpace);
        this.sinkConfig.put("columnFamily", this.tableName);
        this.sinkConfig.put("keyname", KEY);
        this.sinkConfig.put("columnName", COLUMN);
    }

    private CassandraSinkTester(boolean builtin) {
        super("cassandra", SinkTester.SinkType.CASSANDRA);
        String suffix = PulsarClusterTestBase.randomName(8) + "_" + System.currentTimeMillis();
        this.keySpace = "keySpace_" + suffix;
        this.tableName = "tableName_" + suffix;
        this.sinkConfig.put("roots", "cassandra");
        this.sinkConfig.put("keyspace", this.keySpace);
        this.sinkConfig.put("columnFamily", this.tableName);
        this.sinkConfig.put("keyname", KEY);
        this.sinkConfig.put("columnName", COLUMN);
    }

    @Override
    protected CassandraContainer createSinkService(PulsarCluster cluster) {
        return new CassandraContainer(cluster.getClusterName());
    }

    @Override
    public void prepareSink() {
        this.cluster = Cluster.builder().addContactPoint("localhost").withPort(((CassandraContainer)this.serviceContainer).getCassandraPort()).withoutJMXReporting().build();
        this.session = this.cluster.connect();
        log.info("Connecting to cassandra cluster at localhost:{}", (Object)((CassandraContainer)this.serviceContainer).getCassandraPort());
        String createKeySpace = "CREATE KEYSPACE " + this.keySpace + " WITH replication = {'class':'SimpleStrategy', 'replication_factor':1}; ";
        log.info(createKeySpace);
        this.session.execute(createKeySpace);
        this.session.execute("USE " + this.keySpace);
        String createTable = "CREATE TABLE " + this.tableName + "(key text PRIMARY KEY, col text);";
        log.info(createTable);
        this.session.execute(createTable);
    }

    @Override
    public void validateSinkResult(Map<String, String> kvs) {
        String query = "SELECT * FROM " + this.tableName + ";";
        ResultSet result = this.session.execute(query);
        List rows = result.all();
        Assert.assertEquals((int)kvs.size(), (int)rows.size());
        for (Row row : rows) {
            String key = row.getString(KEY);
            String value = row.getString(COLUMN);
            String expectedValue = kvs.get(key);
            Assert.assertNotNull((Object)expectedValue);
            Assert.assertEquals((String)expectedValue, (String)value);
        }
    }

    @Override
    public void close() throws Exception {
        if (this.session != null) {
            this.session.close();
            this.session = null;
        }
        if (this.cluster != null) {
            this.cluster.close();
            this.cluster = null;
        }
    }
}

