package org.apache.flink.connector.cassandra.source.enumerator;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import java.io.IOException;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
import org.apache.flink.connector.cassandra.source.split.SplitsGenerator;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/connector/cassandra/source/enumerator/CassandraSplitEnumerator.class */
public final class CassandraSplitEnumerator implements SplitEnumerator<CassandraSplit, CassandraEnumeratorState> {
    private static final Logger LOG = LoggerFactory.getLogger(CassandraSplitEnumerator.class);
    private final SplitEnumeratorContext<CassandraSplit> enumeratorContext;
    private CassandraEnumeratorState state;
    private final Cluster cluster;
    private final Long maxSplitMemorySize;
    private final Session session;
    private final String keyspace;
    private final String table;

    public CassandraSplitEnumerator(SplitEnumeratorContext<CassandraSplit> splitEnumeratorContext, CassandraEnumeratorState cassandraEnumeratorState, ClusterBuilder clusterBuilder, Long l, String str, String str2) {
        this.enumeratorContext = splitEnumeratorContext;
        this.state = cassandraEnumeratorState == null ? new CassandraEnumeratorState() : cassandraEnumeratorState;
        this.cluster = clusterBuilder.getCluster();
        this.maxSplitMemorySize = l;
        this.session = this.cluster.newSession();
        this.keyspace = str;
        this.table = str2;
    }

    public void start() {
        this.enumeratorContext.callAsync(this::prepareSplits, (cassandraEnumeratorState, th) -> {
            LOG.debug("Initialized CassandraEnumeratorState: {}", cassandraEnumeratorState.toString());
            this.state = cassandraEnumeratorState;
        });
    }

    private CassandraEnumeratorState prepareSplits() {
        return new SplitsGenerator(this.cluster.getMetadata().getPartitioner().contains(SplitsGenerator.CassandraPartitioner.MURMUR3PARTITIONER.getClassName()) ? SplitsGenerator.CassandraPartitioner.MURMUR3PARTITIONER : SplitsGenerator.CassandraPartitioner.RANDOMPARTITIONER, this.session, this.keyspace, this.table, this.enumeratorContext.currentParallelism(), this.maxSplitMemorySize.longValue()).prepareSplits();
    }

    public void handleSplitRequest(int i, @Nullable String str) {
        checkReaderRegistered(i);
        CassandraSplit nextSplit = this.state.getNextSplit();
        if (nextSplit != null) {
            LOG.info("Assigning splits to reader {}", Integer.valueOf(i));
            this.enumeratorContext.assignSplit(nextSplit, i);
        } else {
            LOG.info("No split assigned to reader {} because the enumerator has no unassigned split left. Sending NoMoreSplitsEvent to reader", Integer.valueOf(i));
            this.enumeratorContext.signalNoMoreSplits(i);
        }
    }

    public void addSplitsBack(List<CassandraSplit> list, int i) {
        this.state.addSplitsBack(list);
    }

    public void addReader(int i) {
    }

    private void checkReaderRegistered(int i) {
        if (!this.enumeratorContext.registeredReaders().containsKey(Integer.valueOf(i))) {
            throw new IllegalStateException(String.format("Reader %d is not registered to source coordinator", Integer.valueOf(i)));
        }
    }

    /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
    public CassandraEnumeratorState m942snapshotState(long j) {
        return this.state;
    }

    public void close() throws IOException {
        try {
            if (this.session != null) {
                this.session.close();
            }
        } catch (Exception e) {
            LOG.error("Error while closing session.", e);
        }
        try {
            if (this.cluster != null) {
                this.cluster.close();
            }
        } catch (Exception e2) {
            LOG.error("Error while closing cluster.", e2);
        }
    }
}
