package org.apache.beam.sdk.io.cassandra;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.PlainTextAuthProvider;
import com.datastax.driver.core.QueryOptions;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
import com.datastax.driver.core.policies.TokenAwarePolicy;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.MappingManager;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.repackaged.beam_sdks_java_io_cassandra.com.google.common.util.concurrent.ListenableFuture;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.cassandra.CassandraIO;
import org.apache.beam.sdk.io.cassandra.CassandraService;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterators;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/cassandra/CassandraServiceImpl.class */
public class CassandraServiceImpl<T> implements CassandraService<T> {
    private static final Logger LOG = LoggerFactory.getLogger(CassandraServiceImpl.class);
    private static final String MURMUR3PARTITIONER = "org.apache.cassandra.dht.Murmur3Partitioner";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/cassandra/CassandraServiceImpl$CassandraReaderImpl.class */
    /**
     * Bounded reader that runs every split query of a {@link CassandraIO.CassandraSource} and
     * exposes the mapped rows one at a time.
     *
     * <p>All queries are submitted asynchronously in {@link #start()}; their mapped results are then
     * chained into a single iterator in submission order.
     */
    public class CassandraReaderImpl extends BoundedSource.BoundedReader<T> {
        private final CassandraIO.CassandraSource<T> source;
        private Cluster cluster;
        private Session session;
        private Iterator<T> iterator;
        private T current;

        /**
         * Creates a reader bound to the given source. No connection is opened until
         * {@link #start()} is called.
         *
         * @param cassandraSource the source whose spec and split queries drive this reader
         */
        CassandraReaderImpl(CassandraIO.CassandraSource<T> cassandraSource) {
            this.source = cassandraSource;
        }

        /**
         * Connects to the cluster, submits every split query and positions the reader on the
         * first row.
         *
         * @return {@code true} if a first element is available, {@code false} otherwise
         */
        public boolean start() {
            CassandraServiceImpl.LOG.debug("Starting Cassandra reader");
            this.cluster = CassandraServiceImpl.this.getCluster(this.source.spec.hosts(), this.source.spec.port().intValue(), this.source.spec.username(), this.source.spec.password(), this.source.spec.localDc(), this.source.spec.consistencyLevel());
            this.session = this.cluster.connect(this.source.spec.keyspace());
            CassandraServiceImpl.LOG.debug("Queries: {}", this.source.splitQueries);

            // Fire all queries first so they run concurrently, then collect the results in order.
            List<ResultSetFuture> pendingResults = new ArrayList<>();
            for (String query : this.source.splitQueries) {
                pendingResults.add(this.session.executeAsync(query));
            }

            Mapper<T> mapper = new MappingManager(this.session).mapper(this.source.spec.entity());
            Iterator<T> rows = null;
            for (ResultSetFuture pending : pendingResults) {
                Iterator<T> mapped = mapper.map(pending.getUninterruptibly()).iterator();
                rows = (rows == null) ? mapped : Iterators.concat(rows, mapped);
            }
            // With no split queries there is nothing to read; use an empty iterator so that
            // advance() reports "no data" instead of failing on a null iterator.
            this.iterator = (rows != null) ? rows : Collections.<T>emptyIterator();
            return advance();
        }

        /**
         * Moves to the next row, if any.
         *
         * @return {@code true} if a new current element is available; {@code false} when the
         *     results are exhausted or the reader has not been started
         */
        public boolean advance() {
            if (this.iterator != null && this.iterator.hasNext()) {
                this.current = this.iterator.next();
                return true;
            }
            this.current = null;
            return false;
        }

        /** Closes the session and then the cluster; the cluster is closed even if the session close fails. */
        public void close() {
            CassandraServiceImpl.LOG.debug("Closing Cassandra reader");
            try {
                if (this.session != null) {
                    this.session.close();
                }
            } finally {
                if (this.cluster != null) {
                    this.cluster.close();
                }
            }
        }

        /**
         * Returns the element the reader is currently positioned on.
         *
         * @return the current element
         * @throws NoSuchElementException if there is no current element
         */
        public T getCurrent() throws NoSuchElementException {
            if (this.current == null) {
                throw new NoSuchElementException();
            }
            return this.current;
        }

        /* renamed from: getCurrentSource, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
        /**
         * Returns the source this reader was created from.
         *
         * @return the source passed to the constructor
         */
        public CassandraIO.CassandraSource<T> m246getCurrentSource() {
            return this.source;
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/cassandra/CassandraServiceImpl$DeleterImpl.class */
    /**
     * Mutator that issues asynchronous deletes through the entity mapper and is labelled
     * {@code "deletes"} in log output.
     */
    protected class DeleterImpl extends CassandraServiceImpl<T>.MutatorImpl implements CassandraService.Deleter<T> {
        /**
         * @param mutate the mutate spec handed to the underlying mutator
         */
        DeleterImpl(CassandraIO.Mutate<T> mutate) {
            super(mutate, Mapper::deleteAsync, "deletes");
        }

        /**
         * Queues an asynchronous delete for the given entity.
         *
         * @param t the entity to delete
         */
        @Override // org.apache.beam.sdk.io.cassandra.CassandraService.Deleter
        public void delete(T t) throws ExecutionException, InterruptedException {
            mutate(t);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/cassandra/CassandraServiceImpl$MutatorImpl.class */
    /**
     * Shared implementation for asynchronous write-style operations (see the subclasses in this
     * file).
     *
     * <p>Each call to {@link #mutate(Object)} starts one asynchronous operation. Once
     * {@code CONCURRENT_ASYNC_QUERIES} operations are pending, the caller blocks until all of them
     * have completed before more are queued.
     */
    abstract class MutatorImpl {
        /** Maximum number of in-flight asynchronous operations before the caller is blocked. */
        private static final int CONCURRENT_ASYNC_QUERIES = 100;
        private final CassandraIO.Mutate<T> spec;
        private final Cluster cluster;
        private final Session session;
        private final MappingManager mappingManager;
        private List<ListenableFuture<Void>> mutateFutures = new ArrayList<>();
        private final BiFunction<Mapper<T>, T, ListenableFuture<Void>> mutator;
        private final String operationName;

        /**
         * Opens a cluster and session for the given spec.
         *
         * @param mutate connection and keyspace settings
         * @param biFunction the asynchronous operation applied to the mapper and each entity
         * @param str plural operation name used in log messages
         */
        MutatorImpl(CassandraIO.Mutate<T> mutate, BiFunction<Mapper<T>, T, ListenableFuture<Void>> biFunction, String str) {
            this.spec = mutate;
            Cluster openedCluster = CassandraServiceImpl.this.getCluster(mutate.hosts(), mutate.port().intValue(), mutate.username(), mutate.password(), mutate.localDc(), mutate.consistencyLevel());
            Session openedSession;
            try {
                openedSession = openedCluster.connect(mutate.keyspace());
            } catch (RuntimeException | Error connectFailure) {
                // The object is never handed out when construction fails, so nobody else could
                // close the cluster: release it here before propagating.
                try {
                    openedCluster.close();
                } catch (RuntimeException closeFailure) {
                    connectFailure.addSuppressed(closeFailure);
                }
                throw connectFailure;
            }
            this.cluster = openedCluster;
            this.session = openedSession;
            this.mappingManager = new MappingManager(this.session);
            this.mutator = biFunction;
            this.operationName = str;
        }

        /**
         * Starts the asynchronous operation for one entity and, when the batch limit is reached,
         * waits for all pending operations to complete.
         *
         * @param t the entity to mutate
         * @throws ExecutionException if a pending operation failed
         * @throws InterruptedException if interrupted while waiting for pending operations
         */
        @SuppressWarnings("unchecked") // the mapper is looked up from t's own runtime class
        void mutate(T t) throws ExecutionException, InterruptedException {
            Mapper<T> mapper = (Mapper<T>) this.mappingManager.mapper(t.getClass());
            this.mutateFutures.add(this.mutator.apply(mapper, t));
            if (this.mutateFutures.size() >= CONCURRENT_ASYNC_QUERIES) {
                CassandraServiceImpl.LOG.debug("Waiting for a batch of {} Cassandra {} to be executed...", CONCURRENT_ASYNC_QUERIES, this.operationName);
                waitForFuturesToFinish();
                this.mutateFutures = new ArrayList<>();
            }
        }

        /**
         * Waits for any pending operations, then closes the session and the cluster. The session
         * and cluster are closed even if waiting fails.
         *
         * @throws ExecutionException if a pending operation failed
         * @throws InterruptedException if interrupted while waiting for pending operations
         */
        public void close() throws ExecutionException, InterruptedException {
            try {
                if (!this.mutateFutures.isEmpty()) {
                    waitForFuturesToFinish();
                }
            } finally {
                try {
                    if (this.session != null) {
                        this.session.close();
                    }
                } finally {
                    if (this.cluster != null) {
                        this.cluster.close();
                    }
                }
            }
        }

        /** Blocks until every pending future has completed, propagating the first failure. */
        private void waitForFuturesToFinish() throws ExecutionException, InterruptedException {
            for (ListenableFuture<Void> pending : this.mutateFutures) {
                pending.get();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Immutable holder for one size-estimate entry: a token range together with its partition
     * count and mean partition size.
     */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/beam/sdk/io/cassandra/CassandraServiceImpl$TokenRange.class */
    public static class TokenRange {
        private final long partitionCount;
        private final long meanPartitionSize;
        private final BigInteger rangeStart;
        private final BigInteger rangeEnd;

        /**
         * @param j number of partitions in the range
         * @param j2 mean partition size for the range
         * @param bigInteger start token of the range
         * @param bigInteger2 end token of the range
         */
        public TokenRange(long j, long j2, BigInteger bigInteger, BigInteger bigInteger2) {
            // Token bounds first, then the statistics attached to them.
            this.rangeStart = bigInteger;
            this.rangeEnd = bigInteger2;
            this.partitionCount = j;
            this.meanPartitionSize = j2;
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/cassandra/CassandraServiceImpl$WriterImpl.class */
    /**
     * Mutator that issues asynchronous saves through the entity mapper and is labelled
     * {@code "writes"} in log output.
     */
    class WriterImpl extends CassandraServiceImpl<T>.MutatorImpl implements CassandraService.Writer<T> {
        /**
         * @param mutate the mutate spec handed to the underlying mutator
         */
        WriterImpl(CassandraIO.Mutate<T> mutate) {
            super(mutate, Mapper::saveAsync, "writes");
        }

        /**
         * Queues an asynchronous save for the given entity.
         *
         * @param t the entity to write
         */
        @Override // org.apache.beam.sdk.io.cassandra.CassandraService.Writer
        public void write(T t) throws ExecutionException, InterruptedException {
            mutate(t);
        }
    }

    /**
     * Creates a reader for the given source. The reader opens no connection until it is started.
     *
     * @param cassandraSource the source to read from
     * @return a new, unstarted reader
     */
    @Override // org.apache.beam.sdk.io.cassandra.CassandraService
    public CassandraServiceImpl<T>.CassandraReaderImpl createReader(CassandraIO.CassandraSource<T> cassandraSource) {
        CassandraServiceImpl<T>.CassandraReaderImpl reader = new CassandraReaderImpl(cassandraSource);
        return reader;
    }

    /**
     * Estimates the size in bytes of the table described by {@code read}.
     *
     * <p>A temporary cluster connection is opened for the estimate and always closed before
     * returning. The estimate is best-effort: if the cluster does not use the Murmur3 partitioner,
     * or if reading the size estimates fails, a warning is logged and {@code 0} is returned.
     *
     * @param read the read spec providing connection settings, keyspace and table
     * @return the estimated size in bytes, or {@code 0} when no estimate could be produced
     */
    @Override // org.apache.beam.sdk.io.cassandra.CassandraService
    public long getEstimatedSizeBytes(CassandraIO.Read<T> read) {
        try (Cluster cluster = getCluster(read.hosts(), read.port().intValue(), read.username(), read.password(), read.localDc(), read.consistencyLevel())) {
            if (!isMurmur3Partitioner(cluster)) {
                LOG.warn("Only Murmur3 partitioner is supported, can't estimate the size");
                return 0L;
            }
            try {
                return getEstimatedSizeBytes(getTokenRanges(cluster, read.keyspace(), read.table()));
            } catch (Exception e) {
                // Deliberately best-effort: a failed estimate must not fail the caller.
                LOG.warn("Can't estimate the size", e);
                return 0L;
            }
        }
    }

    /**
     * Computes a size estimate from a list of token ranges.
     *
     * <p>The per-range sizes ({@code meanPartitionSize * partitionCount}) are summed, divided by
     * the ring fraction returned by {@link #getRingFraction(List)}, and rounded to the nearest
     * long.
     *
     * @param list the token ranges to aggregate
     * @return the rounded estimate
     */
    @VisibleForTesting
    static long getEstimatedSizeBytes(List<TokenRange> list) {
        long coveredBytes = list.stream()
                .mapToLong(range -> range.meanPartitionSize * range.partitionCount)
                .sum();
        double ringFraction = getRingFraction(list);
        return Math.round(coveredBytes / ringFraction);
    }

    /**
     * Splits a read into bounded sources.
     *
     * <p>When the cluster uses the Murmur3 partitioner, the read is split by token range. Otherwise
     * a warning is logged and a single source selecting the whole table is returned. The temporary
     * cluster connection used to inspect the cluster is always closed before returning.
     *
     * @param read the read spec providing connection settings, keyspace and table
     * @param j the desired bundle size in bytes
     * @return the sources covering the read
     */
    @Override // org.apache.beam.sdk.io.cassandra.CassandraService
    public List<BoundedSource<T>> split(CassandraIO.Read<T> read, long j) {
        try (Cluster cluster = getCluster(read.hosts(), read.port().intValue(), read.username(), read.password(), read.localDc(), read.consistencyLevel())) {
            if (isMurmur3Partitioner(cluster)) {
                LOG.info("Murmur3Partitioner detected, splitting");
                return split(read, j, getEstimatedSizeBytes(read), cluster);
            }
            LOG.warn("Only Murmur3Partitioner is supported for splitting, using an unique source for the read");
            String fullTableQuery = QueryBuilder.select().from(read.keyspace(), read.table()).toString();
            return Collections.singletonList(new CassandraIO.CassandraSource<>(read, Collections.singletonList(fullTableQuery)));
        }
    }

    /**
     * Builds one source per generated split, each carrying the token-range queries for that
     * split.
     *
     * @param read the read spec providing keyspace, table and minimum number of splits
     * @param j the desired bundle size in bytes
     * @param j2 the estimated total size in bytes
     * @param cluster an open cluster used to read ring and table metadata
     * @return one source per generated split
     */
    private List<BoundedSource<T>> split(CassandraIO.Read<T> read, long j, long j2, Cluster cluster) {
        long numSplits = getNumSplits(j, j2, read.minNumberOfSplits());
        LOG.info("Number of desired splits is {}", numSplits);

        SplitGenerator splitGenerator = new SplitGenerator(cluster.getMetadata().getPartitioner());
        List<BigInteger> ringTokens = cluster.getMetadata().getTokenRanges().stream()
                .map(tokenRange -> new BigInteger(tokenRange.getEnd().getValue().toString()))
                .collect(Collectors.toList());
        List<List<RingRange>> splits = splitGenerator.generateSplits(numSplits, ringTokens);
        LOG.info("{} splits were actually generated", splits.size());

        String partitionKey = cluster.getMetadata().getKeyspace(read.keyspace()).getTable(read.table()).getPartitionKey().stream()
                .map(column -> column.getName())
                .collect(Collectors.joining(","));
        // The same token(...) expression is used in every range predicate below.
        String tokenExpression = "token(" + partitionKey + ")";

        List<BoundedSource<T>> sources = new ArrayList<>();
        for (List<RingRange> splitRanges : splits) {
            List<String> queries = new ArrayList<>();
            for (RingRange ringRange : splitRanges) {
                Select.Where where = QueryBuilder.select().from(read.keyspace(), read.table()).where();
                if (ringRange.isWrapping()) {
                    // A wrapping range cannot be expressed as "start <= token < end", so it is
                    // read with two queries: "token >= start" and "token < end".
                    String fromStartQuery = where.and(QueryBuilder.gte(tokenExpression, ringRange.getStart())).toString();
                    LOG.debug("Cassandra generated read query : {}", fromStartQuery);
                    queries.add(fromStartQuery);
                    String untilEndQuery = QueryBuilder.select().from(read.keyspace(), read.table()).where().and(QueryBuilder.lt(tokenExpression, ringRange.getEnd())).toString();
                    LOG.debug("Cassandra generated read query : {}", untilEndQuery);
                    queries.add(untilEndQuery);
                } else {
                    String rangeQuery = where.and(QueryBuilder.gte(tokenExpression, ringRange.getStart())).and(QueryBuilder.lt(tokenExpression, ringRange.getEnd())).toString();
                    LOG.debug("Cassandra generated read query : {}", rangeQuery);
                    queries.add(rangeQuery);
                }
            }
            sources.add(new CassandraIO.CassandraSource<>(read, queries));
        }
        return sources;
    }

    /**
     * Computes how many splits to request.
     *
     * <p>The base value is {@code j2 / j} when {@code j} is positive and {@code 1} otherwise. A
     * non-positive base value is logged and replaced by {@code 1}. When {@code num} is non-null,
     * the larger of the base value and {@code num} is returned.
     *
     * @param j the desired bundle size in bytes
     * @param j2 the estimated total size in bytes
     * @param num optional lower bound on the number of splits
     * @return the number of splits, always at least {@code 1}
     */
    private static long getNumSplits(long j, long j2, @Nullable Integer num) {
        long splitCount = 1L;
        if (j > 0) {
            splitCount = j2 / j;
        }
        if (splitCount <= 0) {
            LOG.warn("Number of splits is less than 0 ({}), fallback to 1", Long.valueOf(splitCount));
            splitCount = 1;
        }
        if (num == null) {
            return splitCount;
        }
        return Math.max(splitCount, num.intValue());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds a {@link Cluster} for the given connection settings.
     *
     * <p>The cluster always uses a token-aware policy wrapping a DC-aware round-robin policy.
     * Authentication, local data centre and consistency level are applied only when the
     * corresponding argument is non-null.
     *
     * @param list contact point host names
     * @param i port to connect to
     * @param str user name, or {@code null} to skip authentication
     * @param str2 password, used only when {@code str} is non-null
     * @param str3 local data centre name, or {@code null} to leave it unset
     * @param str4 {@link ConsistencyLevel} constant name, or {@code null} to leave query options unset
     * @return the built cluster
     */
    public Cluster getCluster(List<String> list, int i, String str, String str2, String str3, String str4) {
        String[] contactPoints = list.toArray(new String[0]);
        Cluster.Builder clusterBuilder = Cluster.builder().addContactPoints(contactPoints).withPort(i);
        if (str != null) {
            clusterBuilder.withAuthProvider(new PlainTextAuthProvider(str, str2));
        }

        DCAwareRoundRobinPolicy.Builder dcAwareBuilder = new DCAwareRoundRobinPolicy.Builder();
        if (str3 != null) {
            dcAwareBuilder.withLocalDc(str3);
        }
        TokenAwarePolicy loadBalancingPolicy = new TokenAwarePolicy(dcAwareBuilder.build());
        clusterBuilder.withLoadBalancingPolicy(loadBalancingPolicy);

        if (str4 != null) {
            QueryOptions queryOptions = new QueryOptions().setConsistencyLevel(ConsistencyLevel.valueOf(str4));
            clusterBuilder.withQueryOptions(queryOptions);
        }
        return clusterBuilder.build();
    }

    /**
     * Reads the rows of {@code system.size_estimates} for one table and converts each row into a
     * {@link TokenRange}.
     *
     * <p>A new session is opened on the given cluster and closed before returning. If the query
     * fails and closing the session also fails, the query failure is the one propagated.
     *
     * @param cluster an open cluster
     * @param str the keyspace name
     * @param str2 the table name
     * @return one {@link TokenRange} per row returned by the query
     */
    private static List<TokenRange> getTokenRanges(Cluster cluster, String str, String str2) {
        try (Session session = cluster.newSession()) {
            ResultSet resultSet = session.execute("SELECT range_start, range_end, partitions_count, mean_partition_size FROM system.size_estimates WHERE keyspace_name = ? AND table_name = ?", str, str2);
            List<TokenRange> tokenRanges = new ArrayList<>();
            for (Row row : resultSet) {
                // range_start / range_end are read as strings and parsed into BigInteger tokens.
                tokenRanges.add(new TokenRange(row.getLong("partitions_count"), row.getLong("mean_partition_size"), new BigInteger(row.getString("range_start")), new BigInteger(row.getString("range_end"))));
            }
            return tokenRanges;
        }
    }

    /**
     * Sums, over all given ranges, the range's token distance divided by the Murmur3 range size.
     *
     * @param list the token ranges to measure
     * @return the accumulated fraction; {@code 0.0} for an empty list
     */
    @VisibleForTesting
    static double getRingFraction(List<TokenRange> list) {
        double fraction = 0.0d;
        for (TokenRange range : list) {
            double rangeWidth = distance(range.rangeStart, range.rangeEnd).doubleValue();
            double ringSize = SplitGenerator.getRangeSize(MURMUR3PARTITIONER).doubleValue();
            fraction += rangeWidth / ringSize;
        }
        return fraction;
    }

    /**
     * Measures the distance from one token to another, going forward around the ring.
     *
     * <p>When the end token is strictly greater than the start token, the plain difference is
     * returned. Otherwise the Murmur3 range size is added to the (non-positive) difference.
     *
     * @param bigInteger the start token
     * @param bigInteger2 the end token
     * @return the distance from start to end
     */
    @VisibleForTesting
    static BigInteger distance(BigInteger bigInteger, BigInteger bigInteger2) {
        BigInteger difference = bigInteger2.subtract(bigInteger);
        if (bigInteger2.compareTo(bigInteger) > 0) {
            return difference;
        }
        return difference.add(SplitGenerator.getRangeSize(MURMUR3PARTITIONER));
    }

    /**
     * Tells whether the cluster's metadata reports the Murmur3 partitioner.
     *
     * @param cluster the cluster whose metadata is inspected
     * @return {@code true} if the reported partitioner equals the Murmur3 partitioner class name
     */
    @VisibleForTesting
    static boolean isMurmur3Partitioner(Cluster cluster) {
        String partitioner = cluster.getMetadata().getPartitioner();
        return MURMUR3PARTITIONER.equals(partitioner);
    }

    /**
     * Creates a writer for the given spec. Constructing the writer opens a cluster connection and
     * session.
     *
     * @param mutate the mutate spec providing connection settings and keyspace
     * @return a new writer
     */
    @Override // org.apache.beam.sdk.io.cassandra.CassandraService
    public CassandraService.Writer<T> createWriter(CassandraIO.Mutate<T> mutate) {
        CassandraService.Writer<T> writer = new WriterImpl(mutate);
        return writer;
    }

    /**
     * Creates a deleter for the given spec. Constructing the deleter opens a cluster connection
     * and session.
     *
     * @param mutate the mutate spec providing connection settings and keyspace
     * @return a new deleter
     */
    @Override // org.apache.beam.sdk.io.cassandra.CassandraService
    public CassandraService.Deleter<T> createDeleter(CassandraIO.Mutate<T> mutate) {
        CassandraService.Deleter<T> deleter = new DeleterImpl(mutate);
        return deleter;
    }

    /**
     * Closes a resource, optionally attaching any close failure to an exception that is already
     * in flight. Marked synthetic by the decompiler.
     *
     * @param th the exception already being propagated, or {@code null} if there is none
     * @param autoCloseable the resource to close; must not be {@code null}
     */
    private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) {
        if (th == null) {
            // No exception in flight: a failure to close propagates to the caller.
            autoCloseable.close();
            return;
        }
        try {
            autoCloseable.close();
        } catch (Throwable th2) {
            // Keep the in-flight exception as the primary one and record the close failure on it.
            th.addSuppressed(th2);
        }
    }
}
