/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.cassandra;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.PartitionKey;
import com.datastax.driver.mapping.annotations.Table;
import java.io.Serializable;
import java.util.List;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.cassandra.CassandraIO;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.IOITHelper;
import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
import org.apache.beam.sdk.io.common.TestRow;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=JUnit4.class)
public class CassandraIOIT
implements Serializable {
    // Logger used by createTable() to report schema creation steps.
    private static final Logger LOG = LoggerFactory.getLogger(CassandraIOIT.class);
    // Pipeline options for this test class; assigned in setup() and read by every other method.
    private static CassandraIOITOptions options;
    // Keyspace and table that the test creates, writes to, reads from and finally drops.
    private static final String KEYSPACE = "BEAM";
    private static final String TABLE = "BEAM_TEST";
    // One pipeline per phase: runWrite() uses pipelineWrite, runRead() uses pipelineRead.
    // Both fields are transient, so they are skipped if this Serializable test instance is
    // ever serialized.
    @Rule
    public transient TestPipeline pipelineWrite = TestPipeline.create();
    @Rule
    public transient TestPipeline pipelineRead = TestPipeline.create();

    /**
     * Loads the integration-test pipeline options, stores them in {@code options}, and then
     * recreates the test schema: any existing keyspace/table is dropped before a fresh one is
     * created.
     */
    @BeforeClass
    public static void setup() {
        CassandraIOITOptions parsedOptions =
            (CassandraIOITOptions) IOITHelper.readIOTestPipelineOptions(CassandraIOITOptions.class);
        options = parsedOptions;
        dropTable(parsedOptions, KEYSPACE, TABLE);
        createTable(parsedOptions, KEYSPACE, TABLE);
    }

    /** Drops the test keyspace and table after all tests in this class have run. */
    @AfterClass
    public static void tearDown() {
        dropTable(options, KEYSPACE, TABLE);
    }

    /**
     * End-to-end check: runs the write pipeline to completion, then runs the read pipeline,
     * which asserts on the data that was written.
     */
    @Test
    public void testWriteThenRead() {
        runWrite();
        runRead();
    }

    /**
     * Builds and runs the write pipeline.
     *
     * <p>Generates the sequence {@code [0, numberOfRecords)}, turns each number into a
     * deterministic {@link TestRow}, maps every row to a {@link Scientist} entity and writes the
     * entities to the {@code KEYSPACE} keyspace through {@link CassandraIO}. Blocks until the
     * pipeline has finished.
     */
    private void runWrite() {
        pipelineWrite
            .apply("GenSequence", GenerateSequence.from(0L).to(options.getNumberOfRecords().longValue()))
            .apply("PrepareTestRows", ParDo.of(new TestRow.DeterministicallyConstructTestRowFn()))
            .apply("MapToEntity", ParDo.of(new CreateScientistFn()))
            .apply(
                "WriteToCassandra",
                // The explicit type witness is required: without it the builder chain would be
                // inferred as Write<Object> and withEntity(Scientist.class) would not type-check.
                CassandraIO.<Scientist>write()
                    .withHosts(options.getCassandraHost())
                    .withPort(options.getCassandraPort())
                    .withKeyspace(KEYSPACE)
                    .withEntity(Scientist.class));

        pipelineWrite.run().waitUntilFinish();
    }

    /**
     * Builds and runs the read pipeline.
     *
     * <p>Reads all {@link Scientist} entities from {@code KEYSPACE.TABLE} (requesting at least 20
     * splits), extracts each entity's name, combines the names into a single hash and asserts
     * that this hash equals the expected hash for {@code numberOfRecords} test rows. Blocks until
     * the pipeline has finished.
     */
    private void runRead() {
        PCollection<Scientist> output =
            pipelineRead.apply(
                CassandraIO.<Scientist>read()
                    .withHosts(options.getCassandraHost())
                    .withPort(options.getCassandraPort())
                    .withMinNumberOfSplits(20)
                    .withKeyspace(KEYSPACE)
                    .withTable(TABLE)
                    .withEntity(Scientist.class)
                    .withCoder(SerializableCoder.of(Scientist.class)));

        // Reduce the rows to one order-independent fingerprint so the comparison does not depend
        // on the order in which Cassandra returns them.
        PCollection<String> consolidatedHashcode =
            output
                .apply(ParDo.of(new SelectNameFn()))
                .apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults());

        PAssert.thatSingleton(consolidatedHashcode)
            .isEqualTo(TestRow.getExpectedHashForRowCount(options.getNumberOfRecords()));

        pipelineRead.run().waitUntilFinish();
    }

    /**
     * Creates a driver {@link Cluster} that uses the configured Cassandra hosts as contact
     * points and the configured port. The caller is responsible for closing the returned cluster.
     *
     * @param options test options supplying the Cassandra hosts and port
     * @return a newly built cluster handle
     */
    private static Cluster getCluster(CassandraIOITOptions options) {
        Cluster.Builder builder = Cluster.builder();
        builder = builder.addContactPoints(options.getCassandraHost().toArray(new String[0]));
        builder = builder.withPort(options.getCassandraPort().intValue());
        return builder.build();
    }

    /**
     * Creates the keyspace (SimpleStrategy, replication factor 3) and the test table
     * {@code (id bigint PRIMARY KEY, name text)} if they do not exist yet.
     *
     * <p>The CQL is assembled by concatenation because these are schema statements built from
     * identifiers; the callers in this class only pass the {@code KEYSPACE}/{@code TABLE}
     * constants.
     *
     * @param options test options supplying the Cassandra hosts and port
     * @param keyspace name of the keyspace to create and to switch the session to
     * @param tableName unqualified name of the table to create inside {@code keyspace}
     */
    private static void createTable(CassandraIOITOptions options, String keyspace, String tableName) {
        try (Cluster cluster = getCluster(options);
             Session session = cluster.connect()) {
            LOG.info("Create {} keyspace if not exists", keyspace);
            // Use the keyspace argument rather than a hard-coded name so that the keyspace that
            // is created is the same one that is logged and selected with USE below.
            session.execute(
                "CREATE KEYSPACE IF NOT EXISTS "
                    + keyspace
                    + " WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor':3};");
            session.execute("USE " + keyspace);
            LOG.info("Create {} table if not exists", tableName);
            session.execute("CREATE TABLE IF NOT EXISTS " + tableName + "(id bigint, name text, PRIMARY KEY(id))");
        }
    }

    /**
     * Removes the test table and then the whole keyspace, if they exist.
     *
     * <p>Note that despite its name this method drops the entire keyspace, not only the table.
     *
     * @param options test options supplying the Cassandra hosts and port
     * @param keyspace name of the keyspace to drop
     * @param table unqualified name of the table inside {@code keyspace} to drop
     */
    private static void dropTable(CassandraIOITOptions options, String keyspace, String table) {
        try (Cluster cluster = getCluster(options);
             Session session = cluster.connect()) {
            // Drop the table before its keyspace: once the keyspace is gone the table statement
            // has nothing left to act on.
            session.execute("DROP TABLE IF EXISTS " + keyspace + "." + table);
            session.execute("DROP KEYSPACE IF EXISTS " + keyspace);
        }
    }

    /** Maps each {@link Scientist} to its {@code name} field. */
    private static class SelectNameFn
    extends DoFn<Scientist, String> {
        /**
         * Emits the name of the current element.
         *
         * @param c context giving access to the input {@link Scientist} and the output receiver
         */
        @ProcessElement
        public void processElement(ProcessContext c) {
            c.output(c.element().name);
        }
    }

    /** Converts each {@link TestRow} into a {@link Scientist} carrying the same id and name. */
    private static class CreateScientistFn
    extends DoFn<TestRow, Scientist> {
        /**
         * Emits a {@link Scientist} built from the current row's id and name.
         *
         * @param c context giving access to the input {@link TestRow} and the output receiver
         */
        @ProcessElement
        public void processElement(ProcessContext c) {
            TestRow row = c.element();
            // The row id is an Integer; it is unboxed and widened to the entity's long id.
            c.output(new Scientist(row.id(), row.name()));
        }
    }

    /**
     * Entity mapped to table {@code BEAM_TEST} in keyspace {@code BEAM}: a numeric {@code id}
     * (the partition key) and a {@code name}.
     */
    @Table(name="BEAM_TEST", keyspace="BEAM")
    private static final class Scientist
    implements Serializable {
        @PartitionKey
        @Column(name="id")
        final long id;
        @Column(name="name")
        final String name;

        /**
         * Creates an entity with id 0 and a null name.
         * NOTE(review): presumably needed so the object mapper can instantiate the entity - confirm.
         */
        Scientist() {
            this.id = 0L;
            this.name = null;
        }

        /**
         * @param id value of the {@code id} column
         * @param name value of the {@code name} column
         */
        Scientist(long id, String name) {
            this.id = id;
            this.name = name;
        }

        /** Renders the entity as {@code "<id>: <name>"}. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder();
            text.append(id);
            text.append(": ");
            text.append(name);
            return text.toString();
        }
    }

    /**
     * Pipeline options for this integration test, adding the Cassandra connection settings
     * (hosts and port) to the common {@link IOTestPipelineOptions}.
     */
    public interface CassandraIOITOptions extends IOTestPipelineOptions {
        /** Cassandra server hosts (host names or IP addresses); must be provided. */
        @Description("Host for Cassandra server (host name/ip address)")
        @Validation.Required
        List<String> getCassandraHost();

        void setCassandraHost(List<String> hosts);

        /** Cassandra server port; defaults to 9042. */
        @Description("Port for Cassandra server")
        @Default.Integer(9042)
        Integer getCassandraPort();

        void setCassandraPort(Integer port);
    }
}

