package org.apache.hudi.timeline.service.handlers.marker;

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.MarkerUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.timeline.service.handlers.MarkerHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hudi/timeline/service/handlers/marker/MarkerBasedEarlyConflictDetectionRunnable.class */
public class MarkerBasedEarlyConflictDetectionRunnable implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(MarkerBasedEarlyConflictDetectionRunnable.class);
    private MarkerHandler markerHandler;
    private String markerDir;
    private String basePath;
    private HoodieStorage storage;
    private AtomicBoolean hasConflict;
    private long maxAllowableHeartbeatIntervalInMs;
    private Set<HoodieInstant> completedCommits;
    private final boolean checkCommitConflict;

    public MarkerBasedEarlyConflictDetectionRunnable(AtomicBoolean atomicBoolean, MarkerHandler markerHandler, String str, String str2, HoodieStorage hoodieStorage, long j, Set<HoodieInstant> set, boolean z) {
        this.markerHandler = markerHandler;
        this.markerDir = str;
        this.basePath = str2;
        this.storage = hoodieStorage;
        this.hasConflict = atomicBoolean;
        this.maxAllowableHeartbeatIntervalInMs = j;
        this.completedCommits = set;
        this.checkCommitConflict = z;
    }

    @Override // java.lang.Runnable
    public void run() {
        if (this.hasConflict.get()) {
            return;
        }
        try {
            Set<String> pendingMarkersToProcess = this.markerHandler.getPendingMarkersToProcess(this.markerDir);
            if (this.storage.exists(new StoragePath(this.markerDir)) || !pendingMarkersToProcess.isEmpty()) {
                HoodieTimer start = HoodieTimer.start();
                HashSet hashSet = new HashSet();
                hashSet.addAll(this.markerHandler.getAllMarkers(this.markerDir));
                hashSet.addAll(pendingMarkersToProcess);
                List<StoragePath> allMarkerDir = MarkerUtils.getAllMarkerDir(new StoragePath(this.basePath, HoodieTableMetaClient.TEMPFOLDER_NAME), this.storage);
                HoodieActiveTimeline activeTimeline = HoodieTableMetaClient.builder().setConf(this.storage.getConf().newInstance()).setBasePath(this.basePath).setLoadActiveTimelineOnLoad(true).build().getActiveTimeline();
                Set set = (Set) MarkerUtils.getCandidateInstants(activeTimeline, allMarkerDir, MarkerUtils.markerDirToInstantTime(this.markerDir), this.maxAllowableHeartbeatIntervalInMs, this.storage, this.basePath).stream().flatMap(str -> {
                    return MarkerUtils.readTimelineServerBasedMarkersFromFileSystem(str, this.storage, new HoodieLocalEngineContext(this.storage.getConf().newInstance()), 100).values().stream().flatMap((v0) -> {
                        return v0.stream();
                    });
                }).collect(Collectors.toSet());
                Set set2 = (Set) hashSet.stream().map(MarkerUtils::makerToPartitionAndFileID).collect(Collectors.toSet());
                set2.retainAll((Set) set.stream().map(MarkerUtils::makerToPartitionAndFileID).collect(Collectors.toSet()));
                if (!set2.isEmpty() || (this.checkCommitConflict && MarkerUtils.hasCommitConflict(activeTimeline, (Set) hashSet.stream().map(MarkerUtils::makerToPartitionAndFileID).collect(Collectors.toSet()), this.completedCommits))) {
                    LOG.warn("Conflict writing detected based on markers!\nConflict markers: " + hashSet + "\nTable markers: " + set);
                    this.hasConflict.compareAndSet(false, true);
                }
                LOG.info("Finish batching marker-based conflict detection in " + start.endTimer() + " ms");
            }
        } catch (IOException e) {
            throw new HoodieIOException("IOException occurs during checking marker conflict");
        }
    }
}
