package com.microsoft.azure.cosmos.connectors.cassandra.filewatcher;

import com.google.common.base.Preconditions;
import com.microsoft.azure.cosmos.connectors.cassandra.config.Config;
import com.microsoft.azure.cosmos.connectors.cassandra.datamodel.ContainerLocation;
import com.microsoft.azure.cosmos.connectors.cassandra.datamodel.UploadFileSet;
import com.microsoft.azure.cosmos.connectors.cassandra.datamodel.UploadFileType;
import com.microsoft.azure.cosmos.connectors.cassandra.service.DynamicSemaphore;
import com.microsoft.azure.cosmos.connectors.cassandra.uploadagent.storeprovider.StoreProvider;
import java.io.File;
import java.io.FileFilter;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.concurrent.ExecutorService;
import org.apache.commons.lang3.StringUtils;

/* loaded from: input_file:com/microsoft/azure/cosmos/connectors/cassandra/filewatcher/CDCUploadManager.class */
/**
 * Wires together the components used for CDC upload on a node: an
 * {@link ActiveCommitLogProcessor}, a {@link FileWatcher} backed by a
 * {@link CDCFileProcessor} for partial segments, and a second {@link FileWatcher}
 * backed by a {@link CDCFileProcessor} for CDC files.
 *
 * <p>Call {@link #start()} to create and start the components and
 * {@link #shutdown()} to stop whichever of them were created.
 */
public class CDCUploadManager {
    private final String nodeName;
    private final StoreProvider storeProvider;
    private final ContainerLocation cdcFileLocation;
    private final ExecutorService executorService;
    private final FileProcessorUtils fileProcessorUtils;
    private final Config config;
    private FileWatcher watcher;
    private ActiveCommitLogProcessor activeCommitlogProcessor;
    private CDCFileProcessor cdcFileProcessor;
    private CDCFileProcessor partialSegmentsProcessor;
    private FileWatcher partialSegmentFileWatcher;

    /**
     * Validates and stores the collaborators; nothing is started until {@link #start()}.
     *
     * @param fileProcessorUtils helper passed to both {@link CDCFileProcessor} instances; must not be null
     * @param str the node name; must not be null or empty
     * @param storeProvider store provider passed to both {@link CDCFileProcessor} instances; must not be null
     * @param containerLocation the CDC file location passed to both processors; must not be null
     * @param config configuration; must not be null, must expose a non-null commit log upload
     *        config and a strictly positive file watcher sleep time
     * @param executorService executor handed to both {@link FileWatcher} instances; must not be null
     * @throws NullPointerException if a required reference is null
     * @throws IllegalArgumentException if the node name is empty or the sleep time is not positive
     */
    public CDCUploadManager(FileProcessorUtils fileProcessorUtils, String str, StoreProvider storeProvider, ContainerLocation containerLocation, Config config, ExecutorService executorService) {
        Preconditions.checkNotNull(fileProcessorUtils, "fileProcessorUtils");
        Preconditions.checkArgument(StringUtils.isNotEmpty(str), "nodeName");
        Preconditions.checkNotNull(storeProvider, "storeProvider");
        Preconditions.checkNotNull(containerLocation, "cdcFileLocation");
        // config is dereferenced by the checks below, so it has to be validated first;
        // otherwise a null config surfaces as an NPE without any message.
        // (The former null check on the boxed max-concurrent-upload int was dropped:
        // a boxed primitive can never be null, so it verified nothing.)
        Preconditions.checkNotNull(config, "config");
        Preconditions.checkNotNull(executorService, "executorService");
        Preconditions.checkNotNull(config.getCommitLogUploadConfig(), "commitLogUploadConfig");
        Preconditions.checkArgument(config.getFileWatcherSleepTimeInMillis() > 0, "fileWatcherSleepTimeInMillis");
        this.config = config;
        this.fileProcessorUtils = fileProcessorUtils;
        this.nodeName = str;
        this.storeProvider = storeProvider;
        this.cdcFileLocation = containerLocation;
        this.executorService = executorService;
    }

    /**
     * Creates the commit log working directories, then creates and starts the
     * active commit log processor, the partial segment watcher and the CDC file
     * watcher, in that order.
     *
     * <p>Both watchers share a single {@link DynamicSemaphore} sized from
     * {@code config.getMaxConcurrentCDCUpload()}.
     *
     * @throws Exception if a directory cannot be created or a component fails to start;
     *         components started before the failure remain referenced so that
     *         {@link #shutdown()} can stop them
     */
    public void start() throws Exception {
        DynamicSemaphore dynamicSemaphore = new DynamicSemaphore(this.config.getMaxConcurrentCDCUpload());

        // createDirectories is a no-op for directories that already exist.
        Files.createDirectories(Paths.get(this.config.getCommitLogUploadConfig().getActiveCommitLogSegmentProcessorWriteDir()));
        Files.createDirectories(Paths.get(this.config.getCommitLogUploadConfig().getTmpCommitLogSegmentProcessorWriteDir()));
        Files.createDirectories(Paths.get(this.config.getCommitLogUploadConfig().getArchivedCommitLogSegmentProcessorWriteDir()));

        // The offset cache is shared between the active commit log processor and the
        // CDC file processor; the partial segment processor is given no cache (null).
        CommitLogSegmentOffsetCache commitLogSegmentOffsetCache = new CommitLogSegmentOffsetCache();
        this.activeCommitlogProcessor = new ActiveCommitLogProcessor(this.config, commitLogSegmentOffsetCache);
        this.activeCommitlogProcessor.start();

        this.partialSegmentsProcessor = new CDCFileProcessor(this.config, this.fileProcessorUtils, this.nodeName, this.storeProvider, this.cdcFileLocation, null, UploadFileSet.Type.CDC, UploadFileType.CDC, true);
        this.partialSegmentFileWatcher = new FileWatcher(this.config, this.partialSegmentsProcessor, dynamicSemaphore, this.executorService, UploadFileSet.Type.CDC, "partialCommitLog");
        this.partialSegmentFileWatcher.start();

        this.cdcFileProcessor = new CDCFileProcessor(this.config, this.fileProcessorUtils, this.nodeName, this.storeProvider, this.cdcFileLocation, commitLogSegmentOffsetCache, UploadFileSet.Type.CDC, UploadFileType.CDC_ARCHIVE, false);
        this.watcher = new FileWatcher(this.config, this.cdcFileProcessor, dynamicSemaphore, this.executorService, UploadFileSet.Type.CDC);
        this.watcher.start();
    }

    /**
     * Shuts down every component that {@link #start()} managed to create, in the
     * order CDC file watcher, partial segment watcher, active commit log processor.
     *
     * <p>Safe to call when {@link #start()} was never invoked or failed part-way:
     * components that were never created are skipped.
     */
    public void shutdown() {
        // watcher is the last component assigned in start(), so it is the most likely
        // one to still be null; without this guard an NPE here would leave the
        // components below running.
        if (this.watcher != null) {
            this.watcher.shutdown();
        }
        if (this.partialSegmentFileWatcher != null) {
            this.partialSegmentFileWatcher.shutdown();
        }
        if (this.activeCommitlogProcessor != null) {
            this.activeCommitlogProcessor.shutdown();
        }
    }

    /**
     * Builds a filter accepting files whose name starts with {@code "CommitLog"}
     * and ends with {@code ".log"}. Not referenced elsewhere in this class.
     *
     * @return a new {@link FileFilter} applying the name check
     */
    private static FileFilter cdcFileFilter() {
        return new FileFilter() {
            @Override
            public boolean accept(File file) {
                String fileName = file.getName();
                return fileName.startsWith("CommitLog") && fileName.endsWith(".log");
            }
        };
    }
}
