package io.streamthoughts.kafka.connect.filepulse.fs;

import io.streamthoughts.kafka.connect.filepulse.config.CommonSourceConfig;
import io.streamthoughts.kafka.connect.filepulse.source.FileObject;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectStatus;
import io.streamthoughts.kafka.connect.filepulse.source.SourceOffsetPolicy;
import io.streamthoughts.kafka.connect.filepulse.source.TaskPartitioner;
import io.streamthoughts.kafka.connect.filepulse.state.StateBackingStoreAccess;
import io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore;
import io.streamthoughts.kafka.connect.filepulse.storage.StateSnapshot;
import java.net.URI;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.common.config.ConfigDef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamthoughts/kafka/connect/filepulse/fs/DelegateTaskFileURIProvider.class */
public class DelegateTaskFileURIProvider implements TaskFileURIProvider {
    private static final Logger LOG = LoggerFactory.getLogger(DelegateTaskFileURIProvider.class);
    private static final Duration DEFAULT_REFRESH_TIMEOUT = Duration.ofSeconds(30);
    private static final String CONNECT_NAME_CONFIG = "name";
    private Config config;
    private FileSystemListing<?> fileSystemListing;
    private TaskPartitioner partitioner;
    private SourceOffsetPolicy sourceOffsetPolicy;
    private StateBackingStoreAccess sharedStore;
    private TaskFileOrder taskFileOrder;
    private StateSnapshot<FileObject> fileState;
    private boolean isFirstCall = true;

    /* loaded from: input_file:io/streamthoughts/kafka/connect/filepulse/fs/DelegateTaskFileURIProvider$Config.class */
    public static final class Config extends CommonSourceConfig {
        public static final String TASK_ID_CONFIG = "task.id";
        private static final String TASK_ID_DOC = "The current task id";
        public static final String TASK_COUNT_CONFIG = "task.count";
        private static final String TASK_COUNT_DOC = "The total number tasks assigned.";

        public Config(Map<String, ?> map) {
            super(getConf(), map);
        }

        static ConfigDef getConf() {
            return CommonSourceConfig.getConfigDef().define(TASK_ID_CONFIG, ConfigDef.Type.INT, ConfigDef.Importance.HIGH, TASK_ID_DOC).define(TASK_COUNT_CONFIG, ConfigDef.Type.INT, ConfigDef.Importance.HIGH, TASK_COUNT_DOC);
        }

        public int getTaskId() {
            return getInt(TASK_ID_CONFIG).intValue();
        }

        public int getTaskCount() {
            return getInt(TASK_COUNT_CONFIG).intValue();
        }
    }

    public void configure(Map<String, ?> map) {
        this.config = new Config(map);
        String obj = map.get(CONNECT_NAME_CONFIG).toString();
        Config config = this.config;
        Objects.requireNonNull(config);
        this.sharedStore = new StateBackingStoreAccess(obj, config::getStateBackingStore, true);
        this.partitioner = this.config.getTaskPartitioner();
        this.fileSystemListing = this.config.getFileSystemListing();
        this.sourceOffsetPolicy = this.config.getSourceOffsetPolicy();
        this.fileSystemListing.setFilter(new CompositeFileListFilter(this.config.getFileSystemListingFilter()));
        this.taskFileOrder = this.config.getTaskFilerOrder();
    }

    public List<URI> nextURIs() {
        refreshState();
        Collection<FileObjectMeta> values = FileObjectCandidatesFilter.filter(this.sourceOffsetPolicy, fileObjectKey -> {
            FileObject fileObject = (FileObject) this.fileState.getForKey(fileObjectKey.original());
            if (fileObject == null) {
                return true;
            }
            FileObjectStatus status = fileObject.status();
            return status == FileObjectStatus.COMPLETED ? this.isFirstCall : !status.isDone();
        }, this.fileSystemListing.listObjects()).values();
        this.isFirstCall = false;
        return this.partitioner.partitionForTask(this.taskFileOrder.sort(values), this.config.getTaskCount(), this.config.getTaskId());
    }

    private void refreshState() {
        try {
            StateBackingStore<FileObject> resource = this.sharedStore.get().getResource();
            resource.refresh(DEFAULT_REFRESH_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
            this.fileState = resource.snapshot();
        } catch (TimeoutException e) {
            LOG.debug("Failed to reach end of states log quickly enough", e);
        }
    }

    public boolean hasMore() {
        return true;
    }

    public void close() {
        LOG.info("Closing DelegateTaskFileURIProvider");
        closeSharedStateBackingStore();
        LOG.info("Closed DelegateTaskFileURIProvider");
    }

    private void closeSharedStateBackingStore() {
        try {
            if (this.sharedStore != null) {
                this.sharedStore.close();
            }
        } catch (Exception e) {
            LOG.warn("Failed to close shared StateBackingStore. Error: '{}'", e.getMessage());
        }
    }
}
