package org.apache.flink.connector.cassandra.source;

import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorState;
import org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorStateSerializer;
import org.apache.flink.connector.cassandra.source.enumerator.CassandraSplitEnumerator;
import org.apache.flink.connector.cassandra.source.reader.CassandraSourceReaderFactory;
import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
import org.apache.flink.connector.cassandra.source.split.CassandraSplitSerializer;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import org.apache.flink.streaming.connectors.cassandra.MapperOptions;
import org.apache.flink.util.Preconditions;

/**
 * A bounded {@link Source} that reads a Cassandra table through a user supplied CQL query and maps
 * the rows to instances of a POJO class.
 *
 * <p>The query must have the form {@code select ... from keyspace.table ...;}. The keyspace and
 * table names are extracted from it at construction time. Aggregations and {@code ORDER BY} /
 * {@code GROUP BY} clauses are rejected because the query is executed on subsets/partitions of the
 * input table.
 *
 * @param <OUT> type of the POJO produced by this source
 */
@PublicEvolving
public class CassandraSource<OUT>
        implements Source<OUT, CassandraSplit, CassandraEnumeratorState>, ResultTypeQueryable<OUT> {

    private static final long serialVersionUID = 1L;

    /**
     * Matches queries that contain a call to an aggregate function ({@code AVG(}, {@code COUNT(},
     * {@code MIN(}, {@code MAX(}, {@code SUM(}) or an {@code ORDER BY} / {@code GROUP BY} clause.
     *
     * <p>The aggregate names are only matched when they start at a word boundary and are followed
     * (optionally after whitespace) by an opening parenthesis, so that identifiers which merely
     * contain one of the names (e.g. a column {@code admin} or a table {@code accounts}) are not
     * mistaken for aggregations.
     */
    public static final Pattern CQL_PROHIBITED_CLAUSES_REGEXP =
            Pattern.compile("(?i).*(\\b(?:AVG|COUNT|MIN|MAX|SUM)\\s*\\(|ORDER BY|GROUP BY).*");

    /** Matches supported queries; group 1 is the keyspace and group 2 is the table. */
    public static final Pattern SELECT_REGEXP =
            Pattern.compile("(?i)select .+ from (\\w+)\\.(\\w+).*;$");

    /** Smallest accepted value for the maximum split memory size (10 MiB, in bytes). */
    private static final long MIN_SPLIT_MEMORY_SIZE = MemorySize.ofMebiBytes(10).getBytes();

    /** Maximum split memory size used when none is given (64 MiB, in bytes). */
    static final long MAX_SPLIT_MEMORY_SIZE_DEFAULT = MemorySize.ofMebiBytes(64).getBytes();

    private final ClusterBuilder clusterBuilder;
    private final Class<OUT> pojoClass;
    private final String query;
    private final String keyspace;
    private final String table;
    private final MapperOptions mapperOptions;
    private final long maxSplitMemorySize;

    /**
     * Creates a source using {@link #MAX_SPLIT_MEMORY_SIZE_DEFAULT} as maximum split memory size.
     *
     * @param clusterBuilder builder used to connect to the Cassandra cluster, must not be null
     * @param pojoClass class of the produced POJO, must not be null
     * @param query CQL query of the form {@code select ... from keyspace.table ...;}, must not be
     *     null
     * @param mapperOptions options handed over to the source reader factory
     */
    public CassandraSource(
            ClusterBuilder clusterBuilder,
            Class<OUT> pojoClass,
            String query,
            MapperOptions mapperOptions) {
        this(clusterBuilder, MAX_SPLIT_MEMORY_SIZE_DEFAULT, pojoClass, query, mapperOptions);
    }

    /**
     * Creates a source with an explicit maximum split memory size.
     *
     * @param clusterBuilder builder used to connect to the Cassandra cluster, must not be null
     * @param maxSplitMemorySize maximum split memory size in bytes, must be at least 10 MiB
     * @param pojoClass class of the produced POJO, must not be null
     * @param query CQL query of the form {@code select ... from keyspace.table ...;}, must not be
     *     null
     * @param mapperOptions options handed over to the source reader factory
     * @throws NullPointerException if {@code clusterBuilder}, {@code pojoClass} or {@code query}
     *     is null
     * @throws IllegalStateException if {@code maxSplitMemorySize} is below the minimum or the
     *     query is not supported (see {@link #checkQueryValidity(String)})
     */
    public CassandraSource(
            ClusterBuilder clusterBuilder,
            long maxSplitMemorySize,
            Class<OUT> pojoClass,
            String query,
            MapperOptions mapperOptions) {
        Preconditions.checkNotNull(clusterBuilder, "ClusterBuilder required but not provided");
        Preconditions.checkNotNull(pojoClass, "POJO class required but not provided");
        Preconditions.checkNotNull(query, "query required but not provided");
        Preconditions.checkState(
                maxSplitMemorySize >= MIN_SPLIT_MEMORY_SIZE,
                "Defined maxSplitMemorySize (%s) is below minimum (%s)",
                maxSplitMemorySize,
                MIN_SPLIT_MEMORY_SIZE);
        this.maxSplitMemorySize = maxSplitMemorySize;

        // Validation also yields the matcher from which keyspace and table are extracted.
        final Matcher queryMatcher = checkQueryValidity(query);
        this.query = query;
        this.keyspace = queryMatcher.group(1);
        this.table = queryMatcher.group(2);

        this.clusterBuilder = clusterBuilder;
        // Clean the builder's closure recursively; the trailing flag presumably requests a
        // serializability check (verify against ClosureCleaner#clean).
        ClosureCleaner.clean(clusterBuilder, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
        this.pojoClass = pojoClass;
        this.mapperOptions = mapperOptions;
    }

    /**
     * Verifies that the given query is supported by this source.
     *
     * @param query the CQL query to check
     * @return the successful {@link #SELECT_REGEXP} matcher; group 1 holds the keyspace and group
     *     2 the table
     * @throws IllegalStateException if the query contains a prohibited clause or does not have
     *     the form {@code select ... from keyspace.table ...;}
     */
    @VisibleForTesting
    public static Matcher checkQueryValidity(String query) {
        // Use the precompiled pattern instead of recompiling it through String#matches.
        Preconditions.checkState(
                !CQL_PROHIBITED_CLAUSES_REGEXP.matcher(query).matches(),
                "Aggregations/OrderBy are not supported because the query is executed on subsets/partitions of the input table");
        final Matcher queryMatcher = SELECT_REGEXP.matcher(query);
        Preconditions.checkState(
                queryMatcher.matches(),
                "Query must be of the form select ... from keyspace.table ...;");
        return queryMatcher;
    }

    /** Returns {@link Boundedness#BOUNDED}: this source is always bounded. */
    @Override
    public Boundedness getBoundedness() {
        return Boundedness.BOUNDED;
    }

    /** Creates a reader through a new {@link CassandraSourceReaderFactory}. */
    @Internal
    @Override
    public SourceReader<OUT, CassandraSplit> createReader(SourceReaderContext readerContext) {
        return new CassandraSourceReaderFactory()
                .create(
                        readerContext,
                        this.clusterBuilder,
                        this.pojoClass,
                        this.query,
                        this.keyspace,
                        this.table,
                        this.mapperOptions);
    }

    /** Creates a fresh split enumerator, i.e. one without any previous state. */
    @Internal
    @Override
    public SplitEnumerator<CassandraSplit, CassandraEnumeratorState> createEnumerator(
            SplitEnumeratorContext<CassandraSplit> enumContext) {
        return new CassandraSplitEnumerator(
                enumContext,
                null,
                this.clusterBuilder,
                this.maxSplitMemorySize,
                this.keyspace,
                this.table);
    }

    /** Creates a split enumerator that starts from the given checkpointed state. */
    @Internal
    @Override
    public SplitEnumerator<CassandraSplit, CassandraEnumeratorState> restoreEnumerator(
            SplitEnumeratorContext<CassandraSplit> enumContext,
            CassandraEnumeratorState enumCheckpoint) {
        return new CassandraSplitEnumerator(
                enumContext,
                enumCheckpoint,
                this.clusterBuilder,
                this.maxSplitMemorySize,
                this.keyspace,
                this.table);
    }

    /** Returns the shared {@link CassandraSplitSerializer} instance. */
    @Internal
    @Override
    public SimpleVersionedSerializer<CassandraSplit> getSplitSerializer() {
        return CassandraSplitSerializer.INSTANCE;
    }

    /** Returns the shared {@link CassandraEnumeratorStateSerializer} instance. */
    @Internal
    @Override
    public SimpleVersionedSerializer<CassandraEnumeratorState> getEnumeratorCheckpointSerializer() {
        return CassandraEnumeratorStateSerializer.INSTANCE;
    }

    /** Returns the type information derived from the configured POJO class. */
    @Override
    public TypeInformation<OUT> getProducedType() {
        return TypeInformation.of(this.pojoClass);
    }

    // NOTE: the synthetic bridge method restoreEnumerator(SplitEnumeratorContext, Object) that a
    // decompiler emits is intentionally not declared here; javac generates it automatically and
    // an explicit declaration clashes with the erasure of the interface method.
}
