/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.cassandra;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.Computed;
import com.datastax.driver.mapping.annotations.PartitionKey;
import com.datastax.driver.mapping.annotations.Table;
import info.archinnov.achilles.embedded.CassandraEmbeddedServerBuilder;
import info.archinnov.achilles.embedded.CassandraShutDownHook;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.math.BigInteger;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.JMX;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import junit.framework.TestCase;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.cassandra.CassandraIO;
import org.apache.beam.sdk.io.cassandra.Mapper;
import org.apache.beam.sdk.io.common.NetworkTestHelper;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Objects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
import org.apache.cassandra.service.StorageServiceMBean;
import org.apache.commons.io.FileUtils;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=JUnit4.class)
public class CassandraIOTest
implements Serializable {
    private static final long NUM_ROWS = 20L;
    private static final String CASSANDRA_KEYSPACE = "beam_ks";
    private static final String CASSANDRA_HOST = "127.0.0.1";
    private static final String CASSANDRA_TABLE = "scientist";
    private static final Logger LOG = LoggerFactory.getLogger(CassandraIOTest.class);
    private static final String STORAGE_SERVICE_MBEAN = "org.apache.cassandra.db:type=StorageService";
    private static final float ACCEPTABLE_EMPTY_SPLITS_PERCENTAGE = 0.5f;
    private static final int FLUSH_TIMEOUT = 30000;
    private static final int JMX_CONF_TIMEOUT = 1000;
    private static int jmxPort;
    private static int cassandraPort;
    private static Cluster cluster;
    private static Session session;
    private static final String TEMPORARY_FOLDER;
    @Rule
    public transient TestPipeline pipeline = TestPipeline.create();
    private static CassandraShutDownHook shutdownHook;
    private static final AtomicInteger counter;
    private static final String CASSANDRA_TABLE_WRITE = "scientist_write";

    @BeforeClass
    public static void beforeClass() throws Exception {
        jmxPort = NetworkTestHelper.getAvailableLocalPort();
        shutdownHook = new CassandraShutDownHook();
        String data = TEMPORARY_FOLDER + "/data";
        Files.createDirectories(Paths.get(data, new String[0]), new FileAttribute[0]);
        String commitLog = TEMPORARY_FOLDER + "/commit-log";
        Files.createDirectories(Paths.get(commitLog, new String[0]), new FileAttribute[0]);
        String cdcRaw = TEMPORARY_FOLDER + "/cdc-raw";
        Files.createDirectories(Paths.get(cdcRaw, new String[0]), new FileAttribute[0]);
        String hints = TEMPORARY_FOLDER + "/hints";
        Files.createDirectories(Paths.get(hints, new String[0]), new FileAttribute[0]);
        String savedCache = TEMPORARY_FOLDER + "/saved-cache";
        Files.createDirectories(Paths.get(savedCache, new String[0]), new FileAttribute[0]);
        CassandraEmbeddedServerBuilder builder = CassandraEmbeddedServerBuilder.builder().withKeyspaceName(CASSANDRA_KEYSPACE).withDataFolder(data).withCommitLogFolder(commitLog).withCdcRawFolder(cdcRaw).withHintsFolder(hints).withSavedCachesFolder(savedCache).withShutdownHook(shutdownHook).withJMXPort(jmxPort).cleanDataFilesAtStartup(false);
        cluster = CassandraIOTest.buildCluster(builder);
        cassandraPort = cluster.getConfiguration().getProtocolOptions().getPort();
        session = cluster.newSession();
        CassandraIOTest.insertData();
        CassandraIOTest.disableAutoCompaction();
    }

    private static Cluster buildCluster(CassandraEmbeddedServerBuilder builder) {
        int tried = 0;
        while (tried < 3) {
            try {
                return builder.buildNativeCluster();
            }
            catch (NoHostAvailableException e) {
                ++tried;
                try {
                    Thread.sleep(1000L);
                }
                catch (InterruptedException interruptedException) {}
            }
        }
        throw new RuntimeException("Unable to create embedded Cassandra cluster");
    }

    @AfterClass
    public static void afterClass() throws InterruptedException, IOException {
        shutdownHook.shutDownNow();
        FileUtils.deleteDirectory((File)new File(TEMPORARY_FOLDER));
    }

    private static void insertData() throws Exception {
        LOG.info("Create Cassandra tables");
        session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s(person_id int, person_name text, PRIMARY KEY(person_id));", CASSANDRA_KEYSPACE, CASSANDRA_TABLE));
        session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s(person_id int, person_name text, PRIMARY KEY(person_id));", CASSANDRA_KEYSPACE, CASSANDRA_TABLE_WRITE));
        LOG.info("Insert records");
        String[] scientists = new String[]{"Einstein", "Darwin", "Copernicus", "Pasteur", "Curie", "Faraday", "Newton", "Bohr", "Galilei", "Maxwell"};
        int i = 0;
        while ((long)i < 20L) {
            int index = i % scientists.length;
            session.execute(String.format("INSERT INTO %s.%s(person_id, person_name) values(" + i + ", '" + scientists[index] + "');", CASSANDRA_KEYSPACE, CASSANDRA_TABLE));
            ++i;
        }
        CassandraIOTest.flushMemTables();
    }

    private static void flushMemTables() throws Exception {
        JMXServiceURL url = new JMXServiceURL(String.format("service:jmx:rmi://%s/jndi/rmi://%s:%s/jmxrmi", CASSANDRA_HOST, CASSANDRA_HOST, jmxPort));
        JMXConnector jmxConnector = JMXConnectorFactory.connect(url, null);
        MBeanServerConnection mBeanServerConnection = jmxConnector.getMBeanServerConnection();
        ObjectName objectName = new ObjectName(STORAGE_SERVICE_MBEAN);
        StorageServiceMBean mBeanProxy = JMX.newMBeanProxy(mBeanServerConnection, objectName, StorageServiceMBean.class);
        mBeanProxy.forceKeyspaceFlush(CASSANDRA_KEYSPACE, new String[]{CASSANDRA_TABLE});
        jmxConnector.close();
        Thread.sleep(30000L);
    }

    private static void disableAutoCompaction() throws Exception {
        JMXServiceURL url = new JMXServiceURL(String.format("service:jmx:rmi://%s/jndi/rmi://%s:%s/jmxrmi", CASSANDRA_HOST, CASSANDRA_HOST, jmxPort));
        JMXConnector jmxConnector = JMXConnectorFactory.connect(url, null);
        MBeanServerConnection mBeanServerConnection = jmxConnector.getMBeanServerConnection();
        ObjectName objectName = new ObjectName(STORAGE_SERVICE_MBEAN);
        StorageServiceMBean mBeanProxy = JMX.newMBeanProxy(mBeanServerConnection, objectName, StorageServiceMBean.class);
        mBeanProxy.disableAutoCompaction(CASSANDRA_KEYSPACE, new String[]{CASSANDRA_TABLE});
        jmxConnector.close();
        Thread.sleep(1000L);
    }

    @Test
    public void testEstimatedSizeBytes() throws Exception {
        PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
        CassandraIO.Read read = CassandraIO.read().withHosts(Collections.singletonList(CASSANDRA_HOST)).withPort(cassandraPort).withKeyspace(CASSANDRA_KEYSPACE).withTable(CASSANDRA_TABLE);
        CassandraIO.CassandraSource source = new CassandraIO.CassandraSource(read, null);
        long estimatedSizeBytes = source.getEstimatedSizeBytes(pipelineOptions);
        TestCase.assertTrue(((float)estimatedSizeBytes >= 11664.0f && (float)estimatedSizeBytes <= 14256.0f ? 1 : 0) != 0);
    }

    @Test
    public void testRead() throws Exception {
        PCollection output = (PCollection)this.pipeline.apply((PTransform)CassandraIO.read().withHosts(Collections.singletonList(CASSANDRA_HOST)).withPort(cassandraPort).withKeyspace(CASSANDRA_KEYSPACE).withTable(CASSANDRA_TABLE).withCoder((Coder)SerializableCoder.of(Scientist.class)).withEntity(Scientist.class));
        PAssert.thatSingleton((PCollection)((PCollection)output.apply("Count", Count.globally()))).isEqualTo((Object)20L);
        PCollection mapped = (PCollection)output.apply((PTransform)MapElements.via((SimpleFunction)new SimpleFunction<Scientist, KV<String, Integer>>(){

            public KV<String, Integer> apply(Scientist scientist) {
                return KV.of((Object)scientist.name, (Object)scientist.id);
            }
        }));
        PAssert.that((PCollection)((PCollection)mapped.apply("Count occurrences per scientist", Count.perKey()))).satisfies((SerializableFunction & Serializable)input -> {
            for (KV element : input) {
                Assert.assertEquals((String)((String)element.getKey()), (long)2L, (long)((Long)element.getValue()));
            }
            return null;
        });
        this.pipeline.run();
    }

    @Test
    public void testReadWithQuery() throws Exception {
        PCollection output = (PCollection)this.pipeline.apply((PTransform)CassandraIO.read().withHosts(Collections.singletonList(CASSANDRA_HOST)).withPort(cassandraPort).withKeyspace(CASSANDRA_KEYSPACE).withTable(CASSANDRA_TABLE).withQuery("select person_id, writetime(person_name) from beam_ks.scientist where person_id=10").withCoder((Coder)SerializableCoder.of(Scientist.class)).withEntity(Scientist.class));
        PAssert.thatSingleton((PCollection)((PCollection)output.apply("Count", Count.globally()))).isEqualTo((Object)1L);
        PAssert.that((PCollection)output).satisfies((SerializableFunction & Serializable)input -> {
            for (Scientist sci : input) {
                Assert.assertNull((Object)sci.name);
                TestCase.assertTrue((sci.nameTs != null && sci.nameTs > 0L ? 1 : 0) != 0);
            }
            return null;
        });
        this.pipeline.run();
    }

    @Test
    public void testWrite() {
        ArrayList<ScientistWrite> data = new ArrayList<ScientistWrite>();
        int i = 0;
        while ((long)i < 20L) {
            ScientistWrite scientist = new ScientistWrite();
            scientist.id = i;
            scientist.name = "Name " + i;
            data.add(scientist);
            ++i;
        }
        ((PCollection)this.pipeline.apply((PTransform)Create.of(data))).apply((PTransform)CassandraIO.write().withHosts(Collections.singletonList(CASSANDRA_HOST)).withPort(cassandraPort).withKeyspace(CASSANDRA_KEYSPACE).withEntity(ScientistWrite.class));
        this.pipeline.run();
        List<Row> results = this.getRows(CASSANDRA_TABLE_WRITE);
        Assert.assertEquals((long)20L, (long)results.size());
        for (Row row : results) {
            TestCase.assertTrue((boolean)row.getString("person_name").matches("Name (\\d*)"));
        }
    }

    @Test
    public void testReadWithMapper() throws Exception {
        counter.set(0);
        NOOPMapperFactory factory = new NOOPMapperFactory();
        this.pipeline.apply((PTransform)CassandraIO.read().withHosts(Collections.singletonList(CASSANDRA_HOST)).withPort(cassandraPort).withKeyspace(CASSANDRA_KEYSPACE).withTable(CASSANDRA_TABLE).withCoder((Coder)SerializableCoder.of(String.class)).withEntity(String.class).withMapperFactoryFn((SerializableFunction)factory));
        this.pipeline.run();
        Assert.assertEquals((long)20L, (long)counter.intValue());
    }

    @Test
    public void testCustomMapperImplWrite() throws Exception {
        counter.set(0);
        NOOPMapperFactory factory = new NOOPMapperFactory();
        ((PCollection)this.pipeline.apply((PTransform)Create.of((Object)"", (Object[])new String[0]))).apply((PTransform)CassandraIO.write().withHosts(Collections.singletonList(CASSANDRA_HOST)).withPort(cassandraPort).withKeyspace(CASSANDRA_KEYSPACE).withMapperFactoryFn((SerializableFunction)factory).withEntity(String.class));
        this.pipeline.run();
        Assert.assertEquals((long)1L, (long)counter.intValue());
    }

    @Test
    public void testCustomMapperImplDelete() {
        counter.set(0);
        NOOPMapperFactory factory = new NOOPMapperFactory();
        ((PCollection)this.pipeline.apply((PTransform)Create.of((Object)"", (Object[])new String[0]))).apply((PTransform)CassandraIO.delete().withHosts(Collections.singletonList(CASSANDRA_HOST)).withPort(cassandraPort).withKeyspace(CASSANDRA_KEYSPACE).withMapperFactoryFn((SerializableFunction)factory).withEntity(String.class));
        this.pipeline.run();
        Assert.assertEquals((long)1L, (long)counter.intValue());
    }

    @Test
    public void testSplit() throws Exception {
        PipelineOptions options = PipelineOptionsFactory.create();
        CassandraIO.Read read = CassandraIO.read().withHosts(Collections.singletonList(CASSANDRA_HOST)).withPort(cassandraPort).withKeyspace(CASSANDRA_KEYSPACE).withTable(CASSANDRA_TABLE).withEntity(Scientist.class).withCoder((Coder)SerializableCoder.of(Scientist.class));
        String splitQuery = QueryBuilder.select().from(CASSANDRA_KEYSPACE, CASSANDRA_TABLE).toString();
        CassandraIO.CassandraSource initialSource = new CassandraIO.CassandraSource(read, Collections.singletonList(splitQuery));
        int desiredBundleSizeBytes = 2048;
        long estimatedSize = initialSource.getEstimatedSizeBytes(options);
        List splits = initialSource.split((long)desiredBundleSizeBytes, options);
        SourceTestUtils.assertSourcesEqualReferenceSource((BoundedSource)initialSource, (List)splits, (PipelineOptions)options);
        float expectedNumSplitsloat = (float)initialSource.getEstimatedSizeBytes(options) / (float)desiredBundleSizeBytes;
        long sum = 0L;
        for (BoundedSource subSource : splits) {
            sum += subSource.getEstimatedSizeBytes(options);
        }
        Assert.assertEquals((long)(estimatedSize / (long)splits.size() * (long)splits.size()), (long)sum);
        int expectedNumSplits = (int)Math.ceil(expectedNumSplitsloat);
        Assert.assertEquals((String)"Wrong number of splits", (long)expectedNumSplits, (long)splits.size());
        int emptySplits = 0;
        for (BoundedSource subSource : splits) {
            if (!SourceTestUtils.readFromSource((BoundedSource)subSource, (PipelineOptions)options).isEmpty()) continue;
            ++emptySplits;
        }
        Assert.assertThat((String)"There are too many empty splits, parallelism is sub-optimal", (Object)emptySplits, (Matcher)Matchers.lessThan((Comparable)Integer.valueOf((int)(0.5f * (float)splits.size()))));
    }

    private List<Row> getRows(String table) {
        ResultSet result = session.execute(String.format("select person_id,person_name from %s.%s", CASSANDRA_KEYSPACE, table));
        return result.all();
    }

    @Test
    public void testDelete() throws Exception {
        List<Row> results = this.getRows(CASSANDRA_TABLE);
        Assert.assertEquals((long)20L, (long)results.size());
        Scientist einstein = new Scientist();
        einstein.id = 0;
        einstein.name = "Einstein";
        ((PCollection)this.pipeline.apply((PTransform)Create.of((Object)einstein, (Object[])new Scientist[0]))).apply((PTransform)CassandraIO.delete().withHosts(Collections.singletonList(CASSANDRA_HOST)).withPort(cassandraPort).withKeyspace(CASSANDRA_KEYSPACE).withEntity(Scientist.class));
        this.pipeline.run();
        results = this.getRows(CASSANDRA_TABLE);
        Assert.assertEquals((long)19L, (long)results.size());
        session.execute(String.format("INSERT INTO %s.%s(person_id, person_name) values(" + einstein.id + ", '" + einstein.name + "');", CASSANDRA_KEYSPACE, CASSANDRA_TABLE));
    }

    @Test
    public void testValidPartitioner() {
        Assert.assertTrue((boolean)CassandraIO.CassandraSource.isMurmur3Partitioner((Cluster)cluster));
    }

    @Test
    public void testDistance() {
        BigInteger distance = CassandraIO.CassandraSource.distance((BigInteger)new BigInteger("10"), (BigInteger)new BigInteger("100"));
        Assert.assertEquals((Object)BigInteger.valueOf(90L), (Object)distance);
        distance = CassandraIO.CassandraSource.distance((BigInteger)new BigInteger("100"), (BigInteger)new BigInteger("10"));
        Assert.assertEquals((Object)new BigInteger("18446744073709551526"), (Object)distance);
    }

    @Test
    public void testRingFraction() {
        ArrayList<CassandraIO.CassandraSource.TokenRange> tokenRanges = new ArrayList<CassandraIO.CassandraSource.TokenRange>();
        tokenRanges.add(new CassandraIO.CassandraSource.TokenRange(1L, 1L, BigInteger.valueOf(Long.MIN_VALUE), new BigInteger("0")));
        Assert.assertEquals((double)0.5, (double)CassandraIO.CassandraSource.getRingFraction(tokenRanges), (double)0.0);
        tokenRanges.add(new CassandraIO.CassandraSource.TokenRange(1L, 1L, new BigInteger("0"), BigInteger.valueOf(Long.MAX_VALUE)));
        Assert.assertEquals((double)1.0, (double)CassandraIO.CassandraSource.getRingFraction(tokenRanges), (double)0.0);
    }

    @Test
    public void testEstimatedSizeBytesFromTokenRanges() {
        ArrayList<CassandraIO.CassandraSource.TokenRange> tokenRanges = new ArrayList<CassandraIO.CassandraSource.TokenRange>();
        tokenRanges.add(new CassandraIO.CassandraSource.TokenRange(1L, 1000L, BigInteger.valueOf(Long.MIN_VALUE), BigInteger.valueOf(Long.MAX_VALUE)));
        Assert.assertEquals((long)1000L, (long)CassandraIO.CassandraSource.getEstimatedSizeBytesFromTokenRanges(tokenRanges));
        tokenRanges = new ArrayList();
        tokenRanges.add(new CassandraIO.CassandraSource.TokenRange(1L, 1000L, BigInteger.valueOf(Long.MIN_VALUE), new BigInteger("0")));
        Assert.assertEquals((long)2000L, (long)CassandraIO.CassandraSource.getEstimatedSizeBytesFromTokenRanges(tokenRanges));
        tokenRanges = new ArrayList();
        tokenRanges.add(new CassandraIO.CassandraSource.TokenRange(1L, 1000L, BigInteger.valueOf(Long.MIN_VALUE), new BigInteger("-3")));
        tokenRanges.add(new CassandraIO.CassandraSource.TokenRange(1L, 1000L, new BigInteger("-2"), new BigInteger("10000")));
        tokenRanges.add(new CassandraIO.CassandraSource.TokenRange(2L, 3000L, new BigInteger("10001"), BigInteger.valueOf(Long.MAX_VALUE)));
        Assert.assertEquals((long)8000L, (long)CassandraIO.CassandraSource.getEstimatedSizeBytesFromTokenRanges(tokenRanges));
    }

    static {
        TEMPORARY_FOLDER = System.getProperty("java.io.tmpdir") + "/embedded-cassandra/";
        counter = new AtomicInteger();
    }

    @Table(name="scientist_write", keyspace="beam_ks")
    static class ScientistWrite
    extends Scientist {
        ScientistWrite() {
        }
    }

    @Table(name="scientist", keyspace="beam_ks")
    static class Scientist
    implements Serializable {
        @Column(name="person_name")
        String name;
        @Computed(value="writetime(person_name)")
        Long nameTs;
        @PartitionKey
        @Column(name="person_id")
        int id;

        Scientist() {
        }

        public String toString() {
            return this.id + ":" + this.name;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            Scientist scientist = (Scientist)o;
            return this.id == scientist.id && Objects.equal((Object)this.name, (Object)scientist.name);
        }

        public int hashCode() {
            return Objects.hashCode((Object[])new Object[]{this.name, this.id});
        }
    }

    private static class NOOPMapper
    implements Mapper<String>,
    Serializable {
        private final ListeningExecutorService executor = MoreExecutors.listeningDecorator((ExecutorService)Executors.newFixedThreadPool(10));
        final Callable<Void> asyncTask = () -> null;

        private NOOPMapper() {
        }

        public Iterator map(ResultSet resultSet) {
            if (!resultSet.isExhausted()) {
                resultSet.iterator().forEachRemaining(r -> counter.getAndIncrement());
            }
            return Collections.emptyIterator();
        }

        public Future<Void> deleteAsync(String entity) {
            counter.incrementAndGet();
            return this.executor.submit(this.asyncTask);
        }

        public Future<Void> saveAsync(String entity) {
            counter.incrementAndGet();
            return this.executor.submit(this.asyncTask);
        }
    }

    private static class NOOPMapperFactory
    implements SerializableFunction<Session, Mapper> {
        private NOOPMapperFactory() {
        }

        public Mapper apply(Session input) {
            return new NOOPMapper();
        }
    }
}

