package kafka.tier.topic;

import io.confluent.rest.TierTopicHeadDataLossDetectionRequest;
import io.confluent.rest.TierTopicHeadDataLossDetectionResponse;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import kafka.log.AbstractLog;
import kafka.server.BrokerReconfigurable;
import kafka.server.KafkaConfig;
import kafka.server.LeaderEndPoint;
import kafka.server.LeaderEndpointSupplier;
import kafka.server.ReplicaManager;
import kafka.tier.TopicIdPartition;
import kafka.tier.domain.TierPartitionFence;
import kafka.tier.raft.KRaftSnapshotManager;
import kafka.tier.state.OffsetAndEpoch;
import kafka.tier.state.TierPartitionState;
import kafka.tier.store.TierObjectStore;
import kafka.tier.store.TierObjectStoreFunctionUtils;
import kafka.tier.store.objects.ObjectType;
import kafka.tier.store.objects.metadata.TierTopicHeadDataLossReportMetadata;
import kafka.tier.tools.RecoveryUtils;
import kafka.tier.topic.recovery.AffectedTierTopicPartitionInfo;
import kafka.tier.topic.recovery.AffectedUserTopicPartitionInfo;
import kafka.tier.topic.recovery.TierTopicHeadDataLossReport;
import kafka.tier.topic.recovery.ValidationSource;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.PartitionResult;
import org.apache.kafka.clients.admin.ReplicaStatusOptions;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.Set;
import scala.jdk.javaapi.CollectionConverters;

/* loaded from: input_file:kafka/tier/topic/TierTopicDataLossValidator.class */
public class TierTopicDataLossValidator implements BrokerReconfigurable {
    /** Maximum thread pool size for data loss detection (per its name). */
    private static final int DATA_LOSS_DETECTION_THREAD_POOL_SIZE_MAX = 10;
    private final TierTopicManagerConfig config;
    private final TierTopic tierTopic;
    private final TierObjectStore tierObjectStore;
    private final ReplicaManager replicaManager;
    /** Supplies the admin client used to look up tier topic partition leaders. */
    private final Supplier<ConfluentAdmin> adminClientSupplier;
    /** Supplies the endpoints used to send OffsetForLeaderEpoch requests to partition leaders. */
    private final LeaderEndpointSupplier leaderEndpointSupplier;
    private final Time time;
    public final TierTopicDataLossValidatorMetrics dataLossMetrics;
    /** Set once by {@code shutdown()}; checked before and during detection and fencing. */
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    /** Upper bound on detection time; volatile because {@code reconfigure} may update it at runtime. */
    private volatile long dataLossDetectionMaxTimeoutMs;
    private static final Logger log = LoggerFactory.getLogger(TierTopicDataLossValidator.class);
    /** Default pause between detection retries (15 seconds). */
    private static final long DATA_LOSS_DETECTION_RETRY_SLEEP_TIME_MS = Duration.ofSeconds(15).toMillis();
    /** Names of the configs this class reports as dynamically reconfigurable. */
    // Explicit type argument instead of a raw HashSet, so the conversion to a Scala set is type-checked.
    public static final Set<String> RECONFIGURABLE_CONFIGS = CollectionConverters.asScala(new HashSet<String>(Collections.singletonList(KafkaConfig.TierTopicDataLossDetectionMaxTimeoutMsProp())));
    /** Translates the report's completion status into the completion status used by the REST response. */
    private static final Map<TierTopicHeadDataLossReport.CompletionStatus, TierTopicHeadDataLossDetectionResponse.CompletionStatus> COMPLETION_STATUS_CONVERSION_MAP = buildCompletionStatusConversionMap();

    /**
     * Builds the status conversion table.
     *
     * <p>A plain builder method replaces the former double-brace initialization, which created an
     * anonymous {@code HashMap} subclass and left the shared map mutable.
     *
     * @return an unmodifiable map from report status to response status
     */
    private static Map<TierTopicHeadDataLossReport.CompletionStatus, TierTopicHeadDataLossDetectionResponse.CompletionStatus> buildCompletionStatusConversionMap() {
        Map<TierTopicHeadDataLossReport.CompletionStatus, TierTopicHeadDataLossDetectionResponse.CompletionStatus> conversions = new HashMap<>();
        conversions.put(TierTopicHeadDataLossReport.CompletionStatus.SUCCESS, TierTopicHeadDataLossDetectionResponse.CompletionStatus.SUCCESS);
        conversions.put(TierTopicHeadDataLossReport.CompletionStatus.FAILURE, TierTopicHeadDataLossDetectionResponse.CompletionStatus.FAILURE);
        return Collections.unmodifiableMap(conversions);
    }

    /**
     * Describes a fencing attempt that did not fully succeed: one sample exception (may be
     * {@code null}) and the user partitions that were not fenced.
     */
    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:kafka/tier/topic/TierTopicDataLossValidator$FencingError.class */
    public static class FencingError {
        final Exception exception;
        final java.util.Set<TopicIdPartition> failedUserPartitions;

        FencingError(Exception exc, java.util.Set<TopicIdPartition> set) {
            exception = exc;
            failedUserPartitions = set;
        }

        /** Renders the sample error message and the unfenced partitions as one line. */
        String errorMessage() {
            String sampleError = "<none>";
            if (exception != null) {
                sampleError = exception.getMessage();
            }
            return String.format("Tier topic data loss fencing failed. Sample error: %s. Partitions not fenced: %s", sampleError, failedUserPartitions);
        }
    }

    /**
     * Task that sends one OffsetForLeaderEpoch request to a single leader broker and returns
     * the per-partition responses as a Java map.
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:kafka/tier/topic/TierTopicDataLossValidator$OffsetForLeaderEpochTask.class */
    public class OffsetForLeaderEpochTask implements Callable<Map<TopicPartition, OffsetForLeaderEpochResponseData.EpochEndOffset>> {
        final int leaderId;
        final Map<TopicPartition, OffsetForLeaderEpochRequestData.OffsetForLeaderPartition> request;
        final Map<Integer, LeaderEndPoint> brokerToLeaderEndPointCache;

        OffsetForLeaderEpochTask(int i, Map<TopicPartition, OffsetForLeaderEpochRequestData.OffsetForLeaderPartition> map, Map<Integer, LeaderEndPoint> map2) {
            leaderId = i;
            request = map;
            brokerToLeaderEndPointCache = map2;
        }

        /**
         * Resolves (or creates and caches) the endpoint for {@code leaderId}, sends the request
         * and converts the Scala response map to a Java map.
         */
        @Override // java.util.concurrent.Callable
        public Map<TopicPartition, OffsetForLeaderEpochResponseData.EpochEndOffset> call() throws Exception {
            LeaderEndPoint endPoint = getLeaderEndPoint(config.brokerId, leaderId, brokerToLeaderEndPointCache);
            log.debug("Sending OffsetForLeaderEpoch request to node {} for {} partitions: {}", leaderId, request.size(), request);
            Map<TopicPartition, OffsetForLeaderEpochResponseData.EpochEndOffset> response = CollectionConverters.asJava(endPoint.fetchEpochEndOffsets(CollectionConverters.asScala(request)));
            log.debug("Completed OffsetForLeaderEpoch request to node {} for {} partitions: {}", leaderId, response.size(), response);
            return response;
        }
    }

    /**
     * Creates a validator that detects data loss in the head of tier topic partitions.
     *
     * @param tierTopicManagerConfig config holding the broker id, feature flags and the initial
     *                               maximum detection timeout
     * @param tierTopic              tier topic used to map user partitions to tier topic partitions
     * @param tierObjectStore        object store the data loss report is uploaded to
     * @param replicaManager         source of local logs and leader partitions
     * @param supplier               supplies the admin client used for leader lookups
     * @param leaderEndpointSupplier supplies endpoints for OffsetForLeaderEpoch requests
     * @param time                   clock used for timeout accounting
     * @param metrics                registry passed to {@code TierTopicDataLossValidatorMetrics}
     */
    public TierTopicDataLossValidator(TierTopicManagerConfig tierTopicManagerConfig, TierTopic tierTopic, TierObjectStore tierObjectStore, ReplicaManager replicaManager, Supplier<ConfluentAdmin> supplier, LeaderEndpointSupplier leaderEndpointSupplier, Time time, Metrics metrics) {
        this.config = tierTopicManagerConfig;
        this.tierTopic = tierTopic;
        this.tierObjectStore = tierObjectStore;
        this.replicaManager = replicaManager;
        this.adminClientSupplier = supplier;
        this.leaderEndpointSupplier = leaderEndpointSupplier;
        this.time = time;
        this.dataLossMetrics = new TierTopicDataLossValidatorMetrics(metrics);
        // Initial value only; reconfigure() may replace it later.
        this.dataLossDetectionMaxTimeoutMs = tierTopicManagerConfig.tierTopicDataLossDetectionMaxTimeoutMs();
    }

    /**
     * Guards detection behind the enable flag.
     *
     * @throws UnsupportedOperationException if tier topic data loss detection is disabled in the config
     */
    private void checkDataLossValidationEnabled() {
        boolean enabled = this.config.enableTierTopicDataLossDetection().booleanValue();
        if (enabled) {
            return;
        }
        String message = String.format("Can't run data loss detection in tier topic as %s is disabled.", KafkaConfig.TierTopicDataLossDetectionEnableProp());
        throw new UnsupportedOperationException(message);
    }

    /**
     * Verifies that the validation source is supported in the broker's current state.
     *
     * @param validationSource what triggered the validation
     * @throws UnsupportedOperationException if the source is unknown, or if it is
     *         {@code UNCLEAN_RESTART_VALIDATION} but the log manager reports a clean shutdown
     */
    private void checkValidationSource(ValidationSource validationSource) {
        if (validationSource == ValidationSource.ON_DEMAND_VALIDATION) {
            return;
        }
        if (validationSource != ValidationSource.UNCLEAN_RESTART_VALIDATION) {
            throw new UnsupportedOperationException("Unsupported validation source: " + validationSource);
        }
        // Unclean-restart validation is only meaningful when the previous shutdown was not clean.
        if (this.replicaManager.logManager().hadCleanShutdown()) {
            throw new UnsupportedOperationException(String.format("Can't run data loss detection in tier topic with validation source: %s as the broker had clean shutdown.", ValidationSource.UNCLEAN_RESTART_VALIDATION));
        }
    }

    /**
     * Decides whether affected user partitions should be fenced.
     *
     * @param validationSource what triggered the validation
     * @return {@code true} only for unclean-restart validation with fencing enabled in the config
     */
    private boolean isUserPartitionFencingNeeded(ValidationSource validationSource) {
        return validationSource == ValidationSource.UNCLEAN_RESTART_VALIDATION
            && this.config.enableTierTopicFencingDuringDataLoss.booleanValue();
    }

    /**
     * Returns the set of config names this validator reports as dynamically reconfigurable.
     *
     * <p>NOTE(review): the {@code mo1257} prefix is a decompiler rename of
     * {@code reconfigurableConfigs}; other decompiled files presumably use the same renamed
     * identifier, so it is left untouched here. Confirm before renaming.
     */
    @Override // kafka.server.BrokerReconfigurable
    /* renamed from: reconfigurableConfigs */
    public Set<String> mo1257reconfigurableConfigs() {
        return RECONFIGURABLE_CONFIGS;
    }

    /**
     * Rejects a proposed config whose maximum detection timeout is negative.
     *
     * @param kafkaConfig the proposed config
     * @throws ConfigException if the timeout is below zero
     */
    @Override // kafka.server.BrokerReconfigurable
    public void validateReconfiguration(KafkaConfig kafkaConfig) {
        long proposedTimeoutMs = kafkaConfig.confluentConfig().tierTopicDataLossDetectionMaxTimeoutMs().longValue();
        if (proposedTimeoutMs >= 0) {
            return;
        }
        throw new ConfigException(String.format("%s must have a value at least >= 0.", KafkaConfig.TierTopicDataLossDetectionMaxTimeoutMsProp()));
    }

    /**
     * Applies a changed maximum detection timeout.
     *
     * @param kafkaConfig  config whose timeout is logged as the "from" value
     * @param kafkaConfig2 config whose timeout is logged as the "to" value and then applied
     */
    @Override // kafka.server.BrokerReconfigurable
    public void reconfigure(KafkaConfig kafkaConfig, KafkaConfig kafkaConfig2) {
        long previousTimeoutMs = kafkaConfig.confluentConfig().tierTopicDataLossDetectionMaxTimeoutMs().longValue();
        long updatedTimeoutMs = kafkaConfig2.confluentConfig().tierTopicDataLossDetectionMaxTimeoutMs().longValue();
        if (previousTimeoutMs == updatedTimeoutMs) {
            // Nothing changed; keep the current value and stay quiet.
            return;
        }
        log.info("Reconfigure {} from {} to {}", KafkaConfig.TierTopicDataLossDetectionMaxTimeoutMsProp(), previousTimeoutMs, updatedTimeoutMs);
        this.dataLossDetectionMaxTimeoutMs = updatedTimeoutMs;
    }

    /** @return the currently configured maximum detection timeout in milliseconds */
    public long dataLossDetectionMaxTimeoutMs() {
        return dataLossDetectionMaxTimeoutMs;
    }

    /**
     * Convenience overload that runs detection with the default retry sleep time.
     *
     * @param tierTopicHeadDataLossDetectionRequest the detection request
     * @param validationSource                      what triggered the validation
     * @param producer                              producer used for fencing, when fencing applies
     * @param j                                     caller-supplied timeout in milliseconds
     * @return the detection response
     */
    public TierTopicHeadDataLossDetectionResponse detectDataLossInTierTopicHead(TierTopicHeadDataLossDetectionRequest tierTopicHeadDataLossDetectionRequest, ValidationSource validationSource, Producer<byte[], byte[]> producer, long j) throws InterruptedException, ExecutionException {
        final long retrySleepMs = DATA_LOSS_DETECTION_RETRY_SLEEP_TIME_MS;
        return detectDataLossInTierTopicHead(tierTopicHeadDataLossDetectionRequest, validationSource, producer, j, retrySleepMs);
    }

    /** Marks the validator as shut down; in-flight detection and fencing loops observe the flag and stop. */
    public void shutdown() {
        // Unconditionally setting the flag leaves it in the same state as a false -> true compare-and-set.
        this.shutdown.set(true);
    }

    /**
     * Runs data loss detection and converts the resulting report into a REST response.
     *
     * <p>The report is uploaded to the object store only when it contains data loss or failures.
     * Any exception raised by the detection itself is recorded in the failure metric and rethrown.
     *
     * @param tierTopicHeadDataLossDetectionRequest the detection request
     * @param validationSource                      what triggered the validation
     * @param producer                              producer used for fencing; ignored when fencing is not needed
     * @param j                                     caller-supplied timeout in milliseconds
     * @param j2                                    sleep time between retries in milliseconds
     * @return the detection response
     * @throws UnsupportedOperationException if the validator is shut down, detection is disabled,
     *         or the validation source is not supported
     */
    public TierTopicHeadDataLossDetectionResponse detectDataLossInTierTopicHead(TierTopicHeadDataLossDetectionRequest tierTopicHeadDataLossDetectionRequest, ValidationSource validationSource, Producer<byte[], byte[]> producer, long j, long j2) throws InterruptedException {
        if (this.shutdown.get()) {
            throw new UnsupportedOperationException("Can't detect data loss in tier topic head because the broker is already shutting down.");
        }
        final long startTimeMs = this.time.milliseconds();
        checkDataLossValidationEnabled();
        checkValidationSource(validationSource);
        final boolean fencingNeeded = isUserPartitionFencingNeeded(validationSource);
        log.info("Detecting data loss in tier topic head with fencing {} (ValidationSource={}).", fencingNeeded ? "enabled" : "disabled", validationSource);
        try {
            // A null producer tells the detection routine to skip fencing.
            Producer<byte[], byte[]> fencingProducer = fencingNeeded ? producer : null;
            TierTopicHeadDataLossReport report = detectDataLossInTierTopicHead(startTimeMs, tierTopicHeadDataLossDetectionRequest, validationSource, fencingProducer, j, j2);
            if (report == null) {
                // Nothing needed validation.
                return new TierTopicHeadDataLossDetectionResponse(KRaftSnapshotManager.KEY_PREFIX, TierTopicHeadDataLossDetectionResponse.CompletionStatus.SUCCESS, Collections.emptyList());
            }
            // NOTE(review): KEY_PREFIX is presumably an empty-string constant that the decompiler
            // substituted for a "" literal; confirm against the original source.
            String reportPath = KRaftSnapshotManager.KEY_PREFIX;
            boolean worthUploading = report.hasDataLoss() || report.hasFailures();
            if (worthUploading) {
                reportPath = uploadReport(tierTopicHeadDataLossDetectionRequest.identifier(), report);
            }
            TierTopicHeadDataLossDetectionResponse.CompletionStatus responseStatus = COMPLETION_STATUS_CONVERSION_MAP.get(report.completionStatus());
            return new TierTopicHeadDataLossDetectionResponse(reportPath, responseStatus, report.errorMessages());
        } catch (Exception e) {
            this.dataLossMetrics.recordDataLossDetectionFailure(validationSource, true);
            throw e;
        }
    }

    /**
     * Core detection loop: compares the last materialized offset/epoch of local tier partition
     * states against the end offsets reported by the tier topic partition leaders, retrying
     * until every partition is answered, the timeout elapses, or the validator is shut down.
     *
     * <p>Resources created along the way (leader endpoints, thread pool, admin client) are
     * released in a single {@code finally} block on a best-effort basis, so a failure to close
     * one of them no longer prevents the others from being released.
     *
     * @param j                                     detection start time in milliseconds, used for timeout accounting
     * @param tierTopicHeadDataLossDetectionRequest the detection request (provides the partition allow list)
     * @param validationSource                      what triggered the validation
     * @param producer                              producer used to fence affected user partitions;
     *                                              {@code null} skips fencing
     * @param j2                                    caller-supplied timeout in milliseconds
     * @param j3                                    sleep time between retries in milliseconds
     * @return the data loss report, or {@code null} when there are no tier topic partitions to validate
     * @throws InterruptedException if the thread is interrupted while sleeping between retries
     */
    public TierTopicHeadDataLossReport detectDataLossInTierTopicHead(long j, TierTopicHeadDataLossDetectionRequest tierTopicHeadDataLossDetectionRequest, ValidationSource validationSource, Producer<byte[], byte[]> producer, long j2, long j3) throws InterruptedException {
        Map<TopicPartition, Boolean> leaderPartitions = new HashMap<>();
        this.replicaManager.leaderPartitionsIterator().foreach(partition -> {
            leaderPartitions.put(partition.topicPartition(), true);
            return null;
        });
        Map<TopicPartition, AffectedTierTopicPartitionInfo> pending = getTierTopicMaxMaterializedInfoByUserPartition(tierTopicHeadDataLossDetectionRequest.tierTopicPartitionsAllowList(), leaderPartitions);
        if (pending.isEmpty()) {
            log.info("Skip data loss detection in tier topic as there are no tier topic partitions to validate.");
            this.dataLossMetrics.clearAll();
            return null;
        }
        ConfluentAdmin admin = this.adminClientSupplier.get();
        ExecutorService executor = null;
        Map<Integer, LeaderEndPoint> leaderEndPoints = new ConcurrentHashMap<>();
        try {
            Map<TopicPartition, AffectedTierTopicPartitionInfo> partitionsWithDataLoss = new HashMap<>();
            Map<TopicPartition, PartitionResult> leadersBefore = getTierTopicPartitionLeaderInfo(admin, pending.keySet());
            List<String> errorMessages = new ArrayList<>();
            int trial = 0;
            while (!pending.isEmpty() && !hasTimedOut(j, j2, pending, errorMessages) && !hasBeenShutdown(pending, errorMessages)) {
                trial++;
                if (trial != 1) {
                    log.info("Sleeping for {}ms before retrying data loss detection", Long.valueOf(j3));
                    Thread.sleep(j3);
                }
                log.info("[Trial={}] Trying to detect data loss on {} tier topic partitions (ValidationSource={})", new Object[]{Integer.valueOf(trial), Integer.valueOf(pending.size()), validationSource});
                Map<Integer, Map<TopicPartition, OffsetForLeaderEpochRequestData.OffsetForLeaderPartition>> requestsByLeader = buildOffsetForLeaderEpochRequests(pending, leadersBefore);
                if (executor == null && !requestsByLeader.isEmpty()) {
                    // The pool is created lazily on the first non-empty trial and reused afterwards.
                    executor = Executors.newFixedThreadPool(Math.min(requestsByLeader.size(), DATA_LOSS_DETECTION_THREAD_POOL_SIZE_MAX));
                }
                Map<TopicPartition, OffsetForLeaderEpochResponseData.EpochEndOffset> responses = sendOffsetForLeaderEpochRequests(requestsByLeader, leaderEndPoints, executor);
                // Leaders are looked up again after the requests so findDataLoss can discard
                // answers from a leader that changed in between.
                Map<TopicPartition, PartitionResult> leadersAfter = getTierTopicPartitionLeaderInfo(admin, pending.keySet());
                partitionsWithDataLoss.putAll(findDataLoss(pending, responses, leadersBefore, leadersAfter));
                // NOTE(review): partitions that findDataLoss skipped because the leader changed
                // are also removed here, so they are never retried nor reported as unvalidated.
                // Confirm whether that is intended.
                pending.keySet().removeAll(responses.keySet());
                leadersBefore = leadersAfter;
            }
            Map<TopicIdPartition, AffectedUserTopicPartitionInfo> affectedUserPartitions = getAffectedUserPartitions(partitionsWithDataLoss, leaderPartitions);
            long endTimeMs = this.time.milliseconds();
            TierTopicHeadDataLossReport.CompletionStatus completionStatus = TierTopicHeadDataLossReport.CompletionStatus.FAILURE;
            if (pending.isEmpty() && errorMessages.isEmpty()) {
                completionStatus = TierTopicHeadDataLossReport.CompletionStatus.SUCCESS;
            }
            java.util.Set<TopicIdPartition> unfencedUserPartitions = new HashSet<>();
            if (producer == null) {
                log.warn("Skipping fencing of {} user topic partitions that were found to have data loss in tier topic", Integer.valueOf(affectedUserPartitions.size()));
            } else {
                Optional<FencingError> fencingError = fenceUserTopicPartitions(affectedUserPartitions.keySet(), producer);
                if (fencingError.isPresent()) {
                    completionStatus = TierTopicHeadDataLossReport.CompletionStatus.FAILURE;
                    errorMessages.add(fencingError.get().errorMessage());
                    unfencedUserPartitions = fencingError.get().failedUserPartitions;
                }
            }
            TierTopicHeadDataLossReport report = TierTopicHeadDataLossReport.createReport(completionStatus, this.config.brokerId, validationSource, j, endTimeMs, affectedUserPartitions, partitionsWithDataLoss, pending.keySet(), unfencedUserPartitions, errorMessages);
            report.log();
            this.dataLossMetrics.recordDataLossReport(report);
            return report;
        } finally {
            releaseDetectionResources(leaderEndPoints, executor, admin);
        }
    }

    /**
     * Best-effort release of everything a detection run created. Each resource is closed
     * independently so that one failing close cannot leak the remaining ones.
     */
    private static void releaseDetectionResources(Map<Integer, LeaderEndPoint> leaderEndPoints, ExecutorService executor, ConfluentAdmin admin) {
        for (Map.Entry<Integer, LeaderEndPoint> entry : leaderEndPoints.entrySet()) {
            try {
                entry.getValue().initiateClose();
                entry.getValue().close();
            } catch (Exception e) {
                log.warn("Failed to close the leader endpoint for broker {}", entry.getKey(), e);
            }
        }
        if (executor != null) {
            executor.shutdown();
        }
        try {
            admin.close();
        } catch (Exception e) {
            log.warn("Failed to close the admin client used for tier topic data loss detection", e);
        }
    }

    /**
     * Checks whether detection has exceeded its time budget.
     *
     * <p>The budget is the configured maximum timeout, further capped by {@code j2} when
     * {@code j2} is non-negative. The volatile configured value is read exactly once, so a
     * concurrent reconfiguration cannot make the applied and the reported timeout disagree.
     *
     * @param j    detection start time in milliseconds
     * @param j2   caller-supplied timeout in milliseconds; a negative value means "not set"
     * @param map  tier topic partitions still awaiting validation
     * @param list error messages; a timeout message is appended when partitions remain
     * @return {@code true} if the elapsed time is greater than the budget
     */
    private boolean hasTimedOut(long j, long j2, Map<TopicPartition, AffectedTierTopicPartitionInfo> map, List<String> list) {
        long effectiveTimeoutMs = this.dataLossDetectionMaxTimeoutMs;
        if (j2 >= 0) {
            effectiveTimeoutMs = Math.min(j2, effectiveTimeoutMs);
        }
        if (this.time.milliseconds() - j <= effectiveTimeoutMs) {
            return false;
        }
        String message = String.format("Tier topic data loss detection timed out after %d ms. Validation could not be completed for %d tier topic partitions: %s.", Long.valueOf(effectiveTimeoutMs), Integer.valueOf(map.size()), map.keySet());
        if (map.isEmpty()) {
            // Nothing was left to validate, so the timeout is informational only.
            log.info(message);
            return true;
        }
        log.error(message);
        list.add(message);
        return true;
    }

    /**
     * Checks whether the validator was shut down while detection is in progress.
     *
     * @param map  tier topic partitions still awaiting validation
     * @param list error messages; an interruption message is appended when partitions remain
     * @return {@code true} if the shutdown flag is set
     */
    private boolean hasBeenShutdown(Map<TopicPartition, AffectedTierTopicPartitionInfo> map, List<String> list) {
        if (this.shutdown.get()) {
            String message = String.format("Tier topic data loss detection was interrupted due to broker shutdown. Validation could not be completed for %d tier topic partitions: %s.", map.size(), map.keySet());
            if (!map.isEmpty()) {
                log.error(message);
                list.add(message);
            } else {
                // Nothing was left to validate, so the shutdown is informational only.
                log.info(message);
            }
            return true;
        }
        return false;
    }

    /**
     * Sends one OffsetForLeaderEpoch request per leader in parallel and collects the
     * per-partition responses that came back without an error code.
     *
     * <p>A failed task is logged and its partitions are simply absent from the result. If the
     * calling thread is interrupted while waiting, its interrupt status is restored and waiting
     * stops, so the caller can observe the interruption instead of it being silently swallowed.
     *
     * @param map             requests keyed by leader broker id
     * @param map2            shared cache of leader endpoints, keyed by broker id
     * @param executorService pool the tasks are submitted to; only used when {@code map} is non-empty
     * @return successful responses keyed by tier topic partition
     */
    private Map<TopicPartition, OffsetForLeaderEpochResponseData.EpochEndOffset> sendOffsetForLeaderEpochRequests(Map<Integer, Map<TopicPartition, OffsetForLeaderEpochRequestData.OffsetForLeaderPartition>> map, Map<Integer, LeaderEndPoint> map2, ExecutorService executorService) {
        List<OffsetForLeaderEpochTask> tasks = new ArrayList<>(map.size());
        List<Future<Map<TopicPartition, OffsetForLeaderEpochResponseData.EpochEndOffset>>> futures = new ArrayList<>(map.size());
        for (Map.Entry<Integer, Map<TopicPartition, OffsetForLeaderEpochRequestData.OffsetForLeaderPartition>> entry : map.entrySet()) {
            OffsetForLeaderEpochTask task = new OffsetForLeaderEpochTask(entry.getKey().intValue(), entry.getValue(), map2);
            futures.add(executorService.submit(task));
            tasks.add(task);
        }
        Map<TopicPartition, OffsetForLeaderEpochResponseData.EpochEndOffset> successfulResponses = new HashMap<>();
        for (int i = 0; i < futures.size(); i++) {
            final int leaderId = tasks.get(i).leaderId;
            try {
                futures.get(i).get().forEach((topicPartition, epochEndOffset) -> {
                    if (epochEndOffset.errorCode() == Errors.NONE.code()) {
                        successfulResponses.put(topicPartition, epochEndOffset);
                    } else {
                        log.error("Can't validate data loss for tier topic partition {} because OffsetForLeaderEpoch request to leader {} returned error code {} in the response {}", new Object[]{topicPartition, Integer.valueOf(leaderId), Errors.forCode(epochEndOffset.errorCode()), epochEndOffset});
                    }
                });
            } catch (InterruptedException e) {
                // Future.get() clears the interrupt status when it throws; put it back and stop
                // waiting for the remaining tasks.
                Thread.currentThread().interrupt();
                log.error("Interrupted while waiting for the OffsetForLeaderEpoch task for leader: {}", Integer.valueOf(leaderId), e);
                break;
            } catch (Exception e) {
                log.error("Failed to successfully execute the OffsetForLeaderEpoch task for leader: {}", Integer.valueOf(leaderId), e);
            }
        }
        return successfulResponses;
    }

    /**
     * Serializes the report to JSON and uploads it to the tier object store, recording the
     * upload outcome in the metrics.
     *
     * <p>The JSON is encoded explicitly as UTF-8 so the uploaded bytes do not depend on the
     * JVM's platform default charset.
     *
     * @param str                         identifier of the detection request
     * @param tierTopicHeadDataLossReport the report to upload
     * @return the object store path the report was written to
     * @throws RuntimeException if the upload fails; the original failure is kept as the cause
     */
    private String uploadReport(String str, TierTopicHeadDataLossReport tierTopicHeadDataLossReport) {
        try {
            byte[] reportJson = TierTopicHeadDataLossReport.getJsonString(tierTopicHeadDataLossReport).getBytes(java.nio.charset.StandardCharsets.UTF_8);
            String uploadedPath = TierObjectStoreFunctionUtils.putBuffer(() -> {
                return false;
            }, this.tierObjectStore, new TierTopicHeadDataLossReportMetadata(str, tierTopicHeadDataLossReport.brokerId(), tierTopicHeadDataLossReport.creationTimestamp()), ByteBuffer.wrap(reportJson), ObjectType.TIER_TOPIC_HEAD_DATA_LOSS_REPORT);
            this.dataLossMetrics.recordDataLossReportUploadStatus(tierTopicHeadDataLossReport.source(), true);
            log.info("Successfully uploaded tier topic head data loss report to the object store at the path {}. Identifier: {}.", uploadedPath, str);
            return uploadedPath;
        } catch (Exception e) {
            this.dataLossMetrics.recordDataLossReportUploadStatus(tierTopicHeadDataLossReport.source(), false);
            throw new RuntimeException(String.format("Could not upload data loss report to object store. Identifier: %s.", str), e);
        }
    }

    /** Formats a leader as {@code (leaderId:N, leaderEpoch:E)}, or {@code "null"} when absent. */
    private static String leaderAsString(PartitionResult partitionResult) {
        if (partitionResult == null) {
            return "null";
        }
        return String.format("(leaderId:%d, leaderEpoch:%s)", partitionResult.leaderId(), partitionResult.leaderEpoch());
    }

    /**
     * Tells whether a tier topic partition kept the same leader id and leader epoch between two lookups.
     *
     * @param topicPartition   the tier topic partition (used for logging)
     * @param partitionResult  leader info from the earlier lookup; must not be {@code null}
     * @param partitionResult2 leader info from the later lookup; {@code null} counts as "changed"
     * @return {@code true} if leader id and epoch are identical
     * @throws IllegalArgumentException if {@code partitionResult} is {@code null}
     */
    private static boolean hasSameLeader(TopicPartition topicPartition, PartitionResult partitionResult, PartitionResult partitionResult2) {
        if (partitionResult == null) {
            throw new IllegalArgumentException("leaderBefore can't be null for tier topic partition: " + topicPartition);
        }
        boolean unchanged = false;
        if (partitionResult2 != null && partitionResult.leaderId() == partitionResult2.leaderId()) {
            unchanged = partitionResult.leaderEpoch().getAsInt() == partitionResult2.leaderEpoch().getAsInt();
        }
        if (unchanged) {
            return true;
        }
        log.info("For tier topic partition: {} previous leader: {} is different from current leader: {}", topicPartition, leaderAsString(partitionResult), leaderAsString(partitionResult2));
        return false;
    }

    /** A result is usable when it is non-null and carries a non-negative leader id and a present, non-negative leader epoch. */
    private static boolean isValidPartitionResult(PartitionResult partitionResult) {
        if (partitionResult == null || partitionResult.leaderId() < 0) {
            return false;
        }
        OptionalInt epoch = partitionResult.leaderEpoch();
        return epoch.isPresent() && epoch.getAsInt() >= 0;
    }

    /**
     * Evaluates the leaders' responses and returns the tier topic partitions with data loss.
     *
     * <p>A partition is considered healthy only when the leader's response carries the same
     * epoch as the last materialized one and an end offset strictly greater than the last
     * materialized offset. Responses for partitions whose leader changed between the two
     * lookups are skipped.
     *
     * @param map  last materialized info per tier topic partition still under validation
     * @param map2 successful OffsetForLeaderEpoch responses
     * @param map3 leader info captured before the requests were sent
     * @param map4 leader info captured after the requests completed
     * @return info for each tier topic partition where data loss was detected
     */
    private Map<TopicPartition, AffectedTierTopicPartitionInfo> findDataLoss(Map<TopicPartition, AffectedTierTopicPartitionInfo> map, Map<TopicPartition, OffsetForLeaderEpochResponseData.EpochEndOffset> map2, Map<TopicPartition, PartitionResult> map3, Map<TopicPartition, PartitionResult> map4) {
        Map<TopicPartition, AffectedTierTopicPartitionInfo> lossByPartition = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetForLeaderEpochResponseData.EpochEndOffset> response : map2.entrySet()) {
            TopicPartition tierPartition = response.getKey();
            OffsetForLeaderEpochResponseData.EpochEndOffset leaderAnswer = response.getValue();
            PartitionResult leaderBefore = map3.get(tierPartition);
            PartitionResult leaderAfter = map4.get(tierPartition);
            AffectedTierTopicPartitionInfo materialized = map.get(tierPartition);
            OffsetAndEpoch lastMaterialized = materialized.maxLastMaterializedOffsetAndEpoch();
            OffsetAndEpoch leaderEnd = new OffsetAndEpoch(leaderAnswer.endOffset(), Optional.of(Integer.valueOf(leaderAnswer.leaderEpoch())));
            if (!hasSameLeader(tierPartition, leaderBefore, leaderAfter)) {
                log.warn("Can't validata data loss on tier topic partition: {} because leader has changed.", tierPartition);
                continue;
            }
            boolean sameEpoch = leaderEnd.epoch().orElse(-1).intValue() == lastMaterialized.epoch().orElse(-1).intValue();
            boolean leaderIsAhead = leaderAnswer.endOffset() > lastMaterialized.offset();
            if (sameEpoch && leaderIsAhead) {
                log.info("No data loss detected in tier topic partition: {}", tierPartition);
                continue;
            }
            // Same info as before, now carrying the end offset/epoch the leader reported.
            OffsetAndEpoch reportedEnd = new OffsetAndEpoch(leaderAnswer.endOffset(), Optional.of(Integer.valueOf(leaderAnswer.leaderEpoch())));
            lossByPartition.put(tierPartition, new AffectedTierTopicPartitionInfo(materialized.maxLastMaterializedOffsetAndEpoch(), materialized.maxLastMaterializedPartition(), materialized.ftpsStatus(), materialized.isLeader(), this.config.brokerId, reportedEnd));
            log.error("Data loss detected in tier topic partition: {}. User partition where data loss was detected: {} with FTPS last materialized OffsetAndEpoch: {}, leader's response: {}", new Object[]{tierPartition, materialized.maxLastMaterializedPartition(), lastMaterialized, leaderEnd});
        }
        return lossByPartition;
    }

    /**
     * Decides whether a local log takes part in data loss detection.
     *
     * <p>Stray or deleted logs, and logs without a topic id partition, are excluded. The log's
     * tier partition state must also have a topic id partition and a last materialized source
     * position with an epoch and a non-negative offset.
     *
     * @param abstractLog the local log
     * @param set         allow list of tier topic partitions; an empty set allows all
     * @return {@code true} if the log should be processed
     */
    private boolean shouldProcessLog(AbstractLog abstractLog, java.util.Set<TopicPartition> set) {
        boolean unusable = abstractLog.isStray() || abstractLog.isDeleted() || !abstractLog.topicIdPartition().isDefined();
        if (unusable) {
            return false;
        }
        TierPartitionState state = abstractLog.tierPartitionState();
        Optional<TopicIdPartition> userPartition = state.topicIdPartition();
        OffsetAndEpoch lastMaterialized = state.lastLocalMaterializedSrcOffsetAndEpoch();
        if (!userPartition.isPresent() || !lastMaterialized.epoch().isPresent() || lastMaterialized.offset() < 0) {
            return false;
        }
        if (set.isEmpty()) {
            return true;
        }
        return set.contains(this.tierTopic.toTierTopicPartition(userPartition.get()));
    }

    /**
     * Scans all local logs and, for each tier topic partition, keeps the user partition whose
     * tier partition state has the highest last materialized source offset.
     *
     * @param set allow list of tier topic partitions; an empty set allows all
     * @param map user partitions this broker leads (only key presence is consulted)
     * @return the winning materialization info per tier topic partition
     */
    private Map<TopicPartition, AffectedTierTopicPartitionInfo> getTierTopicMaxMaterializedInfoByUserPartition(java.util.Set<TopicPartition> set, Map<TopicPartition, Boolean> map) {
        Map<TopicPartition, AffectedTierTopicPartitionInfo> maxByTierPartition = new HashMap<>();
        this.replicaManager.logManager().allLogs().foreach(abstractLog -> {
            if (shouldProcessLog(abstractLog, set)) {
                TierPartitionState state = abstractLog.tierPartitionState();
                Optional<TopicIdPartition> userPartition = state.topicIdPartition();
                TopicPartition tierPartition = this.tierTopic.toTierTopicPartition(userPartition.get());
                AffectedTierTopicPartitionInfo current = maxByTierPartition.get(tierPartition);
                // Replace the entry only when this log has materialized strictly further.
                boolean improves = current == null || state.lastLocalMaterializedSrcOffsetAndEpoch().offset() > current.maxLastMaterializedOffsetAndEpoch().offset();
                if (improves) {
                    boolean isLeader = map.containsKey(userPartition.get().topicPartition());
                    maxByTierPartition.put(tierPartition, new AffectedTierTopicPartitionInfo(state.lastLocalMaterializedSrcOffsetAndEpoch(), userPartition.get(), state.status(), isLeader, this.config.brokerId, OffsetAndEpoch.EMPTY));
                }
            }
            return null;
        });
        return maxByTierPartition;
    }

    /**
     * Looks up the current leader id and leader epoch of the given tier topic partitions.
     *
     * <p>Partitions whose lookup fails or returns an invalid result are logged and left out of
     * the returned map. If the calling thread is interrupted while waiting for a result, its
     * interrupt status is restored so the caller can observe the interruption instead of it
     * being silently swallowed.
     *
     * @param confluentAdmin admin client used for the replica status call
     * @param set            tier topic partitions to look up
     * @return valid leader info keyed by tier topic partition
     */
    private Map<TopicPartition, PartitionResult> getTierTopicPartitionLeaderInfo(ConfluentAdmin confluentAdmin, java.util.Set<TopicPartition> set) {
        Map<TopicPartition, PartitionResult> leaderInfo = new HashMap<>();
        confluentAdmin.replicaStatus(set, new ReplicaStatusOptions()).partitionResults().forEach((topicPartition, kafkaFuture) -> {
            PartitionResult partitionResult;
            try {
                partitionResult = (PartitionResult) kafkaFuture.get();
                int leaderId = partitionResult.leaderId();
                OptionalInt leaderEpoch = partitionResult.leaderEpoch();
                if (!isValidPartitionResult(partitionResult)) {
                    log.warn("Can't get leader info for tier topic partition {} due to unexpected PartitionResult with leaderId: {} and leaderEpoch: {}", new Object[]{topicPartition, Integer.valueOf(leaderId), Integer.valueOf(leaderEpoch.orElse(-1))});
                    partitionResult = null;
                }
            } catch (InterruptedException e) {
                // get() clears the interrupt status when it throws; put it back.
                Thread.currentThread().interrupt();
                log.warn("Can't get leader info for tier topic partition {}", topicPartition, e);
                partitionResult = null;
            } catch (Exception e) {
                log.warn("Can't get leader info for tier topic partition {}", topicPartition, e);
                partitionResult = null;
            }
            if (partitionResult != null) {
                leaderInfo.put(topicPartition, partitionResult);
            }
        });
        return leaderInfo;
    }

    /**
     * Groups the partitions still under validation by their leader broker and builds one
     * OffsetForLeaderEpoch partition request per tier topic partition, asking about the last
     * materialized epoch.
     *
     * @param map  last materialized info per tier topic partition still under validation
     * @param map2 known leader info per tier topic partition
     * @return partition requests keyed by leader broker id, then by tier topic partition
     */
    private Map<Integer, Map<TopicPartition, OffsetForLeaderEpochRequestData.OffsetForLeaderPartition>> buildOffsetForLeaderEpochRequests(Map<TopicPartition, AffectedTierTopicPartitionInfo> map, Map<TopicPartition, PartitionResult> map2) {
        Map<Integer, Map<TopicPartition, OffsetForLeaderEpochRequestData.OffsetForLeaderPartition>> requestsByLeader = new HashMap<>();
        for (Map.Entry<TopicPartition, PartitionResult> leaderEntry : map2.entrySet()) {
            TopicPartition tierPartition = leaderEntry.getKey();
            AffectedTierTopicPartitionInfo materialized = map.get(tierPartition);
            if (materialized == null) {
                // Leader known, but the partition is no longer (or never was) under validation.
                continue;
            }
            Map<TopicPartition, OffsetForLeaderEpochRequestData.OffsetForLeaderPartition> leaderRequests = requestsByLeader.computeIfAbsent(Integer.valueOf(leaderEntry.getValue().leaderId()), leaderId -> new HashMap<>());
            int materializedEpoch = materialized.maxLastMaterializedOffsetAndEpoch().epoch().get().intValue();
            leaderRequests.put(tierPartition, new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(tierPartition.partition()).setLeaderEpoch(materializedEpoch));
        }
        return requestsByLeader;
    }

    /**
     * Finds the user partitions affected by data loss in their tier topic partition.
     *
     * <p>A user partition is affected when its tier topic partition is in {@code map} and its
     * last materialized source offset is at or beyond the end offset recorded for that tier
     * topic partition.
     *
     * @param map  tier topic partitions where data loss was detected
     * @param map2 user partitions this broker leads (only key presence is consulted)
     * @return info for each affected user partition; empty when {@code map} is empty
     */
    private Map<TopicIdPartition, AffectedUserTopicPartitionInfo> getAffectedUserPartitions(Map<TopicPartition, AffectedTierTopicPartitionInfo> map, Map<TopicPartition, Boolean> map2) {
        if (map.isEmpty()) {
            log.debug("No tier topic partition has data loss, so no user partitions got affected.");
            return new HashMap<>();
        }
        Map<TopicIdPartition, AffectedUserTopicPartitionInfo> affected = new HashMap<>();
        // An empty allow list means every tier topic partition is considered.
        java.util.Set<TopicPartition> allowAll = new HashSet<>();
        this.replicaManager.logManager().allLogs().foreach(abstractLog -> {
            if (shouldProcessLog(abstractLog, allowAll)) {
                TierPartitionState state = abstractLog.tierPartitionState();
                OffsetAndEpoch lastMaterialized = state.lastLocalMaterializedSrcOffsetAndEpoch();
                Optional<TopicIdPartition> userPartition = state.topicIdPartition();
                TopicPartition tierPartition = this.tierTopic.toTierTopicPartition(userPartition.get());
                AffectedTierTopicPartitionInfo tierInfo = map.get(tierPartition);
                boolean isLeader = map2.containsKey(userPartition.get().topicPartition());
                if (tierInfo != null && lastMaterialized.offset() >= tierInfo.tierTopicEndOffsetAndEpoch().offset()) {
                    affected.put(userPartition.get(), new AffectedUserTopicPartitionInfo(tierPartition.partition(), lastMaterialized, state.status(), isLeader));
                }
            }
            return null;
        });
        return affected;
    }

    /**
     * Fences the given user partitions by injecting a {@code TierPartitionFence} event for each
     * of them into the tier topic. Stops early if the validator is shut down.
     *
     * @param set      user partitions to fence
     * @param producer producer used to write the fence events
     * @return empty when every partition was fenced; otherwise the first exception seen (may be
     *         {@code null}) together with the partitions that were not fenced
     */
    private Optional<FencingError> fenceUserTopicPartitions(java.util.Set<TopicIdPartition> set, Producer<byte[], byte[]> producer) {
        if (set.isEmpty()) {
            log.info("No user topic partitions to be fenced");
            return Optional.empty();
        }
        log.info("Fencing {} user topic partitions", Integer.valueOf(set.size()));
        // Starts as "everything"; partitions are removed as they are fenced successfully.
        java.util.Set<TopicIdPartition> notFenced = new HashSet<>(set);
        Exception firstFailure = null;
        for (TopicIdPartition userPartition : set) {
            if (this.shutdown.get()) {
                log.error("Can't fence user partition: {} because broker is shutting down.", userPartition);
                break;
            }
            try {
                RecoveryUtils.injectTierTopicEvent(producer, new TierPartitionFence(userPartition, UUID.randomUUID(), false), this.tierTopic.topicName(), this.tierTopic.numPartitions().getAsInt());
                notFenced.remove(userPartition);
            } catch (Exception e) {
                // Only the first failure is kept as a sample; the partition stays in notFenced.
                if (firstFailure == null) {
                    firstFailure = e;
                }
            }
        }
        if (firstFailure == null && notFenced.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(new FencingError(firstFailure, notFenced));
    }

    /**
     * Returns the cached endpoint for a leader broker, creating and caching one on first use.
     *
     * @param i   id of this broker
     * @param i2  id of the leader broker to connect to
     * @param map endpoint cache keyed by leader broker id
     * @return the endpoint for {@code i2}
     */
    /* JADX INFO: Access modifiers changed from: private */
    public LeaderEndPoint getLeaderEndPoint(int i, int i2, Map<Integer, LeaderEndPoint> map) {
        Integer leaderKey = Integer.valueOf(i2);
        LeaderEndPoint cached = map.get(leaderKey);
        if (cached == null) {
            cached = this.leaderEndpointSupplier.get(i, i2, getClass().getSimpleName());
            map.put(leaderKey, cached);
        }
        return cached;
    }
}
