package org.apache.flink.connector.cassandra.source.reader;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.datastax.driver.mapping.Mapper;
import java.util.Map;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/connector/cassandra/source/reader/CassandraSourceReader.class */
/**
 * Source reader that pulls {@link CassandraSplit}s one at a time and emits records of type
 * {@code OUT} mapped from Cassandra result sets.
 *
 * <p>The split itself is used as the split state: {@link #initializedState(CassandraSplit)} and
 * {@link #toSplitType(String, CassandraSplit)} both hand back the object they were given.
 *
 * <p>The reader keeps references to the {@link Cluster} and {@link Session} it was constructed
 * with and closes both in {@link #close()}.
 *
 * @param <OUT> type of the records emitted by this reader
 */
public class CassandraSourceReader<OUT> extends SingleThreadMultiplexSourceReaderBase<CassandraRow, OUT, CassandraSplit, CassandraSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(CassandraSourceReader.class);
    private final Cluster cluster;
    private final Session session;

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a reader whose split readers share the given cluster and session.
     *
     * @param sourceReaderContext reader context; also supplies the configuration passed to the
     *     base class
     * @param str first string argument forwarded unchanged to {@code CassandraSplitReader}
     *     (NOTE(review): the decompiler lost the original name - confirm its meaning against the
     *     {@code CassandraSplitReader} constructor)
     * @param str2 second string argument forwarded unchanged to {@code CassandraSplitReader}
     * @param str3 third string argument forwarded unchanged to {@code CassandraSplitReader}
     * @param cluster Cassandra cluster handle; closed by {@link #close()}
     * @param session Cassandra session; closed by {@link #close()}
     * @param mapper object mapper used to turn a result set into a single {@code OUT} record
     */
    public CassandraSourceReader(SourceReaderContext sourceReaderContext, String str, String str2, String str3, Cluster cluster, Session session, Mapper<OUT> mapper) {
        super(
                () -> new CassandraSplitReader(cluster, session, str, str2, str3),
                new CassandraRecordEmitter(resultSet -> mapper.map(resultSet).one()),
                sourceReaderContext.getConfiguration(),
                sourceReaderContext);
        this.cluster = cluster;
        this.session = session;
    }

    /** Requests a first split from the reader context. */
    public void start() {
        this.context.sendSplitRequest();
    }

    /**
     * Requests another split from the reader context whenever splits are reported finished.
     *
     * @param map the finished split states keyed by string; not inspected here
     */
    protected void onSplitFinished(Map<String, CassandraSplit> map) {
        this.context.sendSplitRequest();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the split itself as its state.
     *
     * @param cassandraSplit the split to derive state for
     * @return the same {@code cassandraSplit} instance
     */
    public CassandraSplit initializedState(CassandraSplit cassandraSplit) {
        return cassandraSplit;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the split state itself as the split.
     *
     * @param str string key accompanying the state; not used here
     * @param cassandraSplit the split state to convert
     * @return the same {@code cassandraSplit} instance
     */
    public CassandraSplit toSplitType(String str, CassandraSplit cassandraSplit) {
        return cassandraSplit;
    }

    /**
     * Closes the base reader, then the Cassandra session, then the cluster.
     *
     * <p>The session and cluster are released in a {@code finally} block so that they are closed
     * even when {@code super.close()} throws. Failures while closing the session or the cluster
     * are logged and not rethrown.
     *
     * @throws Exception if {@code super.close()} fails
     */
    public void close() throws Exception {
        try {
            super.close();
        } finally {
            // Each resource gets its own try/catch so a failure on the session
            // does not prevent the cluster from being closed.
            try {
                if (this.session != null) {
                    this.session.close();
                }
            } catch (Exception e) {
                LOG.error("Error while closing session.", e);
            }
            try {
                if (this.cluster != null) {
                    this.cluster.close();
                }
            } catch (Exception e2) {
                LOG.error("Error while closing cluster.", e2);
            }
        }
    }
}
