/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.cassandra;

import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.Table;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
import junit.framework.TestCase;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.cassandra.CassandraIO;
import org.apache.beam.sdk.io.cassandra.CassandraService;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CassandraIOTest
implements Serializable {
    // Logger for this test class. NOTE(review): not referenced anywhere in the visible code.
    private static final Logger LOG = LoggerFactory.getLogger(CassandraIOTest.class);
    // JUnit rule supplying the pipeline that the tests build and run. Declared transient because
    // the enclosing class implements Serializable, so this field is left out of Java serialization.
    @Rule
    public transient TestPipeline pipeline = TestPipeline.create();

    /**
     * Expects the source's size estimate to be 113890 bytes, which is the total length of the
     * {@code id:name} strings of the 10000 rows that {@code FakeCassandraService.load()} creates.
     */
    @Test
    public void testEstimatedSizeBytes() throws Exception {
        final long expectedSizeBytes = 113890L;

        FakeCassandraService fakeService = new FakeCassandraService();
        fakeService.load();

        CassandraIO.Read readSpec = CassandraIO.read().withCassandraService((CassandraService)fakeService);
        CassandraIO.CassandraSource cassandraSource = new CassandraIO.CassandraSource(readSpec, null);
        PipelineOptions options = PipelineOptionsFactory.create();

        Assert.assertEquals(expectedSizeBytes, cassandraSource.getEstimatedSizeBytes(options));
    }

    /**
     * Reads the fake table through {@code CassandraIO.read()} and checks that 10000 rows come
     * back in total and that each scientist name occurs exactly 1000 times.
     */
    @Test
    public void testRead() throws Exception {
        FakeCassandraService service = new FakeCassandraService();
        service.load();

        PCollection<Scientist> output = pipeline.apply(
            CassandraIO.<Scientist>read()
                .withCassandraService(service)
                .withKeyspace("beam")
                .withTable("scientist")
                .withCoder(SerializableCoder.of(Scientist.class))
                .withEntity(Scientist.class));

        PAssert.thatSingleton(output.apply("Count", Count.<Scientist>globally())).isEqualTo(10000L);

        // Key every row by scientist name so the per-key count below yields occurrences per name.
        PCollection<KV<String, Integer>> mapped = output.apply(
            MapElements.via(new SimpleFunction<Scientist, KV<String, Integer>>() {
                @Override
                public KV<String, Integer> apply(Scientist scientist) {
                    return KV.of(scientist.name, scientist.id);
                }
            }));

        PAssert.that(mapped.apply("Count occurrences per scientist", Count.<String, Integer>perKey()))
            .satisfies(input -> {
                for (KV<String, Long> element : input) {
                    // The key doubles as the failure message so a bad count names the scientist.
                    Assert.assertEquals(element.getKey(), 1000L, element.getValue().longValue());
                }
                return null;
            });

        pipeline.run();
    }

    /**
     * Writes 1000 scientists through {@code CassandraIO.write()} and checks that the fake table
     * ends up holding exactly those 1000 rows, each with a name of the form {@code "Name <n>"}.
     */
    @Test
    public void testWrite() throws Exception {
        FakeCassandraService service = new FakeCassandraService();
        // The fake table is a static map shared by every service instance; empty it first so rows
        // loaded by a previously executed test cannot leak into this one.
        service.getTable().clear();

        List<Scientist> data = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            Scientist scientist = new Scientist();
            scientist.id = i;
            scientist.name = "Name " + i;
            data.add(scientist);
        }

        pipeline.apply(Create.of(data))
            .apply(CassandraIO.<Scientist>write()
                .withCassandraService(service)
                .withKeyspace("beam")
                .withEntity(Scientist.class));
        pipeline.run();

        // Expected value goes first so a failure message reads correctly.
        Assert.assertEquals(1000, service.getTable().size());
        for (Scientist scientist : service.getTable().values()) {
            Assert.assertTrue(scientist.name.matches("Name (\\d*)"));
        }
    }

    /**
     * Test entity annotated for table {@code scientist} in keyspace {@code beam}.
     * Serializable so instances can be used as pipeline elements.
     */
    @Table(name="scientist", keyspace="beam")
    public static class Scientist implements Serializable {
        /** Scientist name, annotated as column {@code person_name}. */
        @Column(name="person_name")
        public String name;

        /** Scientist id, annotated as column {@code person_id}. */
        @Column(name="person_id")
        public int id;

        /** Renders the entity as {@code id:name}. */
        @Override
        public String toString() {
            return id + ":" + name;
        }
    }

    private static class FakeCassandraService
    implements CassandraService<Scientist> {
        private static final Map<Integer, Scientist> table = new ConcurrentHashMap<Integer, Scientist>();

        private FakeCassandraService() {
        }

        public void load() {
            table.clear();
            String[] scientists = new String[]{"Lovelace", "Franklin", "Meitner", "Hopper", "Curie", "Faraday", "Newton", "Bohr", "Galilei", "Maxwell"};
            int i = 0;
            while (i < 10000) {
                int index = i % scientists.length;
                Scientist scientist = new Scientist();
                scientist.id = i++;
                scientist.name = scientists[index];
                table.put(scientist.id, scientist);
            }
        }

        public Map<Integer, Scientist> getTable() {
            return table;
        }

        public FakeCassandraReader createReader(CassandraIO.CassandraSource source) {
            return new FakeCassandraReader(source);
        }

        public long getEstimatedSizeBytes(CassandraIO.Read spec) {
            long size = 0L;
            for (Scientist scientist : table.values()) {
                size += (long)scientist.toString().getBytes().length;
            }
            return size;
        }

        public List<BoundedSource<Scientist>> split(CassandraIO.Read spec, long desiredBundleSizeBytes) {
            ArrayList<BoundedSource<Scientist>> sources = new ArrayList<BoundedSource<Scientist>>();
            sources.add((BoundedSource<Scientist>)new CassandraIO.CassandraSource(spec, null));
            return sources;
        }

        public FakeCassandraWriter createWriter(CassandraIO.Write<Scientist> spec) {
            return new FakeCassandraWriter();
        }

        static class FakeCassandraWriter
        implements CassandraService.Writer<Scientist> {
            FakeCassandraWriter() {
            }

            public void write(Scientist scientist) {
                table.put(scientist.id, scientist);
            }

            public void close() {
            }
        }

        static class FakeCassandraReader
        extends BoundedSource.BoundedReader {
            private final CassandraIO.CassandraSource source;
            private Iterator<Scientist> iterator;
            private Scientist current;

            public FakeCassandraReader(CassandraIO.CassandraSource source) {
                this.source = source;
            }

            public boolean start() throws IOException {
                this.iterator = table.values().iterator();
                return this.advance();
            }

            public boolean advance() throws IOException {
                if (this.iterator.hasNext()) {
                    this.current = this.iterator.next();
                    return true;
                }
                this.current = null;
                return false;
            }

            public void close() {
                this.iterator = null;
                this.current = null;
            }

            public Scientist getCurrent() throws NoSuchElementException {
                if (this.current == null) {
                    throw new NoSuchElementException();
                }
                return this.current;
            }

            public CassandraIO.CassandraSource getCurrentSource() {
                return this.source;
            }
        }
    }
}

