package io.streamthoughts.kafka.connect.filepulse.source;

import io.streamthoughts.kafka.connect.filepulse.config.SourceTaskConfig;
import io.streamthoughts.kafka.connect.filepulse.errors.ConnectFilePulseException;
import io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline;
import io.streamthoughts.kafka.connect.filepulse.fs.TaskFileURIProvider;
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
import io.streamthoughts.kafka.connect.filepulse.state.StateBackingStoreAccess;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamthoughts/kafka/connect/filepulse/source/FilePulseSourceTask.class */
public class FilePulseSourceTask extends SourceTask {
    private static final Logger LOG = LoggerFactory.getLogger(FilePulseSourceTask.class);
    private static final int CONSECUTIVE_WAITS_BEFORE_RETURN = 3;
    private static final String CONNECT_NAME_CONFIG = "name";
    public SourceTaskConfig taskConfig;
    private String defaultTopic;
    private DefaultFileRecordsPollingConsumer consumer;
    private SourceOffsetPolicy offsetPolicy;
    private FileObjectStateReporter reporter;
    private volatile FileObjectContext contextToBeCommitted;
    private StateBackingStoreAccess sharedStore;
    private TaskFileURIProvider fileURIProvider;
    private String connectorGroupName;
    private final AtomicBoolean isResourceClosed = new AtomicBoolean(false);
    private final AtomicBoolean isIdle = new AtomicBoolean(false);
    private final ConcurrentLinkedQueue<FileObjectContext> completedToCommit = new ConcurrentLinkedQueue<>();
    private final Map<String, Schema> valueSchemas = new HashMap();
    private final AtomicLong taskThreadId = new AtomicLong(0);
    private final AtomicReference<State> state = new AtomicReference<>(State.STOPPED);
    private final ReentrantLock stateLock = new ReentrantLock();

    /* loaded from: input_file:io/streamthoughts/kafka/connect/filepulse/source/FilePulseSourceTask$MaxConsecutiveAttempts.class */
    static final class MaxConsecutiveAttempts {
        final AtomicInteger consecutiveAttempts;

        MaxConsecutiveAttempts(int i) {
            if (i <= 0) {
                throw new IllegalArgumentException("'maxConsecutiveAttempts' must be superior to 0");
            }
            this.consecutiveAttempts = new AtomicInteger(i);
        }

        public boolean checkAndDecrement() {
            if (getRemaining() < 0) {
                throw new IllegalStateException("cannot make a new consecutive attempt (remaining=0)");
            }
            return this.consecutiveAttempts.getAndDecrement() > 0;
        }

        int getRemaining() {
            return this.consecutiveAttempts.get();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/streamthoughts/kafka/connect/filepulse/source/FilePulseSourceTask$State.class */
    public enum State {
        RUNNING,
        STOPPED
    }

    public String version() {
        return new FilePulseSourceConnector().version();
    }

    public void start(Map<String, String> map) {
        this.stateLock.lock();
        try {
            try {
                if (!this.state.compareAndSet(State.STOPPED, State.RUNNING)) {
                    LOG.info("Connector has already been started");
                    this.stateLock.unlock();
                    return;
                }
                LOG.info("Starting FilePulse source task");
                this.taskConfig = new SourceTaskConfig(new HashMap(map));
                this.connectorGroupName = map.get(CONNECT_NAME_CONFIG);
                this.offsetPolicy = this.taskConfig.getSourceOffsetPolicy();
                this.defaultTopic = this.taskConfig.topic();
                this.valueSchemas.put(this.defaultTopic, this.taskConfig.getValueConnectSchema());
                String str = this.connectorGroupName;
                SourceTaskConfig sourceTaskConfig = this.taskConfig;
                Objects.requireNonNull(sourceTaskConfig);
                this.sharedStore = new StateBackingStoreAccess(str, sourceTaskConfig::getStateBackingStore, true);
                this.reporter = new FileObjectStateReporter(this.sharedStore.get().getResource()) { // from class: io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceTask.1
                    @Override // io.streamthoughts.kafka.connect.filepulse.source.FileObjectStateReporter
                    public void onCompleted(FileObjectContext fileObjectContext) {
                        super.onCompleted(fileObjectContext);
                        FilePulseSourceTask.this.completedToCommit.add(fileObjectContext);
                    }
                };
                this.consumer = newFileRecordsPollingConsumer();
                this.consumer.setStateListener(this.reporter);
                this.fileURIProvider = this.taskConfig.getFileURIProvider();
                this.taskThreadId.set(Thread.currentThread().getId());
                LOG.info("Started FilePulse source task");
                this.stateLock.unlock();
            } catch (Throwable th) {
                closeResources();
                throw th;
            }
        } catch (Throwable th2) {
            this.stateLock.unlock();
            throw th2;
        }
    }

    private DefaultFileRecordsPollingConsumer newFileRecordsPollingConsumer() {
        return new DefaultFileRecordsPollingConsumer(this.context, this.taskConfig.reader(), new DefaultRecordFilterPipeline(this.taskConfig.filters()), this.offsetPolicy, this.taskConfig.isReadCommittedFile());
    }

    /* JADX WARN: Removed duplicated region for block: B:30:0x012e  */
    /* JADX WARN: Removed duplicated region for block: B:32:0x0131 A[SYNTHETIC] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public java.util.List<org.apache.kafka.connect.source.SourceRecord> poll() throws java.lang.InterruptedException {
        /*
            Method dump skipped, instructions count: 356
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceTask.poll():java.util.List");
    }

    private boolean isTaskRunning() {
        return this.state.get() == State.RUNNING;
    }

    private void busyWait() throws InterruptedException {
        LOG.trace("Waiting {} ms to execute next poll iteration", Long.valueOf(this.taskConfig.getTaskEmptyPollWaitMs()));
        Thread.sleep(this.taskConfig.getTaskEmptyPollWaitMs());
    }

    public void commit() {
        if (!this.stateLock.tryLock()) {
            LOG.warn("Couldn't commit due to a concurrent connector shutdown or restart");
            return;
        }
        try {
            if (isTaskRunning() && this.contextToBeCommitted != null) {
                this.reporter.notify(this.contextToBeCommitted, FileObjectStatus.READING);
            }
            if (!this.isResourceClosed.get()) {
                while (!this.completedToCommit.isEmpty()) {
                    FileObjectContext poll = this.completedToCommit.poll();
                    LOG.info("Committed offset for file: {}", poll.metadata());
                    safelyCommit(poll);
                }
                if (this.isIdle.get()) {
                    closeResources();
                }
            }
        } finally {
            this.stateLock.unlock();
        }
    }

    private void safelyCommit(FileObjectContext fileObjectContext) {
        try {
            this.reporter.notify(fileObjectContext, FileObjectStatus.COMMITTED);
        } catch (Exception e) {
            LOG.warn("Failed to notify committed file: {}", this.context, e);
        }
    }

    private SourceRecord buildSourceRecord(FileObjectContext fileObjectContext, FileRecord<?> fileRecord) {
        Map partitionMap = this.offsetPolicy.toPartitionMap(fileObjectContext.metadata());
        Map offsetMap = this.offsetPolicy.toOffsetMap(fileRecord.offset().toSourceOffset());
        String valueSchemaConditionTopicPattern = this.taskConfig.getValueSchemaConditionTopicPattern();
        try {
            FileObjectMeta metadata = fileObjectContext.metadata();
            String str = this.defaultTopic;
            Map<String, Schema> map = this.valueSchemas;
            Objects.requireNonNull(map);
            SourceRecord sourceRecord = fileRecord.toSourceRecord(partitionMap, offsetMap, metadata, str, (Integer) null, (v1) -> {
                return r6.get(v1);
            }, new FileRecord.ConnectSchemaMapperOptions(this.taskConfig.isValueConnectSchemaMergeEnabled(), this.taskConfig.isSchemaKeepLeadingUnderscoreOnFieldName(), (Pattern) Optional.ofNullable(valueSchemaConditionTopicPattern).map(Pattern::compile).orElse(null)));
            if (this.taskConfig.isValueConnectSchemaMergeEnabled()) {
                this.valueSchemas.put(sourceRecord.topic(), sourceRecord.valueSchema());
            }
            return sourceRecord;
        } catch (Throwable th) {
            Exception connectFilePulseException = new ConnectFilePulseException(String.format("Failed to convert data into Kafka Connect record at offset %s from object-file: %s'", fileObjectContext.offset(), fileObjectContext.metadata()), th);
            this.consumer.closeCurrentIterator(connectFilePulseException);
            throw connectFilePulseException;
        }
    }

    public void stop() {
        this.stateLock.lock();
        try {
            if (!this.state.compareAndSet(State.RUNNING, State.STOPPED)) {
                LOG.info("Task has already been stopped");
            } else {
                LOG.info("Stopping FilePulse source task");
                doStop();
            }
        } finally {
            this.stateLock.unlock();
        }
    }

    private void doStop() {
        if (this.taskThreadId.longValue() == Thread.currentThread().getId()) {
            closeResources();
            LOG.info("Stopped FilePulse source task.");
        }
    }

    private void closeResources() {
        if (!this.isResourceClosed.compareAndSet(false, true)) {
            LOG.info("Task's resources have already been closed");
            return;
        }
        LOG.info("Closing resources FilePulse source task");
        try {
            if (this.consumer != null) {
                try {
                    this.consumer.close();
                } catch (Throwable th) {
                    LOG.warn("Failed to close FileRecordsPollingConsumer. Error: {}", th.getMessage());
                }
            }
            if (this.fileURIProvider != null) {
                try {
                    this.fileURIProvider.close();
                } catch (Exception e) {
                    LOG.warn("Failed to close FileURIProvider. Error: {}", e.getMessage());
                }
            }
        } finally {
            this.contextToBeCommitted = null;
            this.consumer = null;
            this.reporter = null;
            closeSharedStateBackingStore();
            LOG.info("Closed resources FilePulse source task");
        }
    }

    private void closeSharedStateBackingStore() {
        try {
            if (this.sharedStore != null) {
                this.sharedStore.close();
            }
        } catch (Exception e) {
            LOG.error("Failed to shared StateBackingStore '{}'", this.connectorGroupName);
        }
    }
}
