/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.timeline.service.handlers.marker;

import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.MarkerUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.timeline.service.handlers.MarkerHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MarkerBasedEarlyConflictDetectionRunnable
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(MarkerBasedEarlyConflictDetectionRunnable.class);
    private final MarkerHandler markerHandler;
    private final String markerDir;
    private final String basePath;
    private final HoodieStorage storage;
    private final AtomicBoolean hasConflict;
    private final long maxAllowableHeartbeatIntervalInMs;
    private final Set<HoodieInstant> completedCommits;
    private final boolean checkCommitConflict;

    public MarkerBasedEarlyConflictDetectionRunnable(AtomicBoolean hasConflict, MarkerHandler markerHandler, String markerDir, String basePath, HoodieStorage storage, long maxAllowableHeartbeatIntervalInMs, Set<HoodieInstant> completedCommits, boolean checkCommitConflict) {
        this.markerHandler = markerHandler;
        this.markerDir = markerDir;
        this.basePath = basePath;
        this.storage = storage;
        this.hasConflict = hasConflict;
        this.maxAllowableHeartbeatIntervalInMs = maxAllowableHeartbeatIntervalInMs;
        this.completedCommits = completedCommits;
        this.checkCommitConflict = checkCommitConflict;
    }

    @Override
    public void run() {
        if (this.hasConflict.get()) {
            return;
        }
        try {
            Set<String> pendingMarkers = this.markerHandler.getPendingMarkersToProcess(this.markerDir);
            if (!this.storage.exists(new StoragePath(this.markerDir)) && pendingMarkers.isEmpty()) {
                return;
            }
            HoodieTimer timer = HoodieTimer.start();
            HashSet<String> currentInstantAllMarkers = new HashSet<String>();
            currentInstantAllMarkers.addAll(this.markerHandler.getAllMarkers(this.markerDir));
            currentInstantAllMarkers.addAll(pendingMarkers);
            StoragePath tempPath = new StoragePath(this.basePath, ".hoodie/.temp");
            List instants = MarkerUtils.getAllMarkerDir((StoragePath)tempPath, (HoodieStorage)this.storage);
            HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(this.storage.getConf().newInstance()).setBasePath(this.basePath).setLoadActiveTimelineOnLoad(true).build();
            HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
            List candidate = MarkerUtils.getCandidateInstants((HoodieActiveTimeline)activeTimeline, (List)instants, (String)MarkerUtils.markerDirToInstantTime((String)this.markerDir), (long)this.maxAllowableHeartbeatIntervalInMs, (HoodieStorage)this.storage, (String)this.basePath);
            Set tableMarkers = candidate.stream().flatMap(instant -> MarkerUtils.readTimelineServerBasedMarkersFromFileSystem((String)instant, (HoodieStorage)this.storage, (HoodieEngineContext)new HoodieLocalEngineContext(this.storage.getConf().newInstance()), (int)100).values().stream().flatMap(Collection::stream)).collect(Collectors.toSet());
            Set currentFileIDs = currentInstantAllMarkers.stream().map(MarkerUtils::makerToPartitionAndFileID).collect(Collectors.toSet());
            Set tableFilesIDs = tableMarkers.stream().map(MarkerUtils::makerToPartitionAndFileID).collect(Collectors.toSet());
            currentFileIDs.retainAll(tableFilesIDs);
            if (!currentFileIDs.isEmpty() || this.checkCommitConflict && MarkerUtils.hasCommitConflict((HoodieActiveTimeline)activeTimeline, currentInstantAllMarkers.stream().map(MarkerUtils::makerToPartitionAndFileID).collect(Collectors.toSet()), this.completedCommits)) {
                LOG.error("Conflict writing detected based on markers!\nConflict markers: {}\nTable markers: {}", currentInstantAllMarkers, tableMarkers);
                this.hasConflict.compareAndSet(false, true);
            }
            LOG.info("Finish batching marker-based conflict detection in {} ms", (Object)timer.endTimer());
        }
        catch (IOException e) {
            throw new HoodieIOException("IOException occurs during checking marker conflict");
        }
    }
}

