package org.apache.beam.sdk.io.astra.db;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
import com.datastax.oss.driver.api.core.servererrors.SyntaxError;
import com.datastax.oss.driver.internal.core.metadata.token.Murmur3Token;
import java.math.BigInteger;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.astra.db.AstraDbIO;
import org.apache.beam.sdk.io.astra.db.mapping.AstraDbMapper;
import org.apache.beam.sdk.io.astra.db.transforms.split.RingRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/sdk/io/astra/db/ReadFn.class */
/**
 * {@link DoFn} that executes an {@link AstraDbIO.Read} specification and emits every mapped row.
 *
 * <p>Three execution modes are supported, chosen in this order:
 *
 * <ol>
 *   <li>ring ranges present and non-empty: one token-range query per range (a wrapping range is
 *       split into a "lowest" and a "highest" query);
 *   <li>a user query is set: that query is executed as-is;
 *   <li>otherwise: a full scan of {@code keyspace.table}.
 * </ol>
 *
 * @param <T> type of the entities produced by the configured {@link AstraDbMapper}
 */
public class ReadFn<T> extends DoFn<AstraDbIO.Read<T>, T> {
    private static final Logger LOG = LoggerFactory.getLogger(ReadFn.class);

    /** Creates the reader and logs its instantiation. */
    public ReadFn() {
        LOG.info("Starter Reader");
    }

    /**
     * Runs the read described by {@code read} and forwards each mapped row to
     * {@code outputReceiver}.
     *
     * @param read the read specification (keyspace, table, optional query and ring ranges)
     * @param outputReceiver receiver for the mapped entities
     * @throws IllegalStateException if anything other than a CQL {@link SyntaxError} fails while
     *     reading; the original exception is kept as the cause
     */
    @DoFn.ProcessElement
    public void processElement(@DoFn.Element AstraDbIO.Read<T> read, DoFn.OutputReceiver<T> outputReceiver) {
        try {
            @SuppressWarnings("unchecked") // ring ranges are stored as a list of RingRange
            List<RingRange> ringRanges =
                    read.ringRanges() == null ? null : (List<RingRange>) read.ringRanges().get();
            if (ringRanges != null) {
                LOG.debug("Read Table '{}.{}' ({} range)", read.keyspace().get(), read.table().get(), ringRanges.size());
            } else {
                LOG.debug("Read Table '{}.{}'", read.keyspace().get(), read.table().get());
            }

            CqlSession cqlSession = CqlSessionHolder.getCqlSession(read);
            @SuppressWarnings("unchecked") // the factory is configured for the element type T
            AstraDbMapper<T> astraDbMapper = (AstraDbMapper<T>) read.mapperFactoryFn().apply(cqlSession);

            if (ringRanges != null && !ringRanges.isEmpty()) {
                String partitionKey = partitionKeyColumns(cqlSession, read);
                // Prepared only here: without ring ranges there is no token clause to bind, and
                // preparing an incomplete statement would fail before the fallback branches run.
                PreparedStatement rangeStatement =
                        cqlSession.prepare(generateRangeQuery(read, partitionKey, Boolean.TRUE));
                for (RingRange ringRange : ringRanges) {
                    if (ringRange.isWrapping()) {
                        // A wrapping range crosses the end of the ring: read both ends separately.
                        ResultSet below = cqlSession.execute(getLowestSplitQuery(read, partitionKey, ringRange.getEnd()));
                        LOG.debug("Anything below [{}] get you {} items", ringRange.getEnd(), below.getAvailableWithoutFetching());
                        outputResults(below, outputReceiver, astraDbMapper);
                        ResultSet above = cqlSession.execute(getHighestSplitQuery(read, partitionKey, ringRange.getStart()));
                        LOG.debug("Anything above [{}] get you {} items", ringRange.getStart(), above.getAvailableWithoutFetching());
                        outputResults(above, outputReceiver, astraDbMapper);
                    } else {
                        Murmur3Token startToken = new Murmur3Token(ringRange.getStart().longValue());
                        Murmur3Token endToken = new Murmur3Token(ringRange.getEnd().longValue());
                        ResultSet inRange = cqlSession.execute(rangeStatement.bind().setToken(0, startToken).setToken(1, endToken));
                        LOG.debug("Range[{}-{}] gets you {} items", ringRange.getStart().longValue(), ringRange.getEnd().longValue(), inRange.getAvailableWithoutFetching());
                        outputResults(inRange, outputReceiver, astraDbMapper);
                    }
                }
            } else if (read.query() != null) {
                LOG.info("Executing User Query: {}", read.query().get());
                outputResults(cqlSession.execute((String) read.query().get()), outputReceiver, astraDbMapper);
            } else {
                LOG.info("Executing Table Full Scan Query");
                outputResults(cqlSession.execute(String.format("SELECT * FROM %s.%s", read.keyspace().get(), read.table().get())), outputReceiver, astraDbMapper);
            }
        } catch (SyntaxError e) {
            // Kept non-fatal as before, but surfaced at WARN with the stack trace so that a
            // rejected query (and the resulting missing rows) is visible in the logs.
            LOG.warn("SyntaxError : {}", e.getMessage(), e);
        } catch (Exception e2) {
            LOG.error("Cannot process read operation against Cassandra", e2);
            throw new IllegalStateException("Cannot process read operation against Cassandra", e2);
        }
    }

    /**
     * Returns the partition key column names of the target table, comma separated, as needed
     * inside a CQL {@code token(...)} call.
     *
     * @throws IllegalStateException if the keyspace or the table is absent from the session metadata
     */
    private static String partitionKeyColumns(CqlSession cqlSession, AstraDbIO.Read<?> read) {
        String keyspace = (String) read.keyspace().get();
        String table = (String) read.table().get();
        KeyspaceMetadata keyspaceMetadata = cqlSession.getMetadata().getKeyspace(keyspace)
                .orElseThrow(() -> new IllegalStateException("Keyspace '" + keyspace + "' not found"));
        TableMetadata tableMetadata = keyspaceMetadata.getTable(table)
                .orElseThrow(() -> new IllegalStateException("Table '" + keyspace + "." + table + "' not found"));
        return tableMetadata.getPartitionKey().stream()
                .map(column -> column.getName().toString())
                .collect(Collectors.joining(","));
    }

    /** Maps {@code resultSet} with {@code astraDbMapper} and emits every resulting entity. */
    private static <T> void outputResults(ResultSet resultSet, DoFn.OutputReceiver<T> outputReceiver, AstraDbMapper<T> astraDbMapper) {
        Iterator<T> entities = astraDbMapper.map(resultSet);
        while (entities.hasNext()) {
            outputReceiver.output(entities.next());
        }
    }

    /**
     * Builds the query reading the upper end of a wrapping range: rows whose token is greater
     * than or equal to {@code start - 1}.
     */
    private static String getHighestSplitQuery(AstraDbIO.Read<?> read, String partitionKey, BigInteger start) {
        String clause = String.format("(token(%s) >= %d)", partitionKey, start.subtract(BigInteger.ONE));
        // buildInitialQuery picks WHERE or AND depending on the user query, if any.
        return buildInitialQuery(read, Boolean.TRUE) + clause;
    }

    /**
     * Builds the query reading the lower end of a wrapping range: rows whose token is lower than
     * {@code end + 1}.
     */
    private static String getLowestSplitQuery(AstraDbIO.Read<?> read, String partitionKey, BigInteger end) {
        String clause = String.format(" (token(%s) < %d) ", partitionKey, end.add(BigInteger.ONE));
        // buildInitialQuery picks WHERE or AND depending on the user query, if any.
        return buildInitialQuery(read, Boolean.TRUE) + clause;
    }

    /**
     * Builds the statement text used for non-wrapping ranges. When {@code hasRingRange} is true the
     * result ends with two bindable token bounds (inclusive start, exclusive end); otherwise it is
     * just the base query.
     */
    private static String generateRangeQuery(AstraDbIO.Read<?> read, String partitionKey, Boolean hasRingRange) {
        String baseQuery = buildInitialQuery(read, hasRingRange);
        if (!hasRingRange.booleanValue()) {
            return baseQuery;
        }
        return baseQuery + Joiner.on(" AND ").skipNulls().join(
                String.format("(token(%s) >= ?)", partitionKey),
                String.format("(token(%s) < ?)", partitionKey));
    }

    /**
     * Returns the base query: the user query when one is set, else {@code SELECT * FROM ks.table}.
     * When {@code hasRingRange} is true a trailing {@code " WHERE "} or {@code " AND "} is added so
     * that a token clause can be appended directly.
     */
    private static String buildInitialQuery(AstraDbIO.Read<?> read, Boolean hasRingRange) {
        boolean appendConnector = hasRingRange.booleanValue();
        if (read.query() == null) {
            String select = String.format("SELECT * FROM %s.%s", read.keyspace().get(), read.table().get());
            // Only open a WHERE clause when something follows it; a dangling WHERE is invalid CQL.
            return appendConnector ? select + " WHERE " : select;
        }
        String userQuery = (String) read.query().get();
        if (!appendConnector) {
            return userQuery;
        }
        // Plain substring test: a user query that already filters gets AND, otherwise WHERE.
        return userQuery + (userQuery.toUpperCase().contains("WHERE") ? " AND " : " WHERE ");
    }
}
