package net.nmoncho.helenus.flink.source;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.data.GettableByIndex;
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
import com.datastax.oss.driver.api.core.metadata.schema.RelationMetadata;
import com.datastax.oss.driver.api.core.metadata.token.Token;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import net.nmoncho.helenus.api.RowMapper;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.connector.base.source.reader.RecordsBySplits;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.mutable.Buffer$;
import scala.jdk.CollectionConverters$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.util.control.Breaks$;
import scala.util.matching.Regex;

/* compiled from: CassandraSourceReader.scala */
/* loaded from: input_file:net/nmoncho/helenus/flink/source/CassandraSourceReader$.class */
public final class CassandraSourceReader$ {
    public static CassandraSourceReader$ MODULE$;
    private final Logger net$nmoncho$helenus$flink$source$CassandraSourceReader$$log;

    static {
        new CassandraSourceReader$();
    }

    public Logger net$nmoncho$helenus$flink$source$CassandraSourceReader$$log() {
        return this.net$nmoncho$helenus$flink$source$CassandraSourceReader$$log;
    }

    public <Out> RecordEmitter<Tuple2<Row, ExecutionInfo>, Out, CassandraSplit> recordEmitter(final RowMapper<Out> rowMapper) {
        return new RecordEmitter<Tuple2<Row, ExecutionInfo>, Out, CassandraSplit>(rowMapper) { // from class: net.nmoncho.helenus.flink.source.CassandraSourceReader$$anon$1
            private final RowMapper mapper$1;

            public void emitRecord(Tuple2<Row, ExecutionInfo> tuple2, SourceOutput<Out> sourceOutput, CassandraSplit cassandraSplit) {
                sourceOutput.collect(this.mapper$1.apply((Row) tuple2._1()));
            }

            {
                this.mapper$1 = rowMapper;
            }
        };
    }

    public <Out> SplitReader<Tuple2<Row, ExecutionInfo>, CassandraSplit> splitReader(final CqlSession cqlSession, final Function1<CqlSession, Object> function1) {
        return new SplitReader<Tuple2<Row, ExecutionInfo>, CassandraSplit>(function1, cqlSession) { // from class: net.nmoncho.helenus.flink.source.CassandraSourceReader$$anon$2
            private final HashSet<CassandraSplit> unprocessedSplits = new HashSet<>();
            private final AtomicBoolean wakeup = new AtomicBoolean(false);
            private final Function1 bstmt$2;
            private final CqlSession session$2;

            public void pauseOrResumeSplits(Collection<CassandraSplit> collection, Collection<CassandraSplit> collection2) {
                super.pauseOrResumeSplits(collection, collection2);
            }

            private HashSet<CassandraSplit> unprocessedSplits() {
                return this.unprocessedSplits;
            }

            private AtomicBoolean wakeup() {
                return this.wakeup;
            }

            public RecordsWithSplitIds<Tuple2<Row, ExecutionInfo>> fetch() {
                HashMap hashMap = new HashMap();
                HashSet hashSet = new HashSet();
                Object apply = this.bstmt$2.apply(this.session$2);
                Tuple2<String, String> extractKeyspaceTable = package$.MODULE$.extractKeyspaceTable(this.session$2, ((BoundStatement) apply).getPreparedStatement().getQuery());
                if (extractKeyspaceTable == null) {
                    throw new MatchError(extractKeyspaceTable);
                }
                Tuple2 tuple2 = new Tuple2((String) extractKeyspaceTable._1(), (String) extractKeyspaceTable._2());
                String fetchPartitioningKey = fetchPartitioningKey((String) tuple2._1(), (String) tuple2._2());
                wakeup().compareAndSet(true, false);
                Breaks$.MODULE$.breakable(() -> {
                    if (this.wakeup().get()) {
                        throw Breaks$.MODULE$.break();
                    }
                    ((IterableLike) CollectionConverters$.MODULE$.asScalaSetConverter(this.unprocessedSplits()).asScala()).foreach(cassandraSplit -> {
                        try {
                            this.addRecordsToOutput(this.session$2.execute(CassandraSourceReader$.MODULE$.generateFinalQuery(this.session$2, apply, fetchPartitioningKey, CassandraSplit$CassandraPartitioner$.MODULE$.token(this.session$2, cassandraSplit.ringRangeStart()), CassandraSplit$CassandraPartitioner$.MODULE$.token(this.session$2, cassandraSplit.ringRangeEnd()))), cassandraSplit, hashMap);
                            hashSet.add(cassandraSplit.splitId());
                            return BoxesRunTime.boxToBoolean(this.unprocessedSplits().remove(cassandraSplit));
                        } catch (Throwable th) {
                            CassandraSourceReader$.MODULE$.net$nmoncho$helenus$flink$source$CassandraSourceReader$$log().error("Error while reading split ", th);
                            return BoxedUnit.UNIT;
                        }
                    });
                });
                return new RecordsBySplits(hashMap, hashSet);
            }

            public String fetchPartitioningKey(String str, String str2) {
                return ((TraversableOnce) ((TraversableLike) CollectionConverters$.MODULE$.asScalaBufferConverter(((RelationMetadata) ((KeyspaceMetadata) this.session$2.getMetadata().getKeyspace(str).get()).getTable(str2).get()).getPartitionKey()).asScala()).map(columnMetadata -> {
                    return columnMetadata.getName();
                }, Buffer$.MODULE$.canBuildFrom())).mkString(",");
            }

            public void addRecordsToOutput(ResultSet resultSet, CassandraSplit cassandraSplit, Map<String, Collection<Tuple2<Row, ExecutionInfo>>> map) {
                resultSet.forEach(row -> {
                    ((Collection) map.computeIfAbsent(cassandraSplit.splitId(), str -> {
                        return new ArrayList();
                    })).add(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(row), resultSet.getExecutionInfo()));
                });
            }

            public void wakeUp() {
                wakeup().compareAndSet(false, true);
            }

            public void handleSplitsChanges(SplitsChange<CassandraSplit> splitsChange) {
                unprocessedSplits().addAll(splitsChange.splits());
            }

            public void close() {
            }

            {
                this.bstmt$2 = function1;
                this.session$2 = cqlSession;
            }
        };
    }

    public BoundStatement generateFinalQuery(CqlSession cqlSession, Object obj, String str, Token token, Token token2) {
        int size = ((BoundStatement) obj).getPreparedStatement().getVariableDefinitions().size();
        return (BoundStatement) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), size).foldLeft(cqlSession.prepare(generateRangeQuery(((BoundStatement) obj).getPreparedStatement().getQuery(), str)).bind(new Object[0]).setToken(0, token).setToken(1, token2), (boundStatement, obj2) -> {
            return $anonfun$generateFinalQuery$1(obj, boundStatement, BoxesRunTime.unboxToInt(obj2));
        });
    }

    public String generateRangeQuery(String str, String str2) {
        Regex.Match match = (Regex.Match) package$.MODULE$.SelectRegex().findFirstMatchIn(str).getOrElse(() -> {
            throw new IllegalStateException(new StringBuilder(58).append("Failed to generate range query out of the provided query: ").append(str).toString());
        });
        int indexOf = str.toLowerCase().indexOf(" where ");
        Tuple2 $minus$greater$extension = indexOf != -1 ? Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToInteger(indexOf + " where ".length())), new StringBuilder(37).append("(token(").append(str2).append(") >= ?) AND (token(").append(str2).append(") < ?) AND ").toString()) : Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToInteger(match.end(3))), new StringBuilder(39).append(" WHERE (token(").append(str2).append(") >= ?) AND (token(").append(str2).append(") < ?)").toString());
        if ($minus$greater$extension == null) {
            throw new MatchError($minus$greater$extension);
        }
        Tuple2 tuple2 = new Tuple2(BoxesRunTime.boxToInteger($minus$greater$extension._1$mcI$sp()), (String) $minus$greater$extension._2());
        int _1$mcI$sp = tuple2._1$mcI$sp();
        return new StringBuilder(0).append(str.substring(0, _1$mcI$sp)).append((String) tuple2._2()).append(str.substring(_1$mcI$sp)).toString();
    }

    public static final /* synthetic */ BoundStatement $anonfun$generateFinalQuery$1(Object obj, BoundStatement boundStatement, int i) {
        return boundStatement.setBytesUnsafe(i + 2, ((GettableByIndex) obj).getBytesUnsafe(i));
    }

    private CassandraSourceReader$() {
        MODULE$ = this;
        this.net$nmoncho$helenus$flink$source$CassandraSourceReader$$log = LoggerFactory.getLogger(CassandraSourceReader.class);
    }
}
