package oracle.jdbc.txeventq.kafka.connect.source.task;

import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import oracle.jdbc.OracleConnection;
import oracle.jdbc.txeventq.kafka.connect.common.utils.AppInfoParser;
import oracle.jdbc.txeventq.kafka.connect.source.utils.TxEventQConnectorConfig;
import oracle.jdbc.txeventq.kafka.connect.source.utils.TxEventQConsumer;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:oracle/jdbc/txeventq/kafka/connect/source/task/TxEventQSourceTask.class */
public class TxEventQSourceTask extends SourceTask {
    static final Logger log = LoggerFactory.getLogger(TxEventQSourceTask.class);
    private String connectorName;
    private int batchSize;
    private TxEventQConnectorConfig config;
    private TxEventQConsumer consumer = null;
    private OracleConnection conn = null;

    public String version() {
        return AppInfoParser.getVersion();
    }

    public void start(Map<String, String> map) {
        log.info("[{}] Starting Kafka Connect for Oracle TxEventQ - Source Task", Long.valueOf(Thread.currentThread().getId()));
        try {
            this.config = new TxEventQConnectorConfig(map);
            this.consumer = new TxEventQConsumer(this.config);
            this.connectorName = this.config.name();
            this.batchSize = 1;
            try {
                if (this.consumer.isConnOpen(this.conn)) {
                    log.debug("[{}] Connection [{}] is already open.", Long.valueOf(Thread.currentThread().getId()), this.conn);
                } else {
                    this.conn = this.consumer.connect();
                }
                try {
                    int kafkaTopicPartitionSize = this.consumer.getKafkaTopicPartitionSize(this.config.getString(TxEventQConnectorConfig.KAFKA_TOPIC));
                    int numOfShardsForQueue = this.consumer.getNumOfShardsForQueue(this.conn, this.config.getString("txeventq.queue.name"));
                    if (kafkaTopicPartitionSize < numOfShardsForQueue / 2) {
                        throw new ConnectException("The number of Kafka partitions " + kafkaTopicPartitionSize + " must be greater than or equal to " + (numOfShardsForQueue / 2));
                    }
                    log.info("[{}]:[{}] Connector [{}] Source Task started!", new Object[]{Long.valueOf(Thread.currentThread().getId()), this.conn, this.connectorName});
                } catch (SQLException e) {
                    throw new ConnectException("Error attempting to validate the Kafka partition size is valid compared to the TxEventQ event stream: " + e.toString());
                }
            } catch (SQLException e2) {
                log.error("Database connection error occurred attempting to start task.");
                throw new ConnectException(e2);
            }
        } catch (ConfigException e3) {
            log.error("[{}] Couldn't start TxEventQSourceTask due to configuration error", Long.valueOf(Thread.currentThread().getId()));
            throw new ConnectException("Couldn't start TxEventQSourceTask due to configuration error", e3);
        }
    }

    public List<SourceRecord> poll() throws InterruptedException {
        log.info("[{}]:[{}] Entry TxEventQ SourceTask poll.", Long.valueOf(Thread.currentThread().getId()), this.conn);
        ArrayList arrayList = null;
        for (int i = 0; i < this.batchSize; i++) {
            log.info("[{}]:[{}] receiving TxEventQ Messages.", Long.valueOf(Thread.currentThread().getId()), this.conn);
            try {
                if (this.consumer.isConnOpen(this.conn)) {
                    log.debug("[{}] Connection [{}] is already open.", Long.valueOf(Thread.currentThread().getId()), this.conn);
                } else {
                    this.conn = this.consumer.connect();
                }
                SourceRecord receive = this.consumer.receive(this.conn);
                log.info("[{}]:[{}] The source record: {}", new Object[]{Long.valueOf(Thread.currentThread().getId()), this.conn, receive});
                if (receive == null) {
                    return arrayList;
                }
                if (arrayList == null) {
                    arrayList = new ArrayList();
                }
                arrayList.add(receive);
            } catch (SQLException e) {
                log.error("Database connection error occurred attempting to poll records.");
                throw new ConnectException(e);
            }
        }
        log.info("[{}]:[{}] Returning {} records", new Object[]{Long.valueOf(Thread.currentThread().getId()), this.conn, Integer.valueOf(recordCount(arrayList))});
        return arrayList;
    }

    private int recordCount(List<SourceRecord> list) {
        if (list == null) {
            return 0;
        }
        return list.size();
    }

    public void commitRecord(SourceRecord sourceRecord, RecordMetadata recordMetadata) throws InterruptedException {
        try {
            if (this.consumer.isConnOpen(this.conn)) {
                log.info("[{}]:[{}] Committing record: {} .", new Object[]{Long.valueOf(Thread.currentThread().getId()), this.conn, sourceRecord});
                this.conn.commit();
            }
        } catch (SQLException e) {
            log.error("Error occurred attempting to commit record.");
            throw new ConnectException(e);
        }
    }

    public void stop() {
        log.info("[{}]:[{}] Stopping Kafka Connect for Oracle TxEventQ - Source Task", Long.valueOf(Thread.currentThread().getId()), this.conn);
        try {
            if (this.consumer.isConnOpen(this.conn)) {
                closeDatabaseConnection();
            }
        } catch (SQLException e) {
            log.error("Error occurred attempting to stop SourceTask.");
            throw new ConnectException(e);
        }
    }

    private void closeDatabaseConnection() {
        try {
            this.consumer.close();
        } catch (IOException e) {
            log.error("Exception occurred while closing database connection.");
        }
    }
}
