/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.cassandra;

import com.datastax.driver.core.AuthProvider;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.PlainTextAuthProvider;
import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
import com.datastax.driver.core.policies.LoadBalancingPolicy;
import com.datastax.driver.core.policies.RoundRobinPolicy;
import com.datastax.driver.core.policies.TokenAwarePolicy;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.MappingManager;
import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.cassandra.CassandraIO;
import org.apache.beam.sdk.io.cassandra.CassandraService;
import org.apache.beam.sdks.java.io.cassandra.repackaged.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CassandraServiceImpl<T>
implements CassandraService<T> {
    /** Class-wide SLF4J logger. */
    private static final Logger LOG = LoggerFactory.getLogger(CassandraServiceImpl.class);
    // Lowest and highest values of the signed 64-bit token space.
    private static final long MIN_TOKEN = Long.MIN_VALUE;
    private static final long MAX_TOKEN = Long.MAX_VALUE;
    // Width of the token space (MAX - MIN); held as a BigInteger because the value does not fit in a long.
    private static final BigInteger TOTAL_TOKEN_COUNT = BigInteger.valueOf(Long.MAX_VALUE).subtract(BigInteger.valueOf(Long.MIN_VALUE));

    /**
     * Builds a reader bound to the given source.
     *
     * <p>Constructing the reader only stores the source; the connection to Cassandra is opened
     * when the reader is started.
     *
     * @param source the source the reader will read from
     * @return a new, not yet started reader
     */
    @Override
    public CassandraReaderImpl<T> createReader(CassandraIO.CassandraSource<T> source) {
        CassandraReaderImpl<T> reader = new CassandraReaderImpl<>(source);
        return reader;
    }

    /*
     * Loose catch block
     */
    @Override
    public long getEstimatedSizeBytes(CassandraIO.Read<T> spec) {
        Throwable throwable = null;
        try (Cluster cluster = this.getCluster(spec.hosts(), spec.port(), spec.username(), spec.password(), spec.localDc(), spec.consistencyLevel());){
            block22: {
                if (!CassandraServiceImpl.isMurmur3Partitioner(cluster)) break block22;
                try {
                    List<TokenRange> tokenRanges = CassandraServiceImpl.getTokenRanges(cluster, spec.keyspace(), spec.table());
                    long l = CassandraServiceImpl.getEstimatedSizeBytes(tokenRanges);
                    return l;
                }
                catch (Exception e) {
                    long l;
                    block20: {
                        block21: {
                            LOG.warn("Can't estimate the size", (Throwable)e);
                            l = 0L;
                            if (cluster == null) break block20;
                            if (throwable == null) break block21;
                            try {
                                cluster.close();
                            }
                            catch (Throwable throwable2) {
                                throwable.addSuppressed(throwable2);
                            }
                            break block20;
                        }
                        cluster.close();
                    }
                    return l;
                }
            }
            LOG.warn("Only Murmur3 partitioner is supported, can't estimate the size");
            long l = 0L;
            return l;
            {
                catch (Throwable throwable3) {
                    throwable = throwable3;
                    throw throwable3;
                }
                catch (Throwable throwable4) {
                    throw throwable4;
                }
            }
        }
    }

    /**
     * Extrapolates a total size from a list of token ranges.
     *
     * <p>The per-range sizes ({@code meanPartitionSize * partitionCount}) are summed and then
     * divided by the fraction of the ring the ranges cover, as computed by
     * {@link #getRingFraction(List)}.
     *
     * @param tokenRanges the token ranges with their partition statistics
     * @return the rounded, extrapolated size in bytes
     */
    @VisibleForTesting
    protected static long getEstimatedSizeBytes(List<TokenRange> tokenRanges) {
        long totalBytes = 0L;
        Iterator<TokenRange> ranges = tokenRanges.iterator();
        while (ranges.hasNext()) {
            TokenRange range = ranges.next();
            totalBytes += range.meanPartitionSize * range.partitionCount;
        }
        double coveredFraction = CassandraServiceImpl.getRingFraction(tokenRanges);
        return Math.round((double) totalBytes / coveredFraction);
    }

    /**
     * Splits the read described by {@code spec} into bounded sources.
     *
     * <p>When the cluster uses the Murmur3 partitioner, the split is delegated to
     * {@link #split(CassandraIO.Read, long, long)} using the estimated table size. Otherwise a
     * single source selecting the whole table is returned. The cluster opened for the partitioner
     * check is closed before returning.
     *
     * @param spec the read configuration
     * @param desiredBundleSizeBytes the desired size of each bundle, in bytes
     * @return the list of sources covering the table
     */
    @Override
    public List<BoundedSource<T>> split(CassandraIO.Read<T> spec, long desiredBundleSizeBytes) {
        try (Cluster cluster = this.getCluster(spec.hosts(), spec.port(), spec.username(), spec.password(), spec.localDc(), spec.consistencyLevel())) {
            if (!CassandraServiceImpl.isMurmur3Partitioner(cluster)) {
                LOG.warn("Only Murmur3Partitioner is supported for splitting, using an unique source for the read");
                String fullTableQuery = QueryBuilder.select().from(spec.keyspace(), spec.table()).toString();
                List<BoundedSource<T>> singleSource = new ArrayList<>();
                singleSource.add(new CassandraIO.CassandraSource<T>(spec, fullTableQuery));
                return singleSource;
            }
            LOG.info("Murmur3Partitioner detected, splitting");
            long estimatedSizeBytes = this.getEstimatedSizeBytes(spec);
            return this.split(spec, desiredBundleSizeBytes, estimatedSizeBytes);
        }
    }

    /**
     * Splits the token ring into {@code estimatedSizeBytes / desiredBundleSizeBytes} contiguous,
     * equally sized token ranges and builds one source per range.
     *
     * <p>The first range has no lower bound and the last range has no upper bound, so the whole
     * ring is covered regardless of rounding. When the computed number of splits is not positive
     * (or {@code desiredBundleSizeBytes} is not positive), a single source selecting the whole
     * table is returned.
     *
     * @param spec the read configuration
     * @param desiredBundleSizeBytes the desired size of each bundle, in bytes
     * @param estimatedSizeBytes the estimated total size of the table, in bytes
     * @return one source per split, in token order
     */
    @VisibleForTesting
    protected List<BoundedSource<T>> split(CassandraIO.Read<T> spec, long desiredBundleSizeBytes, long estimatedSizeBytes) {
        long numSplits = 1L;
        if (desiredBundleSizeBytes > 0L) {
            numSplits = estimatedSizeBytes / desiredBundleSizeBytes;
        }
        if (numSplits <= 0L) {
            LOG.warn("Number of splits is less than 0 ({}), fallback to 1", (Object) numSplits);
            numSplits = 1L;
        }
        LOG.info("Number of splits is {}", (Object) numSplits);

        List<BoundedSource<T>> sourceList = new ArrayList<>();
        if (numSplits == 1L) {
            String splitQuery = QueryBuilder.select().from(spec.keyspace(), spec.table()).toString();
            sourceList.add(new CassandraIO.CassandraSource<T>(spec, splitQuery));
            return sourceList;
        }

        double startRange = MIN_TOKEN;
        double endRange = MAX_TOKEN;
        // Width of one split. The parentheses matter: without them only startRange is divided,
        // which yields an increment larger than the whole ring and badly skewed splits.
        double incrementValue = (endRange - startRange) / numSplits;
        double endToken = startRange;
        // NOTE(review): token bounds are doubles, so they are only approximate 64-bit token values
        // and are rendered in floating-point form in the query — confirm Cassandra accepts them.
        // NOTE(review): "token($pk)" is passed verbatim; nothing in this file replaces $pk with the
        // partition key column(s) — confirm it is resolved before the query is executed.
        for (long i = 0L; i < numSplits; i++) {
            double startToken = endToken;
            boolean lastSplit = i == numSplits - 1L;
            endToken = lastSplit ? endRange : startToken + incrementValue;
            Select.Where builder = QueryBuilder.select().from(spec.keyspace(), spec.table()).where();
            if (i > 0L) {
                builder = builder.and(QueryBuilder.gte("token($pk)", startToken));
            }
            if (!lastSplit) {
                builder = builder.and(QueryBuilder.lt("token($pk)", endToken));
            }
            sourceList.add(new CassandraIO.CassandraSource<T>(spec, builder.toString()));
        }
        return sourceList;
    }

    /**
     * Builds a {@link Cluster} from the connection settings.
     *
     * <p>Authentication and query options are only configured when {@code username} and
     * {@code consistencyLevel} are non-null. The load balancing policy is always token aware,
     * wrapping a DC-aware round-robin policy when {@code localDc} is set and a plain round-robin
     * policy otherwise.
     *
     * @param hosts the contact points
     * @param port the port to connect to
     * @param username the user name, or {@code null} for no authentication
     * @param password the password used together with {@code username}
     * @param localDc the local data center name, or {@code null}
     * @param consistencyLevel the name of a {@link ConsistencyLevel} constant, or {@code null}
     * @return the built cluster; the caller is responsible for closing it
     */
    private Cluster getCluster(List<String> hosts, int port, String username, String password, String localDc, String consistencyLevel) {
        String[] contactPoints = hosts.toArray(new String[0]);
        Cluster.Builder clusterBuilder = Cluster.builder().addContactPoints(contactPoints).withPort(port);

        if (username != null) {
            clusterBuilder.withAuthProvider(new PlainTextAuthProvider(username, password));
        }

        LoadBalancingPolicy childPolicy;
        if (localDc == null) {
            childPolicy = new RoundRobinPolicy();
        } else {
            childPolicy = new DCAwareRoundRobinPolicy.Builder().withLocalDc(localDc).build();
        }
        clusterBuilder.withLoadBalancingPolicy(new TokenAwarePolicy(childPolicy));

        if (consistencyLevel != null) {
            QueryOptions queryOptions = new QueryOptions();
            queryOptions.setConsistencyLevel(ConsistencyLevel.valueOf(consistencyLevel));
            clusterBuilder.withQueryOptions(queryOptions);
        }
        return clusterBuilder.build();
    }

    /**
     * Reads the token ranges and partition statistics of a table from
     * {@code system.size_estimates}.
     *
     * <p>A dedicated session is opened on {@code cluster} and closed before returning.
     *
     * @param cluster the cluster to query
     * @param keyspace the keyspace of the table
     * @param table the table name
     * @return one {@link TokenRange} per row found; empty when there are no estimates
     * @throws NumberFormatException if a range bound is not a valid 64-bit integer
     */
    private static List<TokenRange> getTokenRanges(Cluster cluster, String keyspace, String table) {
        try (Session session = cluster.newSession()) {
            ResultSet resultSet = session.execute("SELECT range_start, range_end, partitions_count, mean_partition_size FROM system.size_estimates WHERE keyspace_name = ? AND table_name = ?", keyspace, table);
            List<TokenRange> tokenRanges = new ArrayList<>();
            for (Row row : resultSet) {
                long partitionCount = row.getLong("partitions_count");
                long meanPartitionSize = row.getLong("mean_partition_size");
                // range_start / range_end are text columns in system.size_estimates, so they must
                // be read as strings and parsed; getLong() on them is rejected by the driver.
                long rangeStart = Long.parseLong(row.getString("range_start"));
                long rangeEnd = Long.parseLong(row.getString("range_end"));
                tokenRanges.add(new TokenRange(partitionCount, meanPartitionSize, rangeStart, rangeEnd));
            }
            return tokenRanges;
        }
    }

    /**
     * Computes the fraction of the token ring covered by the given ranges.
     *
     * <p>Each range contributes its {@link #distance(long, long)} divided by the total token
     * count; the contributions are summed in list order.
     *
     * @param tokenRanges the ranges to measure
     * @return the summed fraction ({@code 0.0} for an empty list)
     */
    @VisibleForTesting
    protected static double getRingFraction(List<TokenRange> tokenRanges) {
        final double totalTokens = TOTAL_TOKEN_COUNT.doubleValue();
        double covered = 0.0;
        for (TokenRange range : tokenRanges) {
            double rangeWidth = CassandraServiceImpl.distance(range.rangeStart, range.rangeEnd).doubleValue();
            covered += rangeWidth / totalTokens;
        }
        return covered;
    }

    /**
     * Measures the distance from {@code left} to {@code right} on the token ring.
     *
     * <p>When {@code right} is not greater than {@code left} the range is treated as wrapping
     * around the ring, so the total token count is added to the (non-positive) difference.
     *
     * @param left the start token
     * @param right the end token
     * @return the distance between the two tokens
     */
    @VisibleForTesting
    protected static BigInteger distance(long left, long right) {
        BigInteger difference = BigInteger.valueOf(right).subtract(BigInteger.valueOf(left));
        return right > left ? difference : difference.add(TOTAL_TOKEN_COUNT);
    }

    /**
     * Tells whether the cluster reports the Murmur3 partitioner.
     *
     * <p>The comparison is written constant-first so that a {@code null} partitioner name yields
     * {@code false} instead of a {@link NullPointerException}.
     *
     * @param cluster the cluster whose metadata is inspected
     * @return {@code true} only when the partitioner name is
     *     {@code org.apache.cassandra.dht.Murmur3Partitioner}
     */
    @VisibleForTesting
    protected static boolean isMurmur3Partitioner(Cluster cluster) {
        String partitioner = cluster.getMetadata().getPartitioner();
        return "org.apache.cassandra.dht.Murmur3Partitioner".equals(partitioner);
    }

    /**
     * Builds a writer for the given write configuration.
     *
     * <p>The writer connects to Cassandra in its constructor, so the returned instance holds an
     * open cluster and session until it is closed.
     *
     * @param spec the write configuration
     * @return a connected writer
     */
    @Override
    public CassandraService.Writer createWriter(CassandraIO.Write<T> spec) {
        WriterImpl<T> writer = new WriterImpl<>(spec);
        return writer;
    }

    /**
     * Writer that saves entities through the DataStax object mapper.
     *
     * <p>The cluster and session are opened in the constructor and released by {@link #close()}.
     */
    protected class WriterImpl<T>
    implements CassandraService.Writer<T> {
        private final CassandraIO.Write<T> spec;
        private final Cluster cluster;
        private final Session session;
        private final MappingManager mappingManager;

        /**
         * Connects to the keyspace named by {@code spec}.
         *
         * <p>If the connection attempt fails, the cluster built for it is closed before the
         * failure is rethrown, so nothing is leaked.
         *
         * @param spec the write configuration
         */
        public WriterImpl(CassandraIO.Write<T> spec) {
            this.spec = spec;
            this.cluster = CassandraServiceImpl.this.getCluster(spec.hosts(), spec.port(), spec.username(), spec.password(), spec.localDc(), spec.consistencyLevel());
            Session connectedSession;
            try {
                connectedSession = this.cluster.connect(spec.keyspace());
            } catch (RuntimeException e) {
                try {
                    this.cluster.close();
                } catch (RuntimeException closeFailure) {
                    // Keep the connection failure as the primary error.
                    e.addSuppressed(closeFailure);
                }
                throw e;
            }
            this.session = connectedSession;
            this.mappingManager = new MappingManager(this.session);
        }

        /**
         * Saves one entity using the mapper registered for its runtime class.
         *
         * @param entity the entity to save
         */
        @Override
        public void write(T entity) {
            // Raw Mapper on purpose: the mapper is looked up by the entity's runtime class.
            Mapper mapper = this.mappingManager.mapper(entity.getClass());
            mapper.save(entity);
        }

        /** Closes the session and then the cluster; the cluster is closed even if the session close fails. */
        @Override
        public void close() {
            try {
                if (this.session != null) {
                    this.session.close();
                }
            } finally {
                if (this.cluster != null) {
                    this.cluster.close();
                }
            }
        }
    }

    /**
     * Immutable holder for one token range and its partition statistics.
     *
     * <p>The fields are private but read directly by the enclosing class.
     */
    @VisibleForTesting
    protected static class TokenRange {
        // Bounds of the range on the token ring.
        private final long rangeStart;
        private final long rangeEnd;
        // Partition statistics for the range.
        private final long partitionCount;
        private final long meanPartitionSize;

        /**
         * @param partitionCount the number of partitions in the range
         * @param meanPartitionSize the mean partition size in the range
         * @param rangeStart the start token of the range
         * @param rangeEnd the end token of the range
         */
        public TokenRange(long partitionCount, long meanPartitionSize, long rangeStart, long rangeEnd) {
            this.rangeStart = rangeStart;
            this.rangeEnd = rangeEnd;
            this.partitionCount = partitionCount;
            this.meanPartitionSize = meanPartitionSize;
        }
    }

    private class CassandraReaderImpl<T>
    extends BoundedSource.BoundedReader<T> {
        private final CassandraIO.CassandraSource<T> source;
        private Cluster cluster;
        private Session session;
        private ResultSet resultSet;
        private Iterator<T> iterator;
        private T current;

        public CassandraReaderImpl(CassandraIO.CassandraSource<T> source) {
            this.source = source;
        }

        public boolean start() throws IOException {
            LOG.debug("Starting Cassandra reader");
            this.cluster = CassandraServiceImpl.this.getCluster(this.source.spec.hosts(), this.source.spec.port(), this.source.spec.username(), this.source.spec.password(), this.source.spec.localDc(), this.source.spec.consistencyLevel());
            this.session = this.cluster.connect();
            LOG.debug("Query: " + this.source.splitQuery);
            this.resultSet = this.session.execute(this.source.splitQuery);
            MappingManager mappingManager = new MappingManager(this.session);
            Mapper mapper = mappingManager.mapper(this.source.spec.entity());
            this.iterator = mapper.map(this.resultSet).iterator();
            return this.advance();
        }

        public boolean advance() throws IOException {
            if (this.iterator.hasNext()) {
                this.current = this.iterator.next();
                return true;
            }
            this.current = null;
            return false;
        }

        public void close() {
            LOG.debug("Closing Cassandra reader");
            if (this.session != null) {
                this.session.close();
            }
            if (this.cluster != null) {
                this.cluster.close();
            }
        }

        public T getCurrent() throws NoSuchElementException {
            if (this.current == null) {
                throw new NoSuchElementException();
            }
            return this.current;
        }

        public CassandraIO.CassandraSource<T> getCurrentSource() {
            return this.source;
        }
    }
}

