package org.apache.flink.connector.cassandra.source.reader;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Token;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connector.base.source.reader.RecordsBySplits;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.cassandra.source.CassandraSource;
import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/connector/cassandra/source/reader/CassandraSplitReader.class */
public class CassandraSplitReader implements SplitReader<CassandraRow, CassandraSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(CassandraSplitReader.class);
    private final Cluster cluster;
    private final Session session;
    private final String query;
    private final String keyspace;
    private final String table;
    private final AtomicBoolean wakeup = new AtomicBoolean(false);
    private final Set<CassandraSplit> unprocessedSplits = new HashSet();

    public CassandraSplitReader(Cluster cluster, Session session, String str, String str2, String str3) {
        this.query = str;
        this.keyspace = str2;
        this.table = str3;
        this.cluster = cluster;
        this.session = session;
    }

    public RecordsWithSplitIds<CassandraRow> fetch() {
        HashMap hashMap = new HashMap();
        HashSet hashSet = new HashSet();
        Metadata metadata = this.cluster.getMetadata();
        PreparedStatement prepare = this.session.prepare(generateRangeQuery(this.query, getPartitionKey(metadata)));
        this.wakeup.compareAndSet(true, false);
        for (CassandraSplit cassandraSplit : this.unprocessedSplits) {
            if (this.wakeup.get()) {
                break;
            }
            try {
                Token newToken = metadata.newToken(cassandraSplit.getRingRangeStart().toString());
                addRecordsToOutput(this.session.execute(prepare.bind().setToken(0, newToken).setToken(1, metadata.newToken(cassandraSplit.getRingRangeEnd().toString()))), cassandraSplit, hashMap);
                hashSet.add(cassandraSplit.splitId());
                this.unprocessedSplits.remove(cassandraSplit);
            } catch (Exception e) {
                LOG.error("Error while reading split ", e);
            }
        }
        return new RecordsBySplits(hashMap, hashSet);
    }

    private String getPartitionKey(Metadata metadata) {
        return (String) metadata.getKeyspace(this.keyspace).getTable(this.table).getPartitionKey().stream().map((v0) -> {
            return v0.getName();
        }).collect(Collectors.joining(","));
    }

    public void wakeUp() {
        this.wakeup.compareAndSet(false, true);
    }

    public void handleSplitsChanges(SplitsChange<CassandraSplit> splitsChange) {
        this.unprocessedSplits.addAll(splitsChange.splits());
    }

    @VisibleForTesting
    static String generateRangeQuery(String str, String str2) {
        int end;
        String format;
        Matcher matcher = CassandraSource.SELECT_REGEXP.matcher(str);
        if (!matcher.matches()) {
            throw new IllegalStateException(String.format("Failed to generate range query out of the provided query: %s", str));
        }
        int indexOf = str.toLowerCase().indexOf("where");
        if (indexOf != -1) {
            end = indexOf + "where".length();
            format = String.format(" (token(%s) >= ?) AND (token(%s) < ?) AND", str2, str2);
        } else {
            end = matcher.end(2);
            format = String.format(" WHERE (token(%s) >= ?) AND (token(%s) < ?)", str2, str2);
        }
        return String.format("%s%s%s", str.substring(0, end), format, str.substring(end));
    }

    private void addRecordsToOutput(ResultSet resultSet, CassandraSplit cassandraSplit, Map<String, Collection<CassandraRow>> map) {
        resultSet.forEach(row -> {
            ((Collection) map.computeIfAbsent(cassandraSplit.splitId(), str -> {
                return new ArrayList();
            })).add(new CassandraRow(row, resultSet.getExecutionInfo()));
        });
    }

    public void close() throws Exception {
    }
}
