package oracle.jdbc.txeventq.kafka.connect.source.task;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import oracle.jdbc.txeventq.kafka.connect.common.utils.AppInfoParser;
import oracle.jdbc.txeventq.kafka.connect.source.utils.TxEventQConnectorConfig;
import oracle.jdbc.txeventq.kafka.connect.source.utils.TxEventQConsumer;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:oracle/jdbc/txeventq/kafka/connect/source/task/TxEventQSourceTask.class */
/**
 * Kafka Connect source task that reads messages from an Oracle TxEventQ queue through a
 * {@link TxEventQConsumer} and hands them to the Connect framework as {@link SourceRecord}s.
 *
 * <p>Batch hand-off protocol implemented by this class:
 * <ol>
 *   <li>{@link #poll()} receives up to {@code batchSize} records and creates a
 *       {@link CountDownLatch} sized to the number of records returned.</li>
 *   <li>{@link #commitRecord(SourceRecord, RecordMetadata)} counts that latch down once per
 *       acknowledged record.</li>
 *   <li>The next {@link #poll()} waits for the latch to reach zero, then calls
 *       {@link TxEventQConsumer#commit()} before receiving the next batch.</li>
 *   <li>{@link #commit()} forces the latch to zero when it observes the same poll rotation on
 *       two consecutive invocations, so that a batch whose acknowledgements never arrive does
 *       not block {@link #poll()} indefinitely.</li>
 * </ol>
 *
 * <p>NOTE(review): these callbacks are presumably invoked by the Connect framework from
 * different threads (task thread, producer callback thread, offset-commit thread); the latch
 * reference is therefore {@code volatile} and replaced only while holding this object's monitor.
 */
public class TxEventQSourceTask extends SourceTask {
    static final Logger log = LoggerFactory.getLogger(TxEventQSourceTask.class);

    /** Connector name taken from the configuration in {@link #start(Map)}; not read elsewhere in this class. */
    private String connectorName;

    /** Maximum number of records requested from the consumer per {@link #poll()}. */
    private int batchSize;

    /** Configured maximum number of tasks; not read elsewhere in this class. */
    private int tasksMax;

    private TxEventQConnectorConfig config;

    /**
     * Latch tracking unacknowledged records of the batch most recently returned by
     * {@link #poll()}, or {@code null} when no batch is outstanding. Volatile because it is read
     * without holding the monitor in {@link #poll()} and {@link #commitRecord}.
     */
    private volatile CountDownLatch batchCompleteIndicator = null;

    /** Incremented once per {@link #poll()} invocation. */
    private final AtomicInteger pollRotation = new AtomicInteger(1);

    /** Poll rotation observed by the previous {@link #commit()} invocation. */
    private int lastCommitPollRotation = 0;

    /** Set by {@link #stop()}; once set, {@link #poll()} no longer receives records. */
    private final AtomicBoolean stopNow = new AtomicBoolean();

    private TxEventQConsumer consumer = null;

    /**
     * Returns the connector version.
     *
     * @return the version string reported by {@link AppInfoParser#getVersion()}
     */
    public String version() {
        return AppInfoParser.getVersion();
    }

    /**
     * Parses the task configuration, connects the consumer and validates that the Kafka topic
     * has at least as many partitions as the TxEventQ queue has shards.
     *
     * @param map the task configuration properties
     * @throws ConnectException if the Kafka topic has fewer partitions than the queue has shards
     */
    public void start(Map<String, String> map) {
        log.trace("[{}] Entry {}.start, props={}", threadId(), getClass().getName(), map);
        this.config = new TxEventQConnectorConfig(map);
        this.consumer = new TxEventQConsumer(this.config);
        this.connectorName = this.config.name();
        this.batchSize = this.config.getInt(TxEventQConnectorConfig.TASK_BATCH_SIZE_CONFIG).intValue();
        log.debug("The batch size is: {}", this.batchSize);
        this.tasksMax = this.config.getInt(TxEventQConnectorConfig.TASK_MAX_CONFIG).intValue();
        log.debug("The tasks.max is: {}", this.tasksMax);
        this.consumer.connect();
        int kafkaTopicPartitionSize = this.consumer.getKafkaTopicPartitionSize(this.config.getString(TxEventQConnectorConfig.KAFKA_TOPIC));
        int numOfShardsForQueue = this.consumer.getNumOfShardsForQueue(this.config.getString("txeventq.queue.name"));
        if (kafkaTopicPartitionSize < numOfShardsForQueue) {
            throw new ConnectException("The number of Kafka partitions " + kafkaTopicPartitionSize + " must be greater than or equal to " + numOfShardsForQueue);
        }
        log.trace("[{}]:[{}] Exit {}.start", threadId(), this.consumer.getDatabaseConnection(), getClass().getName());
    }

    /**
     * Waits for the previous batch (if any) to be fully acknowledged, commits it on the
     * consumer, and then receives the next batch of up to {@code batchSize} records.
     *
     * <p>A {@link ConnectException} thrown while receiving is logged and results in an empty
     * batch rather than being propagated. If the task has been asked to stop, no records are
     * received, and any batch received concurrently with the stop request is discarded.
     *
     * @return the received records; may be empty, or {@code null} if the consumer returned
     *     {@code null}
     * @throws InterruptedException if interrupted while waiting for the previous batch
     */
    public List<SourceRecord> poll() throws InterruptedException {
        log.trace("[{}]:[{}] Entry {}.poll", threadId(), this.consumer.getDatabaseConnection(), getClass().getName());
        List<SourceRecord> records = new ArrayList<>();
        int messageCount = 0;

        // Only poll() replaces the latch, so one read is enough for both the check and the wait.
        CountDownLatch previousBatch = this.batchCompleteIndicator;
        if (previousBatch != null) {
            log.debug("[{}][{}] Awaiting batch completion signal", threadId(), this.consumer.getDatabaseConnection());
            previousBatch.await();
            log.debug("[{}]:[{}] Committing records", threadId(), this.consumer.getDatabaseConnection());
            this.consumer.commit();
        }

        // The rotation counter must advance on every poll regardless of the active log level.
        int rotation = this.pollRotation.incrementAndGet();
        log.debug("[{}]:[{}] Starting poll rotation {}", threadId(), this.consumer.getDatabaseConnection(), rotation);

        try {
            if (this.stopNow.get()) {
                log.debug("[{}]:[{}] Stopping polling for records", threadId(), this.consumer.getDatabaseConnection());
            } else {
                log.debug("[{}]:[{}]:[{}] Polling for TxEventQ messages.", threadId(), this, this.consumer.getDatabaseConnection());
                records = this.consumer.receive(this.batchSize);
                messageCount = recordCount(records);
            }
        } catch (ConnectException e) {
            log.error("{}:", e.getClass().getName(), e);
            messageCount = 0;
            if (records != null) {
                records.clear();
            }
        }

        synchronized (this) {
            if (messageCount <= 0) {
                this.batchCompleteIndicator = null;
            } else if (this.stopNow.get()) {
                // stop() was requested while receiving: drop the batch instead of handing it out.
                log.debug("Task is stopping, a batch of {} records is being removed.", messageCount);
                records.clear();
                this.batchCompleteIndicator = null;
            } else {
                this.batchCompleteIndicator = new CountDownLatch(messageCount);
            }
        }

        log.trace("[{}]:[{}]  Exit {}.poll retvalSize={} messageCount={}", threadId(), this.consumer.getDatabaseConnection(), getClass().getName(), recordCount(records), messageCount);
        return records;
    }

    /**
     * Returns the number of records in {@code list}, treating {@code null} as empty.
     *
     * @param list the list to measure; may be {@code null}
     * @return {@code list.size()}, or 0 when {@code list} is {@code null}
     */
    private int recordCount(List<SourceRecord> list) {
        return (list == null) ? 0 : list.size();
    }

    /** Returns the id of the calling thread, used as a prefix in log messages. */
    private static long threadId() {
        return Thread.currentThread().getId();
    }

    /**
     * Releases a stalled batch. If the poll rotation has not advanced since the previous
     * invocation of this method, any outstanding latch is counted down to zero so that the
     * next {@link #poll()} can proceed; otherwise the current rotation is only recorded.
     *
     * @throws InterruptedException declared by the overridden signature; not thrown by this
     *     implementation
     */
    public void commit() throws InterruptedException {
        log.trace("[{}]:[{}] Entry {}.commit.", threadId(), this.consumer.getDatabaseConnection(), getClass().getName());
        int currentRotation = this.pollRotation.get();
        log.debug("Commit starting in poll rotation {}", currentRotation);
        if (this.lastCommitPollRotation == currentRotation) {
            synchronized (this) {
                CountDownLatch latch = this.batchCompleteIndicator;
                if (latch != null) {
                    log.debug("Increase batch complete indicator by {}", latch.getCount());
                    while (latch.getCount() > 0) {
                        latch.countDown();
                    }
                }
            }
        } else {
            this.lastCommitPollRotation = currentRotation;
        }
        log.trace("[{}]:[{}]  Exit {}.commit", threadId(), this.consumer.getDatabaseConnection(), getClass().getName());
    }

    /**
     * Acknowledges a single record by counting down the outstanding batch latch.
     *
     * <p>The latch reference is read once and null-checked: {@link #poll()} resets it to
     * {@code null} for empty batches and when stopping, so an acknowledgement can arrive when
     * no batch is outstanding. Such acknowledgements are ignored.
     *
     * @param sourceRecord the record that was acknowledged
     * @param recordMetadata the producer metadata for the record; not used
     * @throws InterruptedException declared by the overridden signature; not thrown by this
     *     implementation
     */
    public void commitRecord(SourceRecord sourceRecord, RecordMetadata recordMetadata) throws InterruptedException {
        log.trace("[{}]:[{}] Entry {}.commitRecord, record={}", threadId(), this.consumer.getDatabaseConnection(), getClass().getName(), sourceRecord);
        CountDownLatch latch = this.batchCompleteIndicator;
        if (latch != null) {
            latch.countDown();
        } else {
            log.debug("[{}] No batch outstanding, ignoring record acknowledgement", threadId());
        }
        log.trace("[{}]:[{}]  Exit {}.commitRecord", threadId(), this.consumer.getDatabaseConnection(), getClass().getName());
    }

    /**
     * Signals {@link #poll()} to stop receiving records and closes the consumer if one was
     * created.
     *
     * <p>NOTE(review): this does not release an outstanding batch latch, so a {@link #poll()}
     * already blocked waiting for acknowledgements stays blocked until {@link #commit()}
     * releases it — confirm this is intended.
     *
     * @throws ConnectException if closing the consumer fails; the {@link IOException} is
     *     attached as the cause
     */
    public void stop() {
        // The consumer is null when start() failed before creating it; do not dereference it blindly.
        TxEventQConsumer current = this.consumer;
        Object databaseConnection = (current != null) ? current.getDatabaseConnection() : null;
        log.trace("[{}]:[{}] Entry {}.stop", threadId(), databaseConnection, getClass().getName());
        this.stopNow.set(true);
        synchronized (this) {
            if (this.consumer != null) {
                try {
                    this.consumer.close();
                } catch (IOException e) {
                    // Keep the original message but preserve the cause for diagnostics.
                    throw new ConnectException(e.getMessage(), e);
                }
            }
        }
        log.trace("[{}] Exit {}.stop", threadId(), getClass().getName());
    }
}
