/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.cassandra;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.PartitionKey;
import com.datastax.driver.mapping.annotations.Table;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.cassandra.CassandraIO;
import org.apache.beam.sdk.io.cassandra.CassandraTestDataSet;
import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.SerializableMatcher;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.hamcrest.Description;
import org.hamcrest.TypeSafeMatcher;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class CassandraIOIT
implements Serializable {
    /** Pipeline options shared by the class; assigned in {@link #setup()} and read by {@link #tearDown()} and {@link #testRead()}. */
    private static IOTestPipelineOptions options;
    /**
     * Pipeline that each test populates and runs. Declared {@code transient} so it is skipped when
     * this {@link Serializable} test class is serialized.
     */
    @Rule
    public transient TestPipeline pipeline = TestPipeline.create();

    /**
     * Registers {@link IOTestPipelineOptions} with {@link PipelineOptionsFactory} and stores the
     * testing pipeline options, viewed through that interface, in the shared {@code options} field.
     *
     * @throws Exception declared for the test framework; nothing is thrown explicitly here
     */
    @BeforeClass
    public static void setup() throws Exception {
        final Class<IOTestPipelineOptions> optionsType = IOTestPipelineOptions.class;
        PipelineOptionsFactory.register(optionsType);
        options = (IOTestPipelineOptions)TestPipeline.testingPipelineOptions().as(optionsType);
    }

    /**
     * Hands the shared pipeline options to {@link CassandraTestDataSet#cleanUpDataTable} once all
     * tests of this class have run.
     */
    @AfterClass
    public static void tearDown() {
        final IOTestPipelineOptions sharedOptions = options;
        CassandraTestDataSet.cleanUpDataTable(sharedOptions);
    }

    /**
     * Reads table {@code BEAM_READ_TEST} of keyspace {@code BEAM} as {@link Scientist} entities and
     * asserts that 1000 elements are read in total and that, once the elements are keyed by name,
     * every key is counted exactly 100 times.
     *
     * @throws Exception if building or running the pipeline fails
     */
    @Test
    public void testRead() throws Exception {
        // Source transform: one host and port taken from the shared options.
        PTransform readScientists = (PTransform)CassandraIO.read()
            .withHosts(Collections.singletonList(options.getCassandraHost()))
            .withPort(options.getCassandraPort().intValue())
            .withKeyspace("BEAM")
            .withTable("BEAM_READ_TEST")
            .withEntity(Scientist.class)
            .withCoder((Coder)SerializableCoder.of(Scientist.class));
        PCollection output = (PCollection)this.pipeline.apply(readScientists);

        // Global element count must be 1000.
        PCollection totalCount = (PCollection)output.apply("Count scientist", Count.globally());
        PAssert.thatSingleton(totalCount).isEqualTo((Object)1000L);

        // Re-key every scientist as (name, id) so occurrences can be counted per name.
        SimpleFunction<Scientist, KV<String, Integer>> toNameAndId = new SimpleFunction<Scientist, KV<String, Integer>>(){

            public KV<String, Integer> apply(Scientist scientist) {
                return KV.of(scientist.name, scientist.id);
            }
        };
        PCollection mapped = (PCollection)output.apply((PTransform)MapElements.via((SimpleFunction)toNameAndId));

        // Each name must have been seen exactly 100 times; the name doubles as the failure message.
        SerializableFunction<Iterable<KV<String, Long>>, Void> hundredPerName = new SerializableFunction<Iterable<KV<String, Long>>, Void>(){

            public Void apply(Iterable<KV<String, Long>> input) {
                for (KV<String, Long> perName : input) {
                    String scientistName = perName.getKey();
                    long occurrences = perName.getValue();
                    Assert.assertEquals(scientistName, 100L, occurrences);
                }
                return null;
            }
        };
        PCollection countsPerName = (PCollection)mapped.apply("Count occurrences per scientist", Count.perKey());
        PAssert.that(countsPerName).satisfies((SerializableFunction)hundredPerName);

        PipelineResult result = this.pipeline.run();
        result.waitUntilFinish();
    }

    /**
     * Builds 1000 {@link ScientistForWrite} elements (ids 0..999, names {@code "Name " + id}) and
     * writes them to keyspace {@code BEAM} through {@code CassandraIO.write()}.
     *
     * <p>NOTE(review): the {@link CassandraMatcher} is attached to an options object created locally
     * in this method, the return value of {@code TestPipeline.convertToArgs} is discarded, and
     * {@code pipeline.run()} is called without arguments. Confirm that the matcher is actually
     * picked up by the run.
     *
     * @throws Exception if building or running the pipeline fails
     */
    @Test
    public void testWrite() throws Exception {
        IOTestPipelineOptions writeOptions = (IOTestPipelineOptions)TestPipeline.testingPipelineOptions().as(IOTestPipelineOptions.class);
        CassandraMatcher writeMatcher = new CassandraMatcher(CassandraTestDataSet.getCluster(writeOptions), "BEAM_WRITE_TEST");
        writeOptions.setOnSuccessMatcher((SerializableMatcher)writeMatcher);
        TestPipeline.convertToArgs((PipelineOptions)writeOptions);

        // Input data set: one entity per id in [0, 1000).
        ArrayList<ScientistForWrite> scientists = new ArrayList<ScientistForWrite>();
        int nextId = 0;
        while (nextId < 1000) {
            ScientistForWrite entity = new ScientistForWrite();
            entity.id = nextId;
            entity.name = "Name " + nextId;
            scientists.add(entity);
            nextId++;
        }

        PCollection input = (PCollection)this.pipeline.apply((PTransform)Create.of(scientists));
        PTransform writeScientists = (PTransform)CassandraIO.write()
            .withHosts(Collections.singletonList(writeOptions.getCassandraHost()))
            .withPort(writeOptions.getCassandraPort().intValue())
            .withKeyspace("BEAM")
            .withEntity(ScientistForWrite.class);
        input.apply(writeScientists);

        PipelineResult result = this.pipeline.run();
        result.waitUntilFinish();
    }

    @Table(name="BEAM_WRITE_TEST", keyspace="BEAM")
    public class ScientistForWrite
    implements Serializable {
        @PartitionKey
        @Column(name="id")
        public Integer id;
        @Column(name="name")
        public String name;

        public String toString() {
            return this.id + ":" + this.name;
        }
    }

    /**
     * Immutable entity mapped to table {@code BEAM_READ_TEST} of keyspace {@code BEAM}; the element
     * type produced by {@link #testRead()}.
     */
    @Table(name="BEAM_READ_TEST", keyspace="BEAM")
    public static class Scientist
    implements Serializable {
        /** Partition key, mapped to column {@code id}. */
        @PartitionKey
        @Column(name="id")
        private final int id;
        /** Mapped to column {@code name}. */
        @Column(name="name")
        private final String name;

        /** Creates a scientist with id {@code 0} and an empty name. */
        public Scientist() {
            this(0, "");
        }

        /**
         * Creates a scientist with the given id and an empty name.
         *
         * @param id value of the {@code id} column
         */
        public Scientist(int id) {
            // Previously delegated with a hard-coded 0, silently discarding the caller's id.
            this(id, "");
        }

        /**
         * Creates a scientist with the given id and name.
         *
         * @param id value of the {@code id} column
         * @param name value of the {@code name} column
         */
        public Scientist(int id, String name) {
            this.id = id;
            this.name = name;
        }

        /** @return the id this scientist was created with */
        public int getId() {
            return this.id;
        }

        /** @return the name this scientist was created with */
        public String getName() {
            return this.name;
        }
    }

    public class CassandraMatcher
    extends TypeSafeMatcher<PipelineResult>
    implements SerializableMatcher<PipelineResult> {
        private String tableName;
        private Cluster cluster;

        public CassandraMatcher(Cluster cluster, String tableName) {
            this.cluster = cluster;
            this.tableName = tableName;
        }

        protected boolean matchesSafely(PipelineResult pipelineResult) {
            pipelineResult.waitUntilFinish();
            Session session = this.cluster.connect();
            ResultSet result = session.execute("select id,name from BEAM." + this.tableName);
            List rows = result.all();
            if (rows.size() != 1000) {
                return false;
            }
            for (Row row : rows) {
                if (row.getString("name").matches("Name.*")) continue;
                return false;
            }
            return true;
        }

        public void describeTo(Description description) {
            description.appendText("Expected Cassandra record pattern is (Name.*)");
        }
    }
}

