package com.microsoft.azure.cosmos.connectors.cassandra.filewatcher;

import com.google.common.base.Stopwatch;
import com.google.common.collect.Sets;
import com.microsoft.azure.cosmos.connectors.cassandra.config.Config;
import com.microsoft.azure.cosmos.connectors.cassandra.datamodel.MetricLog;
import com.microsoft.azure.cosmos.connectors.cassandra.datamodel.UploadFileSet;
import com.microsoft.azure.cosmos.connectors.cassandra.perf.Constants;
import com.microsoft.azure.cosmos.connectors.cassandra.perf.ICounter;
import com.microsoft.azure.cosmos.connectors.cassandra.perf.IDistributionSummary;
import com.microsoft.azure.cosmos.connectors.cassandra.perf.IMeterRegistry;
import com.microsoft.azure.cosmos.connectors.cassandra.perf.ITimedActivity;
import com.microsoft.azure.cosmos.connectors.cassandra.perf.MeterRegistryProvider;
import com.microsoft.azure.cosmos.connectors.cassandra.service.DynamicSemaphore;
import com.microsoft.azure.cosmos.connectors.cassandra.service.ScheduledExecutorServiceWithDynamicDelay;
import com.microsoft.azure.cosmos.connectors.cassandra.service.ScheduledTask;
import java.io.File;
import java.io.IOException;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.management.AttributeNotFoundException;
import javax.management.InstanceAlreadyExistsException;
import javax.management.InstanceNotFoundException;
import javax.management.InvalidAttributeValueException;
import javax.management.MBeanException;
import javax.management.MalformedObjectNameException;
import javax.management.NotCompliantMBeanException;
import javax.management.ReflectionException;
import org.apache.commons.io.comparator.LastModifiedFileComparator;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/microsoft/azure/cosmos/connectors/cassandra/filewatcher/FileWatcher.class */
/**
 * Scheduled task that repeatedly scans a watch path supplied by an {@link IFileProcessor},
 * hands every matching regular file to that processor, and publishes counters, summaries,
 * timers and MBean updates describing what was collected and how processing ended.
 *
 * <p>The number of files being processed at the same time is bounded by the
 * {@link DynamicSemaphore} passed to the constructor; its permit count is refreshed from
 * {@link Config#getMaxConcurrentCDCUpload()} every time {@link #run()} starts.
 */
public class FileWatcher extends ScheduledTask {
    private static final Logger metricLogger = LoggerFactory.getLogger(Constants.AGENT_METRIC_LOGGER);
    private static final String MBEAN_UPDATE_FAILED = "updating MBean failed!";

    private final Logger logger;
    private final Config config;
    private final IFileProcessor fileProcessor;
    /** String form of every path that has been handed to the processor and has not completed yet. */
    private final Set<String> inProgressWork;
    private final DynamicSemaphore concurrentUploadSem;
    private final ExecutorService executorService;
    private final ICounter collectedFilesDistributionCounter;
    private final ICounter processedFilesSuccessCounter;
    private final ICounter processedFilesFailureCounter;
    private final IDistributionSummary collectedFilesDistributionSummary;
    private final IDistributionSummary processedFilesSuccessSummary;
    private final IDistributionSummary processedFilesFailureSummary;
    private final ITimedActivity processedFilesTimer;
    private final String meterName;
    private volatile boolean terminated;

    /**
     * Creates a watcher whose meter name is the name of {@code type}.
     *
     * @param config           agent configuration (upload concurrency, Jolokia settings, verbosity)
     * @param iFileProcessor   supplies the watch paths and the file filter, and processes files
     * @param dynamicSemaphore limits how many files are processed concurrently
     * @param executorService  executor this watcher is submitted to by {@link #start()}
     * @param type             file-set type; its name is used for the logger and the meters
     */
    public FileWatcher(Config config, IFileProcessor iFileProcessor, DynamicSemaphore dynamicSemaphore, ExecutorService executorService, UploadFileSet.Type type) throws InstanceAlreadyExistsException, NotCompliantMBeanException, MalformedObjectNameException, ReflectionException, MBeanException, IOException {
        this(config, iFileProcessor, dynamicSemaphore, executorService, type, type.getName());
    }

    /**
     * Creates a watcher with an explicit meter name.
     *
     * @param config           agent configuration (upload concurrency, Jolokia settings, verbosity)
     * @param iFileProcessor   supplies the watch paths and the file filter, and processes files
     * @param dynamicSemaphore limits how many files are processed concurrently
     * @param executorService  executor this watcher is submitted to by {@link #start()}
     * @param type             file-set type; its name becomes part of the logger name
     * @param str              meter name substituted into every metric and MBean name format
     */
    public FileWatcher(Config config, IFileProcessor iFileProcessor, DynamicSemaphore dynamicSemaphore, ExecutorService executorService, UploadFileSet.Type type, String str) throws InstanceAlreadyExistsException, NotCompliantMBeanException, MalformedObjectNameException, ReflectionException, MBeanException, IOException {
        this.inProgressWork = Sets.newConcurrentHashSet();
        this.logger = LoggerFactory.getLogger(String.join(":", FileWatcher.class.getCanonicalName(), type.getName()));
        this.config = config;
        this.fileProcessor = iFileProcessor;
        this.terminated = false;
        this.concurrentUploadSem = dynamicSemaphore;
        this.executorService = executorService;
        this.meterName = str;

        IMeterRegistry registry = MeterRegistryProvider.getInstance();
        this.collectedFilesDistributionCounter = registry.getCounter(String.format(Constants.FILE_WATCHER_COLLECTED_FILES_COUNTER, str));
        this.processedFilesSuccessCounter = registry.getCounter(String.format(Constants.FILE_WATCHER_PROCESS_SUCCESS_COUNTER, str));
        this.processedFilesFailureCounter = registry.getCounter(String.format(Constants.FILE_WATCHER_PROCESS_FAILURE_COUNTER, str));
        this.collectedFilesDistributionSummary = registry.getSummary(String.format(Constants.FILE_WATCHER_COLLECTED_FILES_SUMMARY, str));
        this.processedFilesSuccessSummary = registry.getSummary(String.format(Constants.FILE_WATCHER_PROCESS_SUCCESS_SUMMARY, str));
        this.processedFilesFailureSummary = registry.getSummary(String.format(Constants.FILE_WATCHER_PROCESS_FAILURE_SUMMARY, str));
        this.processedFilesTimer = registry.getTimer(String.format(Constants.FILE_WATCHER_PROCESS_TIME, str));
    }

    /** Returns whatever the file processor reports from {@code ShouldProcessFile()}. */
    @Override // com.microsoft.azure.cosmos.connectors.cassandra.service.ScheduledTask, com.microsoft.azure.cosmos.connectors.cassandra.service.IScheduledTask
    public boolean IsTaskEnabled() {
        return this.fileProcessor.ShouldProcessFile();
    }

    /**
     * Refreshes the semaphore's permit count from the configuration and then watches the
     * processor's watch paths. Any failure is logged and never propagated.
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            this.concurrentUploadSem.setNumberOfPermits(this.config.getMaxConcurrentCDCUpload());
            // NOTE(review): walkDirectoryTree only returns once this watcher is terminated, so when
            // more than one watch path is configured only the first one is ever scanned — confirm
            // whether that is intended.
            Iterator<Path> watchPaths = this.fileProcessor.getWatchPath().iterator();
            while (watchPaths.hasNext()) {
                walkDirectoryTree(watchPaths.next());
            }
            this.logger.info("Terminated");
        } catch (Throwable th) {
            this.logger.error("Unexpected failure", th);
        }
    }

    /** Tells whether {@code path} has been handed to the processor and has not completed yet. */
    private boolean isInProgress(Path path) {
        return this.inProgressWork.contains(path.toString());
    }

    /**
     * Decides whether a walked path should be processed: it must pass the processor's upload
     * filter, must not already be in progress, and must be a regular file. A failure while
     * checking is logged and the path is skipped.
     */
    private boolean isUploadCandidate(Path candidate) {
        try {
            return this.fileProcessor.FileUploadFilter().accept(candidate.toFile())
                    && !isInProgress(candidate)
                    && Files.isRegularFile(candidate);
        } catch (Exception e) {
            this.logger.error("Unexpected failure, ignoring this item {}", candidate, e);
            return false;
        }
    }

    /**
     * Walks {@code path} recursively and returns the upload candidates ordered by last-modified
     * time.
     *
     * @param path root of the directory tree to scan
     * @return the matching files, or an empty list when the walk fails
     */
    private List<File> collectFiles(Path path) {
        // Files.walk holds open directory handles; try-with-resources closes them even when
        // filtering or collecting throws.
        try (Stream<Path> walk = Files.walk(path)) {
            List<File> candidates = walk
                    .filter(this::isUploadCandidate)
                    .map(Path::toFile)
                    .collect(Collectors.toList());
            candidates.sort(LastModifiedFileComparator.LASTMODIFIED_COMPARATOR);
            return candidates;
        } catch (Exception e) {
            this.logger.error("Failed to walk on the watchPath {}", this.fileProcessor.getWatchPath(), e);
            return new ArrayList<>();
        }
    }

    /**
     * Publishes the "collected files" metrics for one scan. The counter and summary are always
     * updated; the MBean update and metric log line are best-effort.
     */
    private void recordCollectedFiles(List<File> files) {
        int count = files.size();
        String counterName = String.format(Constants.FILE_WATCHER_COLLECTED_FILES_COUNTER, this.meterName);
        String summaryName = String.format(Constants.FILE_WATCHER_COLLECTED_FILES_SUMMARY, this.meterName);
        String mbeanName = String.format(Constants.FILE_WATCHER_COLLECTED_FILES_COUNTER_MBEAN, this.meterName);

        this.collectedFilesDistributionSummary.record(count);
        this.collectedFilesDistributionCounter.increment(count);
        try {
            com.microsoft.azure.cosmos.connectors.cassandra.service.Utils.updateMBean(mbeanName, count, this.config.getJolokiaConfig());
            metricLogger.info("{}", new MetricLog(counterName, Double.valueOf(count)).getStringValue());
        } catch (Exception e) {
            // A metrics failure must not abort the scan: otherwise the collected files would
            // never be processed and the caller would rescan immediately without sleeping.
            this.logger.error(MBEAN_UPDATE_FAILED, e);
        }

        if (this.config.isEnableVerboseMetricsLogging()) {
            this.logger.debug(counterName + " incremented by {} files", Integer.valueOf(count));
            this.logger.debug(summaryName + " recorded {} files", Integer.valueOf(count));
            this.logger.debug(mbeanName + " incremented by {} files", Integer.valueOf(count));
            String fileNames = files.stream().map(File::getAbsolutePath).collect(Collectors.joining(", "));
            this.logger.debug(counterName + " incremented by following files: [{}]", fileNames);
            this.logger.debug(summaryName + " recorded files: [{}]", fileNames);
            this.logger.debug(mbeanName + " recorded files: [{}]", fileNames);
        }
    }

    /**
     * Scans {@code path} in a loop until the watcher is terminated. Each pass either submits the
     * collected files for processing or, when nothing matched, sleeps for the processor's
     * configured sleep time. Interruption terminates the watcher; other failures are logged and
     * the loop continues.
     */
    private void walkDirectoryTree(Path path) {
        while (!this.terminated) {
            try {
                this.logger.debug("scan ...");
                List<File> files = collectFiles(path);
                recordCollectedFiles(files);

                Object[] scanSummary = new Object[]{Integer.valueOf(files.size()), this.fileProcessor.getWatchPath(), Integer.valueOf(this.inProgressWork.size())};
                if (files.isEmpty()) {
                    this.logger.debug("Found {} new files in {} and has {} pending uploads.", scanSummary);
                    this.logger.debug("No matching file sleeping.");
                    TimeUnit.MILLISECONDS.sleep(this.fileProcessor.GetFileProcessorSleepTimeInMillis());
                } else {
                    this.logger.info("Found {} new files in {} and has {} pending uploads.", scanSummary);
                    for (File file : files) {
                        processFile(file.toPath());
                    }
                }
            } catch (InterruptedException e) {
                this.logger.info("interrupted exception", e);
                this.terminated = true;
                // Restore the interrupt status so the owner of this thread can observe it too.
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                this.logger.error("unexpected failure", e);
            }
        }
    }

    /** Asks the watcher to stop; the scan loop checks this flag at the start of each pass. */
    public void shutdown() {
        this.terminated = true;
    }

    /**
     * Hands one file to the processor once a semaphore permit is available. The permit is
     * returned when the processor's future completes, or immediately if the watcher was
     * terminated while waiting or the hand-off itself fails.
     *
     * @param path file to process
     * @throws InterruptedException if interrupted while waiting for a permit
     */
    private void processFile(Path path) throws InterruptedException {
        String key = path.toString();
        if (this.inProgressWork.contains(key)) {
            return;
        }
        this.logger.info("trying to acquire upload sem ...");
        this.concurrentUploadSem.acquire();
        this.logger.info("semaphore acquired.");
        if (this.terminated) {
            this.concurrentUploadSem.release();
            return;
        }

        Stopwatch stopwatch = Stopwatch.createStarted();
        boolean handedOff = false;
        try {
            CompletableFuture<?> upload = this.fileProcessor.processFile(path);
            this.inProgressWork.add(key);
            upload.whenComplete((ignoredResult, failure) -> onProcessingComplete(path, key, stopwatch, failure));
            handedOff = true;
        } finally {
            if (!handedOff) {
                // The processor threw or returned no future: the completion callback will never
                // run, so give back the permit and the in-progress slot here.
                this.inProgressWork.remove(key);
                this.concurrentUploadSem.release();
            }
        }
    }

    /**
     * Completion callback for one processed file: releases the in-progress slot and the permit,
     * records the elapsed time, and records the outcome as success or failure.
     *
     * @param failure the throwable the processing future completed with, or {@code null} on success
     */
    private void onProcessingComplete(Path path, String key, Stopwatch stopwatch, Throwable failure) {
        this.logger.info("completed processing {}.", key);
        this.inProgressWork.remove(key);
        this.concurrentUploadSem.release();

        stopwatch.stop();
        Duration elapsed = stopwatch.elapsed();
        this.processedFilesTimer.record(elapsed);

        String timerName = String.format(Constants.FILE_WATCHER_PROCESS_TIME, this.meterName);
        String timerMBeanName = String.format(Constants.FILE_WATCHER_PROCESS_TIME_MBEAN, this.meterName);
        try {
            com.microsoft.azure.cosmos.connectors.cassandra.service.Utils.updateMBean(timerMBeanName, elapsed.toNanos(), this.config.getJolokiaConfig());
            metricLogger.info("{}", new MetricLog(timerName, Double.valueOf(elapsed.toNanos())).getStringValue());
        } catch (InstanceNotFoundException | InvalidAttributeValueException | AttributeNotFoundException | MalformedObjectNameException | MBeanException | ReflectionException | IOException e) {
            this.logger.error(MBEAN_UPDATE_FAILED, e);
        }
        if (this.config.isEnableVerboseMetricsLogging()) {
            // toNanos() is the full duration; getNano() would only be the sub-second component.
            this.logger.debug(timerName + " recorded {} nano seconds", Long.valueOf(elapsed.toNanos()));
            this.logger.debug(timerMBeanName + " recorded {} nano seconds", Long.valueOf(elapsed.toNanos()));
        }

        if (failure == null) {
            recordOutcome(this.processedFilesSuccessCounter, this.processedFilesSuccessSummary,
                    Constants.FILE_WATCHER_PROCESS_SUCCESS_COUNTER,
                    Constants.FILE_WATCHER_PROCESS_SUCCESS_SUMMARY,
                    Constants.FILE_WATCHER_PROCESS_SUCCESS_COUNTER_MBEAN);
        } else {
            this.logger.error("Encountered failure in processing {}.", path, failure);
            recordOutcome(this.processedFilesFailureCounter, this.processedFilesFailureSummary,
                    Constants.FILE_WATCHER_PROCESS_FAILURE_COUNTER,
                    Constants.FILE_WATCHER_PROCESS_FAILURE_SUMMARY,
                    Constants.FILE_WATCHER_PROCESS_FAILURE_COUNTER_MBEAN);
        }
    }

    /**
     * Records a single success or failure event on the given counter and summary, then makes a
     * best-effort MBean update and metric log line using the given name formats.
     */
    private void recordOutcome(ICounter counter, IDistributionSummary summary, String counterFormat, String summaryFormat, String mbeanFormat) {
        counter.increment();
        summary.record(1.0d);

        String counterName = String.format(counterFormat, this.meterName);
        String mbeanName = String.format(mbeanFormat, this.meterName);
        try {
            com.microsoft.azure.cosmos.connectors.cassandra.service.Utils.updateMBean(mbeanName, this.config.getJolokiaConfig());
            metricLogger.info("{}", new MetricLog(counterName, Double.valueOf(1.0d)).getStringValue());
        } catch (InstanceNotFoundException | InvalidAttributeValueException | AttributeNotFoundException | MalformedObjectNameException | MBeanException | ReflectionException | IOException e) {
            this.logger.error(MBEAN_UPDATE_FAILED, e);
        }
        if (this.config.isEnableVerboseMetricsLogging()) {
            this.logger.debug(counterName + " incremented");
            this.logger.debug(String.format(summaryFormat, this.meterName) + " recorded 1 event");
            this.logger.debug(mbeanName + " incremented");
        }
    }

    /** Submits this watcher, wrapped in a {@link ScheduledExecutorServiceWithDynamicDelay}, to the executor. */
    public void start() {
        this.executorService.submit(new ScheduledExecutorServiceWithDynamicDelay(this, this.executorService));
    }
}
