package com.microsoft.azure.cosmos.connectors.cassandra.filewatcher;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.microsoft.azure.cosmos.connectors.cassandra.commitlogprocessor.v6.CommitLogSegmentProcessor;
import com.microsoft.azure.cosmos.connectors.cassandra.config.CommitLogUploadConfig;
import com.microsoft.azure.cosmos.connectors.cassandra.datamodel.ContainerLocation;
import com.microsoft.azure.cosmos.connectors.cassandra.datamodel.UploadFileSet;
import com.microsoft.azure.cosmos.connectors.cassandra.uploadagent.storeprovider.StoreProvider;
import java.io.File;
import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanRegistrationException;
import javax.management.MalformedObjectNameException;
import javax.management.NotCompliantMBeanException;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/microsoft/azure/cosmos/connectors/cassandra/filewatcher/CDCFileProcessor.class */
public class CDCFileProcessor extends FileProcessor {
    private final Logger logger;
    private final CommitLogSegmentOffsetCache commitlogSegmentOffsetCache;
    private final CommitLogUploadConfig commitLogUploadConfig;
    private final CommitLogSegmentProcessor commitLogSegmentProcessor;
    private boolean enableVerboseMetricsLogging;

    public CDCFileProcessor(FileProcessorUtils fileProcessorUtils, String str, StoreProvider storeProvider, ContainerLocation containerLocation, CommitLogSegmentOffsetCache commitLogSegmentOffsetCache, CommitLogUploadConfig commitLogUploadConfig, boolean z) throws InstanceAlreadyExistsException, MBeanRegistrationException, NotCompliantMBeanException, MalformedObjectNameException {
        super(fileProcessorUtils, str, UploadFileSet.Type.CDC, storeProvider, containerLocation);
        this.logger = LoggerFactory.getLogger((Class<?>) CDCFileProcessor.class);
        this.commitlogSegmentOffsetCache = commitLogSegmentOffsetCache;
        if (commitLogSegmentOffsetCache != null) {
            Preconditions.checkArgument(StringUtils.isNotEmpty(commitLogUploadConfig.getCommitLogSegmentProcessorWriteDir()), "commitLogSegmentProcessorWriteDir");
            Preconditions.checkArgument(StringUtils.isNotEmpty(commitLogUploadConfig.getTmpCommitLogSegmentProcessorWriteDir()), "tmpCommitLogSegmentProcessorWriteDir");
            Preconditions.checkArgument(commitLogUploadConfig.isEnableOffsetProcessing(), "enableOffsetProcessing");
            this.commitLogSegmentProcessor = new CommitLogSegmentProcessor(commitLogUploadConfig.getTmpCommitLogSegmentProcessorWriteDir(), z);
        } else {
            this.commitLogSegmentProcessor = null;
        }
        this.commitLogUploadConfig = commitLogUploadConfig;
    }

    @Override // com.microsoft.azure.cosmos.connectors.cassandra.filewatcher.FileProcessor
    protected List<Path> getFileSet(Path path) {
        return ImmutableList.of(path);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.microsoft.azure.cosmos.connectors.cassandra.filewatcher.FileProcessor
    public void processFileInternal(Path path) throws Exception {
        if (this.commitLogSegmentProcessor == null) {
            this.logger.info("uploading {} as whole", path);
            super.processFileInternal(path);
            return;
        }
        Integer andRemove = this.commitlogSegmentOffsetCache.getAndRemove(path.getFileName().toString());
        if (andRemove == null || andRemove.intValue() == 0) {
            this.logger.info("uploading {} as whole, no offset found", path);
            super.processFileInternal(path);
            return;
        }
        try {
            this.logger.info("processing {} from offset {}", path, andRemove);
            File right = this.commitLogSegmentProcessor.processSegment(path.toFile(), andRemove.intValue()).getRight();
            if (right != null) {
                Path path2 = right.toPath();
                Path path3 = Paths.get(this.commitLogUploadConfig.getCommitLogSegmentProcessorWriteDir(), right.getName());
                this.logger.info("Moving commit log Segment from [{}] to [{}]", path2, path3);
                Files.move(path2, path3, new CopyOption[0]);
            }
            super.delete(path);
            this.logger.info("Deleted [{}]", path);
        } catch (Exception e) {
            this.logger.error("processing segment {} from offset {}, failed. Will ignore offset and process as whole", path.toString(), andRemove, e);
            super.processFileInternal(path);
        }
    }
}
