package com.microsoft.azure.cosmos.connectors.cassandra.filewatcher;

import com.google.common.base.Preconditions;
import com.microsoft.azure.cosmos.connectors.cassandra.config.CommitLogUploadConfig;
import com.microsoft.azure.cosmos.connectors.cassandra.datamodel.ContainerLocation;
import com.microsoft.azure.cosmos.connectors.cassandra.datamodel.UploadFileSet;
import com.microsoft.azure.cosmos.connectors.cassandra.uploadagent.storeprovider.StoreProvider;
import java.io.File;
import java.io.FileFilter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import org.apache.commons.lang3.StringUtils;

/* loaded from: input_file:com/microsoft/azure/cosmos/connectors/cassandra/filewatcher/CDCUploadManager.class */
public class CDCUploadManager {
    private final String nodeName;
    private final StoreProvider storeProvider;
    private final Path cdcPath;
    private final ContainerLocation cdcFileLocation;
    private final int maxConcurrentUpload;
    private final ExecutorService executorService;
    private final long fileWatcherSleepTimeInMillis;
    private final FileProcessorUtils fileProcessorUtils;
    private final CommitLogUploadConfig commitLogUploadConfig;
    private final String commitlogDir;
    private CDCFileProcessor fileProcessor;
    private FileWatcher watcher;
    private ActiveCommitLogProcessor activeCommitlogProcessor;
    private CDCFileProcessor cdcFileProcessor;
    private CDCFileProcessor partialSegmentsProcessor;
    private FileWatcher partialSegmentFileWatcher;
    private boolean enableVerboseMetricsLogging;

    public CDCUploadManager(FileProcessorUtils fileProcessorUtils, String str, StoreProvider storeProvider, ContainerLocation containerLocation, String str2, int i, ExecutorService executorService, CommitLogUploadConfig commitLogUploadConfig, String str3, long j, boolean z) {
        Preconditions.checkNotNull(fileProcessorUtils, "fileProcessorUtils");
        Preconditions.checkArgument(StringUtils.isNotEmpty(str), "nodeName");
        Preconditions.checkNotNull(storeProvider, "storeProvider");
        Preconditions.checkNotNull(containerLocation, "cdcFileLocation");
        Preconditions.checkNotNull(Integer.valueOf(i), "maxConcurrentUpload");
        Preconditions.checkNotNull(executorService, "executorService");
        Preconditions.checkNotNull(commitLogUploadConfig, "commitLogUploadConfig");
        Preconditions.checkArgument(j > 0, "fileWatcherSleepTimeInMillis");
        this.fileProcessorUtils = fileProcessorUtils;
        this.nodeName = str;
        this.storeProvider = storeProvider;
        this.cdcFileLocation = containerLocation;
        this.cdcPath = Paths.get(str2, new String[0]);
        this.maxConcurrentUpload = i;
        this.executorService = executorService;
        this.fileWatcherSleepTimeInMillis = j;
        this.commitLogUploadConfig = commitLogUploadConfig;
        this.commitlogDir = str3;
        this.enableVerboseMetricsLogging = z;
    }

    public void start() throws Exception {
        CommitLogSegmentOffsetCache commitLogSegmentOffsetCache = null;
        Semaphore semaphore = new Semaphore(this.maxConcurrentUpload);
        if (this.commitLogUploadConfig.isEnableOffsetProcessing()) {
            Files.createDirectories(Paths.get(this.commitLogUploadConfig.getCommitLogSegmentProcessorWriteDir(), new String[0]), new FileAttribute[0]);
            Files.createDirectories(Paths.get(this.commitLogUploadConfig.getTmpCommitLogSegmentProcessorWriteDir(), new String[0]), new FileAttribute[0]);
            commitLogSegmentOffsetCache = new CommitLogSegmentOffsetCache();
            this.activeCommitlogProcessor = new ActiveCommitLogProcessor(this.commitlogDir, this.commitLogUploadConfig, commitLogSegmentOffsetCache, this.enableVerboseMetricsLogging);
            this.activeCommitlogProcessor.start();
            this.partialSegmentsProcessor = new CDCFileProcessor(this.fileProcessorUtils, this.nodeName, this.storeProvider, this.cdcFileLocation, null, this.commitLogUploadConfig, this.enableVerboseMetricsLogging);
            this.partialSegmentFileWatcher = new FileWatcher(Paths.get(this.commitLogUploadConfig.getCommitLogSegmentProcessorWriteDir(), new String[0]), this.partialSegmentsProcessor, cdcFileFilter(), semaphore, this.executorService, this.fileWatcherSleepTimeInMillis, UploadFileSet.Type.CDC, "partialCommitLog", this.enableVerboseMetricsLogging);
            this.partialSegmentFileWatcher.start();
        }
        this.cdcFileProcessor = new CDCFileProcessor(this.fileProcessorUtils, this.nodeName, this.storeProvider, this.cdcFileLocation, commitLogSegmentOffsetCache, this.commitLogUploadConfig, this.enableVerboseMetricsLogging);
        this.watcher = new FileWatcher(this.cdcPath, this.cdcFileProcessor, cdcFileFilter(), semaphore, this.executorService, this.fileWatcherSleepTimeInMillis, UploadFileSet.Type.CDC, this.enableVerboseMetricsLogging);
        this.watcher.start();
    }

    public void shutdown() {
        this.watcher.shutdown();
        if (this.partialSegmentFileWatcher != null) {
            this.partialSegmentFileWatcher.shutdown();
        }
        if (this.activeCommitlogProcessor != null) {
            this.activeCommitlogProcessor.shutdown();
        }
    }

    private static FileFilter cdcFileFilter() {
        return new FileFilter() { // from class: com.microsoft.azure.cosmos.connectors.cassandra.filewatcher.CDCUploadManager.1
            @Override // java.io.FileFilter
            public boolean accept(File file) {
                return file.getName().startsWith("CommitLog") && file.getName().endsWith(".log");
            }
        };
    }
}
