package oracle.jdbc.txeventq.kafka.connect.sink.task;

import java.io.IOException;
import java.sql.SQLException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import oracle.jdbc.OracleConnection;
import oracle.jdbc.txeventq.kafka.connect.common.utils.AppInfoParser;
import oracle.jdbc.txeventq.kafka.connect.sink.utils.TxEventQProducer;
import oracle.jdbc.txeventq.kafka.connect.sink.utils.TxEventQSinkConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:oracle/jdbc/txeventq/kafka/connect/sink/task/TxEventQSinkTask.class */
public class TxEventQSinkTask extends SinkTask {
    private static final Logger log = LoggerFactory.getLogger(TxEventQSinkTask.class);
    private TxEventQSinkConfig config;
    private TxEventQProducer producer;
    private OracleConnection conn;

    public String version() {
        return AppInfoParser.getVersion();
    }

    public void start(Map<String, String> map) {
        log.info("[{}] Starting Kafka Connect for Oracle TxEventQ - Sink Task", Long.valueOf(Thread.currentThread().getId()));
        try {
            this.config = new TxEventQSinkConfig(map);
            try {
                this.producer = new TxEventQProducer(this.config);
                if (this.producer.isConnOpen(this.conn)) {
                    log.debug("[{}]:[{}] Connection is already open.", Long.valueOf(Thread.currentThread().getId()), this.conn);
                } else {
                    this.conn = this.producer.connect();
                }
                if (!this.producer.kafkaTopicExists(this.config.getString(TxEventQSinkConfig.KAFKA_TOPIC))) {
                    throw new ConnectException("The Kafka topic " + this.config.getString(TxEventQSinkConfig.KAFKA_TOPIC) + " does not exist.");
                }
                try {
                    if (!this.producer.txEventQueueExists(this.conn, this.config.getString("txeventq.queue.name").toUpperCase())) {
                        throw new ConnectException("The TxEventQ queue name " + this.config.getString("txeventq.queue.name") + " does not exist.");
                    }
                    try {
                        int kafkaTopicPartitionSize = this.producer.getKafkaTopicPartitionSize(this.config.getString(TxEventQSinkConfig.KAFKA_TOPIC));
                        int numOfShardsForQueue = this.producer.getNumOfShardsForQueue(this.conn, this.config.getString("txeventq.queue.name"));
                        if (kafkaTopicPartitionSize > numOfShardsForQueue) {
                            throw new ConnectException("The number of Kafka partitions " + kafkaTopicPartitionSize + " must be less than or equal the number TxEventQ event stream " + numOfShardsForQueue);
                        }
                    } catch (SQLException e) {
                        throw new ConnectException("Error attempting to validate the Kafka partition size is valid compared to the TxEventQ event stream: " + e.toString());
                    }
                } catch (SQLException e2) {
                    throw new ConnectException("Error attempting to validate the existence of the TxEventQ queue name: " + e2.toString());
                }
            } catch (SQLException e3) {
                log.error("Database connection error occurred attempting to start task.");
                throw new ConnectException(e3);
            }
        } catch (ConfigException e4) {
            log.error("[{}] Couldn't start TxEventQSinkTask due to configuration error", Long.valueOf(Thread.currentThread().getId()));
            throw new ConnectException("Couldn't start TxEventQSinkTask due to configuration error", e4);
        }
    }

    public void put(Collection<SinkRecord> collection) {
        if (collection.isEmpty()) {
            return;
        }
        try {
            if (this.producer.isConnOpen(this.conn)) {
                log.debug("[{}] Connection [{}] is already open.", Long.valueOf(Thread.currentThread().getId()), this.conn);
            } else {
                this.conn = this.producer.connect();
            }
            this.producer.put(collection);
        } catch (SQLException e) {
            log.error("Database connection error occurred attempting to start task.");
            throw new ConnectException(e);
        }
    }

    public void stop() {
        log.info("[{}]:[{}] Stopping Kafka Connect for Oracle TxEventQ - Sink Task", Long.valueOf(Thread.currentThread().getId()), this.conn);
        try {
            if (this.producer.isConnOpen(this.conn)) {
                closeDatabaseConnection();
            }
        } catch (SQLException e) {
            log.error("Error occurred attempting to stop Sink Task.");
            throw new ConnectException(e);
        }
    }

    private void closeDatabaseConnection() {
        try {
            this.producer.close();
        } catch (IOException e) {
            log.error("Exception occurred while closing database connection.");
        }
    }

    public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> map) {
        map.clear();
        return map;
    }

    public void open(Collection<TopicPartition> collection) {
        if (!this.producer.createOffsetInfoTable(this.producer.getConnection())) {
            throw new ConnectException("TXEVENTQ_TRACK_OFFSETS table couldn't be created or accessed to setup offset information.");
        }
        HashMap hashMap = new HashMap();
        for (TopicPartition topicPartition : collection) {
            hashMap.put(topicPartition, Long.valueOf(this.producer.getOffsetInDatabase(this.producer.getConnection(), topicPartition.topic(), this.config.getString("txeventq.queue.name"), this.config.getString("txeventq.queue.schema"), topicPartition.partition())));
        }
        this.context.offset(hashMap);
    }
}
