package org.apache.beam.sdk.io.cassandra;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.Computed;
import com.datastax.driver.mapping.annotations.PartitionKey;
import com.datastax.driver.mapping.annotations.Table;
import info.archinnov.achilles.embedded.CassandraEmbeddedServerBuilder;
import info.archinnov.achilles.embedded.CassandraShutDownHook;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.JMX;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import junit.framework.TestCase;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.cassandra.CassandraIO;
import org.apache.beam.sdk.io.common.NetworkTestHelper;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Objects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
import org.apache.cassandra.service.StorageServiceMBean;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/cassandra/CassandraIOTest.class */
public class CassandraIOTest implements Serializable {
    private static final long NUM_ROWS = 20;
    private static final String CASSANDRA_KEYSPACE = "beam_ks";
    private static final String CASSANDRA_HOST = "127.0.0.1";
    private static final String CASSANDRA_TABLE = "scientist";
    private static final String STORAGE_SERVICE_MBEAN = "org.apache.cassandra.db:type=StorageService";
    private static final float ACCEPTABLE_EMPTY_SPLITS_PERCENTAGE = 0.5f;
    private static final int FLUSH_TIMEOUT = 30000;
    private static int jmxPort;
    private static int cassandraPort;
    private static Cluster cluster;
    private static Session session;

    @Rule
    public transient TestPipeline pipeline = TestPipeline.create();
    private static CassandraShutDownHook shutdownHook;
    private static final String CASSANDRA_TABLE_WRITE = "scientist_write";
    private static final Logger LOGGER = LoggerFactory.getLogger(CassandraIOTest.class);

    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
    private static final AtomicInteger counter = new AtomicInteger();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/cassandra/CassandraIOTest$NOOPMapper.class */
    public static class NOOPMapper implements Mapper<String>, Serializable {
        private final ListeningExecutorService executor;
        final Callable<Void> asyncTask;

        private NOOPMapper() {
            this.executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
            this.asyncTask = () -> {
                return null;
            };
        }

        public Iterator map(ResultSet resultSet) {
            if (!resultSet.isExhausted()) {
                resultSet.iterator().forEachRemaining(row -> {
                    CassandraIOTest.counter.getAndIncrement();
                });
            }
            return Collections.emptyIterator();
        }

        public Future<Void> deleteAsync(String str) {
            CassandraIOTest.counter.incrementAndGet();
            return this.executor.submit(this.asyncTask);
        }

        public Future<Void> saveAsync(String str) {
            CassandraIOTest.counter.incrementAndGet();
            return this.executor.submit(this.asyncTask);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/cassandra/CassandraIOTest$NOOPMapperFactory.class */
    private static class NOOPMapperFactory implements SerializableFunction<Session, Mapper> {
        private NOOPMapperFactory() {
        }

        public Mapper apply(Session session) {
            return new NOOPMapper();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Table(name = CassandraIOTest.CASSANDRA_TABLE, keyspace = CassandraIOTest.CASSANDRA_KEYSPACE)
    /* loaded from: input_file:org/apache/beam/sdk/io/cassandra/CassandraIOTest$Scientist.class */
    public static class Scientist implements Serializable {

        @Column(name = "person_name")
        String name;

        @Computed("writetime(person_name)")
        Long nameTs;

        @PartitionKey
        @Column(name = "person_id")
        int id;

        Scientist() {
        }

        public String toString() {
            return this.id + ":" + this.name;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            Scientist scientist = (Scientist) obj;
            return this.id == scientist.id && Objects.equal(this.name, scientist.name);
        }

        public int hashCode() {
            return Objects.hashCode(new Object[]{this.name, Integer.valueOf(this.id)});
        }
    }

    @Table(name = CassandraIOTest.CASSANDRA_TABLE_WRITE, keyspace = CassandraIOTest.CASSANDRA_KEYSPACE)
    /* loaded from: input_file:org/apache/beam/sdk/io/cassandra/CassandraIOTest$ScientistWrite.class */
    static class ScientistWrite extends Scientist {
        ScientistWrite() {
        }
    }

    @BeforeClass
    public static void beforeClass() throws Exception {
        jmxPort = NetworkTestHelper.getAvailableLocalPort();
        shutdownHook = new CassandraShutDownHook();
        String path = TEMPORARY_FOLDER.newFolder(new String[]{"embedded-cassandra", "data"}).getPath();
        String path2 = TEMPORARY_FOLDER.newFolder(new String[]{"embedded-cassandra", "commit-log"}).getPath();
        String path3 = TEMPORARY_FOLDER.newFolder(new String[]{"embedded-cassandra", "cdc-raw"}).getPath();
        cluster = CassandraEmbeddedServerBuilder.builder().withKeyspaceName(CASSANDRA_KEYSPACE).withDataFolder(path).withCommitLogFolder(path2).withCdcRawFolder(path3).withHintsFolder(TEMPORARY_FOLDER.newFolder(new String[]{"embedded-cassandra", "hints"}).getPath()).withSavedCachesFolder(TEMPORARY_FOLDER.newFolder(new String[]{"embedded-cassandra", "saved-cache"}).getPath()).withShutdownHook(shutdownHook).withJMXPort(jmxPort).buildNativeCluster();
        cassandraPort = cluster.getConfiguration().getProtocolOptions().getPort();
        session = cluster.newSession();
        insertData();
    }

    @AfterClass
    public static void afterClass() throws InterruptedException {
        shutdownHook.shutDownNow();
    }

    private static void insertData() throws Exception {
        LOGGER.info("Create Cassandra tables");
        session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s(person_id int, person_name text, PRIMARY KEY(person_id));", CASSANDRA_KEYSPACE, CASSANDRA_TABLE));
        session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s(person_id int, person_name text, PRIMARY KEY(person_id));", CASSANDRA_KEYSPACE, CASSANDRA_TABLE_WRITE));
        LOGGER.info("Insert records");
        String[] strArr = {"Einstein", "Darwin", "Copernicus", "Pasteur", "Curie", "Faraday", "Newton", "Bohr", "Galilei", "Maxwell"};
        for (int i = 0; i < NUM_ROWS; i++) {
            session.execute(String.format("INSERT INTO %s.%s(person_id, person_name) values(" + i + ", '" + strArr[i % strArr.length] + "');", CASSANDRA_KEYSPACE, CASSANDRA_TABLE));
        }
        flushMemTables();
    }

    private static void flushMemTables() throws Exception {
        JMXConnector connect = JMXConnectorFactory.connect(new JMXServiceURL(String.format("service:jmx:rmi://%s/jndi/rmi://%s:%s/jmxrmi", CASSANDRA_HOST, CASSANDRA_HOST, Integer.valueOf(jmxPort))), (Map) null);
        ((StorageServiceMBean) JMX.newMBeanProxy(connect.getMBeanServerConnection(), new ObjectName(STORAGE_SERVICE_MBEAN), StorageServiceMBean.class)).forceKeyspaceFlush(CASSANDRA_KEYSPACE, new String[]{CASSANDRA_TABLE});
        connect.close();
        Thread.sleep(30000L);
    }

    @Test
    public void testEstimatedSizeBytes() throws Exception {
        long estimatedSizeBytes = new CassandraIO.CassandraSource(CassandraIO.read().withHosts(Collections.singletonList(CASSANDRA_HOST)).withPort(cassandraPort).withKeyspace(CASSANDRA_KEYSPACE).withTable(CASSANDRA_TABLE), (List) null).getEstimatedSizeBytes(PipelineOptionsFactory.create());
        TestCase.assertTrue(((float) estimatedSizeBytes) >= 11664.0f && ((float) estimatedSizeBytes) <= 14256.0f);
    }

    @Test
    public void testRead() throws Exception {
        PCollection apply = this.pipeline.apply(CassandraIO.read().withHosts(Collections.singletonList(CASSANDRA_HOST)).withPort(cassandraPort).withKeyspace(CASSANDRA_KEYSPACE).withTable(CASSANDRA_TABLE).withCoder(SerializableCoder.of(Scientist.class)).withEntity(Scientist.class));
        PAssert.thatSingleton(apply.apply("Count", Count.globally())).isEqualTo(Long.valueOf(NUM_ROWS));
        PAssert.that(apply.apply(MapElements.via(new SimpleFunction<Scientist, KV<String, Integer>>() { // from class: org.apache.beam.sdk.io.cassandra.CassandraIOTest.1
            public KV<String, Integer> apply(Scientist scientist) {
                return KV.of(scientist.name, Integer.valueOf(scientist.id));
            }
        })).apply("Count occurrences per scientist", Count.perKey())).satisfies(iterable -> {
            Iterator it = iterable.iterator();
            while (it.hasNext()) {
                KV kv = (KV) it.next();
                Assert.assertEquals((String) kv.getKey(), 2L, ((Long) kv.getValue()).longValue());
            }
            return null;
        });
        this.pipeline.run();
    }

    @Test
    public void testReadWithQuery() throws Exception {
        PCollection apply = this.pipeline.apply(CassandraIO.read().withHosts(Collections.singletonList(CASSANDRA_HOST)).withPort(cassandraPort).withKeyspace(CASSANDRA_KEYSPACE).withTable(CASSANDRA_TABLE).withQuery("select person_id, writetime(person_name) from beam_ks.scientist where person_id=10").withCoder(SerializableCoder.of(Scientist.class)).withEntity(Scientist.class));
        PAssert.thatSingleton(apply.apply("Count", Count.globally())).isEqualTo(1L);
        PAssert.that(apply).satisfies(iterable -> {
            Iterator it = iterable.iterator();
            while (it.hasNext()) {
                Scientist scientist = (Scientist) it.next();
                Assert.assertNull(scientist.name);
                TestCase.assertTrue(scientist.nameTs != null && scientist.nameTs.longValue() > 0);
            }
            return null;
        });
        this.pipeline.run();
    }

    @Test
    public void testWrite() {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < NUM_ROWS; i++) {
            ScientistWrite scientistWrite = new ScientistWrite();
            scientistWrite.id = i;
            scientistWrite.name = "Name " + i;
            arrayList.add(scientistWrite);
        }
        this.pipeline.apply(Create.of(arrayList)).apply(CassandraIO.write().withHosts(Collections.singletonList(CASSANDRA_HOST)).withPort(cassandraPort).withKeyspace(CASSANDRA_KEYSPACE).withEntity(ScientistWrite.class));
        this.pipeline.run();
        List<Row> rows = getRows(CASSANDRA_TABLE_WRITE);
        Assert.assertEquals(NUM_ROWS, rows.size());
        Iterator<Row> it = rows.iterator();
        while (it.hasNext()) {
            TestCase.assertTrue(it.next().getString("person_name").matches("Name (\\d*)"));
        }
    }

    @Test
    public void testReadWithMapper() throws Exception {
        counter.set(0);
        this.pipeline.apply(CassandraIO.read().withHosts(Collections.singletonList(CASSANDRA_HOST)).withPort(cassandraPort).withKeyspace(CASSANDRA_KEYSPACE).withTable(CASSANDRA_TABLE).withCoder(SerializableCoder.of(String.class)).withEntity(String.class).withMapperFactoryFn(new NOOPMapperFactory()));
        this.pipeline.run();
        Assert.assertEquals(NUM_ROWS, counter.intValue());
    }

    @Test
    public void testCustomMapperImplWrite() throws Exception {
        counter.set(0);
        this.pipeline.apply(Create.of("", new String[0])).apply(CassandraIO.write().withHosts(Collections.singletonList(CASSANDRA_HOST)).withPort(cassandraPort).withKeyspace(CASSANDRA_KEYSPACE).withMapperFactoryFn(new NOOPMapperFactory()).withEntity(String.class));
        this.pipeline.run();
        Assert.assertEquals(1L, counter.intValue());
    }

    @Test
    public void testCustomMapperImplDelete() {
        counter.set(0);
        this.pipeline.apply(Create.of("", new String[0])).apply(CassandraIO.delete().withHosts(Collections.singletonList(CASSANDRA_HOST)).withPort(cassandraPort).withKeyspace(CASSANDRA_KEYSPACE).withMapperFactoryFn(new NOOPMapperFactory()).withEntity(String.class));
        this.pipeline.run();
        Assert.assertEquals(1L, counter.intValue());
    }

    @Test
    public void testSplit() throws Exception {
        PipelineOptions create = PipelineOptionsFactory.create();
        CassandraIO.CassandraSource cassandraSource = new CassandraIO.CassandraSource(CassandraIO.read().withHosts(Collections.singletonList(CASSANDRA_HOST)).withPort(cassandraPort).withKeyspace(CASSANDRA_KEYSPACE).withTable(CASSANDRA_TABLE).withEntity(Scientist.class).withCoder(SerializableCoder.of(Scientist.class)), Collections.singletonList(QueryBuilder.select().from(CASSANDRA_KEYSPACE, CASSANDRA_TABLE).toString()));
        List split = cassandraSource.split(2048, create);
        SourceTestUtils.assertSourcesEqualReferenceSource(cassandraSource, split, create);
        Assert.assertEquals("Wrong number of splits", (int) Math.ceil(((float) cassandraSource.getEstimatedSizeBytes(create)) / 2048), split.size());
        int i = 0;
        Iterator it = split.iterator();
        while (it.hasNext()) {
            if (SourceTestUtils.readFromSource((BoundedSource) it.next(), create).isEmpty()) {
                i++;
            }
        }
        Assert.assertThat("There are too many empty splits, parallelism is sub-optimal", Integer.valueOf(i), Matchers.lessThan(Integer.valueOf((int) (ACCEPTABLE_EMPTY_SPLITS_PERCENTAGE * split.size()))));
    }

    private List<Row> getRows(String str) {
        return session.execute(String.format("select person_id,person_name from %s.%s", CASSANDRA_KEYSPACE, str)).all();
    }

    @Test
    public void testDelete() throws Exception {
        Assert.assertEquals(NUM_ROWS, getRows(CASSANDRA_TABLE).size());
        Scientist scientist = new Scientist();
        scientist.id = 0;
        scientist.name = "Einstein";
        this.pipeline.apply(Create.of(scientist, new Scientist[0])).apply(CassandraIO.delete().withHosts(Collections.singletonList(CASSANDRA_HOST)).withPort(cassandraPort).withKeyspace(CASSANDRA_KEYSPACE).withEntity(Scientist.class));
        this.pipeline.run();
        Assert.assertEquals(19L, getRows(CASSANDRA_TABLE).size());
        session.execute(String.format("INSERT INTO %s.%s(person_id, person_name) values(" + scientist.id + ", '" + scientist.name + "');", CASSANDRA_KEYSPACE, CASSANDRA_TABLE));
    }

    @Test
    public void testValidPartitioner() {
        Assert.assertTrue(CassandraIO.CassandraSource.isMurmur3Partitioner(cluster));
    }

    @Test
    public void testDistance() {
        Assert.assertEquals(BigInteger.valueOf(90L), CassandraIO.CassandraSource.distance(new BigInteger("10"), new BigInteger("100")));
        Assert.assertEquals(new BigInteger("18446744073709551526"), CassandraIO.CassandraSource.distance(new BigInteger("100"), new BigInteger("10")));
    }

    @Test
    public void testRingFraction() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new CassandraIO.CassandraSource.TokenRange(1L, 1L, BigInteger.valueOf(Long.MIN_VALUE), new BigInteger("0")));
        Assert.assertEquals(0.5d, CassandraIO.CassandraSource.getRingFraction(arrayList), 0.0d);
        arrayList.add(new CassandraIO.CassandraSource.TokenRange(1L, 1L, new BigInteger("0"), BigInteger.valueOf(Long.MAX_VALUE)));
        Assert.assertEquals(1.0d, CassandraIO.CassandraSource.getRingFraction(arrayList), 0.0d);
    }

    @Test
    public void testEstimatedSizeBytesFromTokenRanges() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new CassandraIO.CassandraSource.TokenRange(1L, 1000L, BigInteger.valueOf(Long.MIN_VALUE), BigInteger.valueOf(Long.MAX_VALUE)));
        Assert.assertEquals(1000L, CassandraIO.CassandraSource.getEstimatedSizeBytesFromTokenRanges(arrayList));
        ArrayList arrayList2 = new ArrayList();
        arrayList2.add(new CassandraIO.CassandraSource.TokenRange(1L, 1000L, BigInteger.valueOf(Long.MIN_VALUE), new BigInteger("0")));
        Assert.assertEquals(2000L, CassandraIO.CassandraSource.getEstimatedSizeBytesFromTokenRanges(arrayList2));
        ArrayList arrayList3 = new ArrayList();
        arrayList3.add(new CassandraIO.CassandraSource.TokenRange(1L, 1000L, BigInteger.valueOf(Long.MIN_VALUE), new BigInteger("-3")));
        arrayList3.add(new CassandraIO.CassandraSource.TokenRange(1L, 1000L, new BigInteger("-2"), new BigInteger("10000")));
        arrayList3.add(new CassandraIO.CassandraSource.TokenRange(2L, 3000L, new BigInteger("10001"), BigInteger.valueOf(Long.MAX_VALUE)));
        Assert.assertEquals(8000L, CassandraIO.CassandraSource.getEstimatedSizeBytesFromTokenRanges(arrayList3));
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -568192725:
                if (implMethodName.equals("lambda$testRead$43268ee4$1")) {
                    z = true;
                    break;
                }
                break;
            case 417655827:
                if (implMethodName.equals("lambda$testReadWithQuery$43268ee4$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/cassandra/CassandraIOTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Iterable;)Ljava/lang/Void;")) {
                    return iterable -> {
                        Iterator it = iterable.iterator();
                        while (it.hasNext()) {
                            Scientist scientist = (Scientist) it.next();
                            Assert.assertNull(scientist.name);
                            TestCase.assertTrue(scientist.nameTs != null && scientist.nameTs.longValue() > 0);
                        }
                        return null;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/cassandra/CassandraIOTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Iterable;)Ljava/lang/Void;")) {
                    return iterable2 -> {
                        Iterator it = iterable2.iterator();
                        while (it.hasNext()) {
                            KV kv = (KV) it.next();
                            Assert.assertEquals((String) kv.getKey(), 2L, ((Long) kv.getValue()).longValue());
                        }
                        return null;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
