package org.apache.nifi.minifi.bootstrap.configuration.ingestors;

import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
import org.apache.nifi.minifi.bootstrap.configuration.differentiators.Differentiator;
import org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator;
import org.apache.nifi.minifi.bootstrap.configuration.ingestors.interfaces.ChangeIngestor;
import org.apache.nifi.minifi.commons.api.MiNiFiProperties;
import org.apache.nifi.minifi.properties.BootstrapProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/minifi/bootstrap/configuration/ingestors/FileChangeIngestor.class */
public class FileChangeIngestor implements Runnable, ChangeIngestor {
    static final String CONFIG_FILE_BASE_KEY = "nifi.minifi.notifier.ingestors.file";
    static final String CONFIG_FILE_PATH_KEY = "nifi.minifi.notifier.ingestors.file.config.path";
    static final String POLLING_PERIOD_INTERVAL_KEY = "nifi.minifi.notifier.ingestors.file.polling.period.seconds";
    static final int DEFAULT_POLLING_PERIOD_INTERVAL = 15;
    private static final String DIFFERENTIATOR_KEY = "nifi.minifi.notifier.ingestors.file.differentiator";
    private volatile Differentiator<ByteBuffer> differentiator;
    private volatile ConfigurationChangeNotifier configurationChangeNotifier;
    private ScheduledExecutorService executorService;
    private Path configFilePath;
    private WatchService watchService;
    private long pollingSeconds;
    private static final Map<String, Supplier<Differentiator<ByteBuffer>>> DIFFERENTIATOR_CONSTRUCTOR_MAP = Map.of(WholeConfigDifferentiator.WHOLE_CONFIG_KEY, WholeConfigDifferentiator::getByteBufferDifferentiator);
    private static final Logger logger = LoggerFactory.getLogger(FileChangeIngestor.class);
    private static final TimeUnit DEFAULT_POLLING_PERIOD_UNIT = TimeUnit.SECONDS;

    @Override // org.apache.nifi.minifi.bootstrap.configuration.ingestors.interfaces.ChangeIngestor
    public void initialize(BootstrapProperties bootstrapProperties, ConfigurationFileHolder configurationFileHolder, ConfigurationChangeNotifier configurationChangeNotifier) {
        Path path = (Path) Optional.ofNullable(bootstrapProperties.getProperty(CONFIG_FILE_PATH_KEY)).filter(Predicate.not((v0) -> {
            return v0.isBlank();
        })).map(str -> {
            return Path.of(str, new String[0]);
        }).map((v0) -> {
            return v0.toAbsolutePath();
        }).orElseThrow(() -> {
            return new IllegalArgumentException("Property, nifi.minifi.notifier.ingestors.file.config.path, for the path of the config file must be specified");
        });
        try {
            this.configurationChangeNotifier = configurationChangeNotifier;
            this.configFilePath = path;
            this.pollingSeconds = ((Long) Optional.ofNullable(bootstrapProperties.getProperty(POLLING_PERIOD_INTERVAL_KEY, Long.toString(15L))).map(Long::parseLong).filter(l -> {
                return l.longValue() > 0;
            }).map(l2 -> {
                return Long.valueOf(TimeUnit.SECONDS.convert(l2.longValue(), DEFAULT_POLLING_PERIOD_UNIT));
            }).orElseThrow(() -> {
                return new IllegalArgumentException("Cannot specify a polling period with duration <=0");
            })).longValue();
            this.watchService = initializeWatcher(path);
            this.differentiator = (Differentiator) Optional.ofNullable(bootstrapProperties.getProperty(DIFFERENTIATOR_KEY)).filter(Predicate.not((v0) -> {
                return v0.isBlank();
            })).map(str2 -> {
                return (Differentiator) Optional.ofNullable(DIFFERENTIATOR_CONSTRUCTOR_MAP.get(str2)).map((v0) -> {
                    return v0.get();
                }).orElseThrow(unableToFindDifferentiatorExceptionSupplier(str2));
            }).orElseGet(WholeConfigDifferentiator::getByteBufferDifferentiator);
            this.differentiator.initialize(configurationFileHolder);
            checkConfigFileLocationCorrectness(bootstrapProperties, path);
        } catch (Exception e) {
            throw new IllegalStateException("Could not successfully initialize file change notifier", e);
        }
    }

    @Override // org.apache.nifi.minifi.bootstrap.configuration.ingestors.interfaces.ChangeIngestor
    public void start() {
        this.executorService = Executors.newScheduledThreadPool(1, runnable -> {
            Thread newThread = Executors.defaultThreadFactory().newThread(runnable);
            newThread.setName("File Change Notifier Thread");
            newThread.setDaemon(true);
            return newThread;
        });
        this.executorService.scheduleWithFixedDelay(this, 0L, this.pollingSeconds, DEFAULT_POLLING_PERIOD_UNIT);
    }

    @Override // java.lang.Runnable
    public void run() {
        logger.debug("Checking for a change in {}", this.configFilePath);
        if (!targetFileChanged()) {
            logger.debug("No change detected in {}", this.configFilePath);
            return;
        }
        logger.debug("Target file changed, checking if it's different than current flow");
        try {
            FileInputStream fileInputStream = new FileInputStream(this.configFilePath.toFile());
            try {
                ByteBuffer wrap = ByteBuffer.wrap(IOUtils.toByteArray(fileInputStream));
                if (this.differentiator.isNew(wrap)) {
                    logger.debug("Current flow and new flow is different, notifying listener");
                    this.configurationChangeNotifier.notifyListeners(wrap);
                    logger.debug("Listeners have been notified");
                }
                fileInputStream.close();
            } finally {
            }
        } catch (Exception e) {
            logger.error("Could not successfully notify listeners.", e);
        }
    }

    @Override // org.apache.nifi.minifi.bootstrap.configuration.ingestors.interfaces.ChangeIngestor
    public void close() {
        if (this.executorService != null) {
            this.executorService.shutdownNow();
        }
    }

    boolean targetFileChanged() {
        logger.trace("Attempting to acquire watch key");
        Optional ofNullable = Optional.ofNullable(this.watchService.poll());
        logger.trace("Watch key acquire with value {}", ofNullable);
        boolean anyMatch = ((List) ofNullable.map((v0) -> {
            return v0.pollEvents();
        }).orElse(Collections.emptyList())).stream().anyMatch(watchEvent -> {
            return StandardWatchEventKinds.ENTRY_MODIFY == watchEvent.kind() && ((Path) watchEvent.context()).equals(this.configFilePath.getName(this.configFilePath.getNameCount() - 1));
        });
        logger.debug("Target file changed: {}", Boolean.valueOf(anyMatch));
        ofNullable.map((v0) -> {
            return v0.reset();
        }).filter(bool -> {
            return !bool.booleanValue();
        }).ifPresent(bool2 -> {
            logger.error("Unable to reinitialize file system watcher.");
            throw new IllegalStateException("Unable to reinitialize file system watcher.");
        });
        logger.trace("Watch key has been reset successfully");
        return anyMatch;
    }

    private WatchService initializeWatcher(Path path) {
        try {
            WatchService newWatchService = FileSystems.getDefault().newWatchService();
            Path parent = path.getParent();
            parent.register(newWatchService, StandardWatchEventKinds.ENTRY_MODIFY);
            logger.trace("Watch service registered for {}", parent);
            return newWatchService;
        } catch (IOException e) {
            throw new IllegalStateException("Unable to initialize a file system watcher for the path " + String.valueOf(path), e);
        }
    }

    private Supplier<IllegalArgumentException> unableToFindDifferentiatorExceptionSupplier(String str) {
        return () -> {
            return new IllegalArgumentException("Property, nifi.minifi.notifier.ingestors.file.differentiator, has value " + str + " which does not correspond to any in the FileChangeIngestor Map:" + String.valueOf(DIFFERENTIATOR_CONSTRUCTOR_MAP.keySet()));
        };
    }

    private void checkConfigFileLocationCorrectness(BootstrapProperties bootstrapProperties, Path path) {
        Path absolutePath = Path.of(bootstrapProperties.getProperty(MiNiFiProperties.NIFI_MINIFI_FLOW_CONFIG.getKey()), new String[0]).toAbsolutePath();
        Path resolve = absolutePath.getParent().resolve(FilenameUtils.getBaseName(absolutePath.toString()) + ".raw");
        if (absolutePath.equals(path) || resolve.equals(path)) {
            throw new IllegalStateException("File ingestor config file (nifi.minifi.notifier.ingestors.file.config.path) must point to a different file than MiNiFi flow config file and raw flow config file");
        }
    }

    void setConfigFilePath(Path path) {
        this.configFilePath = path;
    }

    void setWatchService(WatchService watchService) {
        this.watchService = watchService;
    }

    void setConfigurationChangeNotifier(ConfigurationChangeNotifier configurationChangeNotifier) {
        this.configurationChangeNotifier = configurationChangeNotifier;
    }

    void setDifferentiator(Differentiator<ByteBuffer> differentiator) {
        this.differentiator = differentiator;
    }
}
