package org.apache.beam.sdk.io.cassandra;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.PartitionKey;
import com.datastax.driver.mapping.annotations.Table;
import java.io.Serializable;
import java.util.List;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.IOITHelper;
import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
import org.apache.beam.sdk.io.common.TestRow;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/cassandra/CassandraIOIT.class */
public class CassandraIOIT implements Serializable {
    /** Logger used to report keyspace and table creation. */
    private static final Logger LOG = LoggerFactory.getLogger(CassandraIOIT.class);
    /** Pipeline options for this test class; assigned in {@link #setup()}. */
    private static CassandraIOITOptions options;
    /** Keyspace that is created before the test and dropped afterwards. */
    private static final String KEYSPACE = "BEAM";
    /** Table, inside {@link #KEYSPACE}, that the test writes to and reads from. */
    private static final String TABLE = "BEAM_TEST";

    /** Pipeline used for the write phase; transient so it is excluded from serialization of this class. */
    @Rule
    public transient TestPipeline pipelineWrite = TestPipeline.create();

    /** Pipeline used for the read phase; transient so it is excluded from serialization of this class. */
    @Rule
    public transient TestPipeline pipelineRead = TestPipeline.create();

    /** Pipeline options describing where the Cassandra server used by this test can be reached. */
    public interface CassandraIOITOptions extends IOTestPipelineOptions {

        /** Returns the Cassandra contact points (host names or IP addresses); this option is required. */
        @Validation.Required
        @Description("Host for Cassandra server (host name/ip address)")
        List<String> getCassandraHost();

        /** Sets the Cassandra contact points. */
        void setCassandraHost(List<String> hosts);

        /** Returns the Cassandra port; defaults to 9042 when not specified. */
        @Default.Integer(9042)
        @Description("Port for Cassandra server")
        Integer getCassandraPort();

        /** Sets the Cassandra port. */
        void setCassandraPort(Integer port);
    }

    /** Maps every incoming {@link TestRow} to a {@link Scientist} carrying the same id and name. */
    public static class CreateScientistFn extends DoFn<TestRow, Scientist> {
        private CreateScientistFn() {
        }

        /** Emits one {@link Scientist} per input row. */
        @ProcessElement
        public void processElement(ProcessContext processContext) {
            TestRow row = processContext.element();
            // The row id is narrowed to int first, then widened to the entity's long id.
            long scientistId = row.id().intValue();
            processContext.output(new Scientist(scientistId, row.name()));
        }
    }

    /**
     * Entity bound to the test table: a numeric id (the partition key) and a name.
     */
    @Table(keyspace = CassandraIOIT.KEYSPACE, name = CassandraIOIT.TABLE)
    private static final class Scientist implements Serializable {

        /** Partition key, stored in the {@code id} column. */
        @PartitionKey
        @Column(name = "id")
        final long id;

        /** Stored in the {@code name} column. */
        @Column(name = "name")
        final String name;

        /**
         * Creates an entity with id 0 and a null name.
         * NOTE(review): presumably needed so the object mapper can instantiate the entity - confirm.
         */
        Scientist() {
            this(0L, null);
        }

        /** Creates an entity with the given id and name. */
        Scientist(long id, String name) {
            this.id = id;
            this.name = name;
        }

        /** Renders the entity as {@code "<id>: <name>"}. */
        @Override
        public String toString() {
            return id + ": " + name;
        }
    }

    /** Projects each {@link Scientist} onto its name. */
    public static class SelectNameFn extends DoFn<Scientist, String> {
        private SelectNameFn() {
        }

        /** Emits the name of the incoming entity. */
        @ProcessElement
        public void processElement(ProcessContext processContext) {
            Scientist scientist = processContext.element();
            processContext.output(scientist.name);
        }
    }

    /** Reads the test pipeline options, then recreates the keyspace and table from scratch. */
    @BeforeClass
    public static void setup() {
        CassandraIOITOptions parsedOptions = IOITHelper.readIOTestPipelineOptions(CassandraIOITOptions.class);
        options = parsedOptions;
        // Remove anything left behind by an earlier run before creating the schema again.
        dropTable(parsedOptions, KEYSPACE, TABLE);
        createTable(parsedOptions, KEYSPACE, TABLE);
    }

    /** Drops the test keyspace and table after the tests of this class have run. */
    @AfterClass
    public static void tearDown() {
        dropTable(options, KEYSPACE, TABLE);
    }

    /** Runs the write pipeline to completion, then runs the read pipeline that verifies what was written. */
    @Test
    public void testWriteThenRead() {
        // Order matters: the read phase checks the rows produced by the write phase.
        runWrite();
        runRead();
    }

    /**
     * Generates the configured number of deterministic test rows, converts them to
     * {@link Scientist} entities and writes them to Cassandra, blocking until the pipeline ends.
     */
    private void runWrite() {
        this.pipelineWrite
                .apply("GenSequence", GenerateSequence.from(0L).to(options.getNumberOfRecords()))
                .apply("PrepareTestRows", ParDo.of(new TestRow.DeterministicallyConstructTestRowFn()))
                .apply("MapToEntity", ParDo.of(new CreateScientistFn()))
                .apply(
                        "WriteToCassandra",
                        // The explicit type argument is required: without it the entity type is
                        // inferred as Object and withEntity(Scientist.class) does not type-check.
                        CassandraIO.<Scientist>write()
                                .withHosts(options.getCassandraHost())
                                .withPort(options.getCassandraPort())
                                .withKeyspace(KEYSPACE)
                                .withEntity(Scientist.class));
        this.pipelineWrite.run().waitUntilFinish();
    }

    /**
     * Reads all {@link Scientist} entities back from Cassandra, hashes their names and asserts
     * that the single resulting hash equals the expected hash for the configured record count.
     * Blocks until the pipeline ends.
     */
    private void runRead() {
        PAssert.thatSingleton(
                        this.pipelineRead
                                .apply(
                                        // The explicit type argument is required: without it the
                                        // entity type is inferred as Object and the withEntity /
                                        // withCoder calls below do not type-check.
                                        CassandraIO.<Scientist>read()
                                                .withHosts(options.getCassandraHost())
                                                .withPort(options.getCassandraPort())
                                                .withMinNumberOfSplits(20)
                                                .withKeyspace(KEYSPACE)
                                                .withTable(TABLE)
                                                .withEntity(Scientist.class)
                                                .withCoder(SerializableCoder.of(Scientist.class)))
                                .apply(ParDo.of(new SelectNameFn()))
                                .apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults()))
                .isEqualTo(TestRow.getExpectedHashForRowCount(options.getNumberOfRecords()));
        this.pipelineRead.run().waitUntilFinish();
    }

    /** Builds a driver {@link Cluster} for the contact points and port found in the given options. */
    private static Cluster getCluster(CassandraIOITOptions cassandraIOITOptions) {
        List<String> hosts = cassandraIOITOptions.getCassandraHost();
        String[] contactPoints = hosts.toArray(new String[0]);
        int port = cassandraIOITOptions.getCassandraPort().intValue();
        return Cluster.builder()
                .addContactPoints(contactPoints)
                .withPort(port)
                .build();
    }

    /**
     * Creates the given keyspace (if missing) and, inside it, the given table (if missing) with
     * a {@code bigint} id primary key and a {@code text} name column.
     *
     * @param cassandraIOITOptions options providing the Cassandra hosts and port
     * @param keyspace name of the keyspace to create and switch to
     * @param tableName name of the table to create inside the keyspace
     */
    private static void createTable(CassandraIOITOptions cassandraIOITOptions, String keyspace, String tableName) {
        // Resources are closed in reverse order: the session first, then the cluster.
        try (Cluster cluster = getCluster(cassandraIOITOptions);
                Session session = cluster.connect()) {
            LOG.info("Create {} keyspace if not exists", keyspace);
            // Use the requested keyspace rather than a hard-coded name so that the statement
            // agrees with the log line above and the USE statement below.
            session.execute(
                    "CREATE KEYSPACE IF NOT EXISTS "
                            + keyspace
                            + " WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3};");
            session.execute("USE " + keyspace);
            LOG.info("Create {} table if not exists", tableName);
            session.execute("CREATE TABLE IF NOT EXISTS " + tableName + "(id bigint, name text, PRIMARY KEY(id))");
        }
    }

    /**
     * Drops the given table and its keyspace; both statements tolerate the object being absent.
     *
     * @param cassandraIOITOptions options providing the Cassandra hosts and port
     * @param keyspace name of the keyspace to drop
     * @param tableName name of the table, inside the keyspace, to drop
     */
    private static void dropTable(CassandraIOITOptions cassandraIOITOptions, String keyspace, String tableName) {
        // Resources are closed in reverse order: the session first, then the cluster.
        try (Cluster cluster = getCluster(cassandraIOITOptions);
                Session session = cluster.connect()) {
            // Drop the table while its keyspace may still exist, then the keyspace itself.
            session.execute("DROP TABLE IF EXISTS " + keyspace + "." + tableName);
            session.execute("DROP KEYSPACE IF EXISTS " + keyspace);
        }
    }

    /**
     * Closes a resource the way a try-with-resources statement would.
     *
     * <p>When {@code th} is non-null it is the failure already in flight: any failure raised by
     * {@code close()} is attached to it as a suppressed exception and nothing is thrown from
     * here. When {@code th} is null a failure raised by {@code close()} propagates; checked
     * exceptions are wrapped in an {@link IllegalStateException} because this helper does not
     * declare any.
     *
     * @param th the primary failure, or {@code null} if the guarded body completed normally
     * @param autoCloseable the resource to close
     */
    private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) {
        if (th == null) {
            try {
                autoCloseable.close();
            } catch (RuntimeException e) {
                throw e;
            } catch (Exception e) {
                throw new IllegalStateException("Failed to close resource", e);
            }
            return;
        }
        try {
            autoCloseable.close();
        } catch (Throwable th2) {
            // Keep the primary failure visible; record the close failure alongside it.
            th.addSuppressed(th2);
        }
    }
}
