package kafka.log.remote;

import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.Timer;
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kafka.cluster.EndPoint;
import kafka.cluster.Partition;
import kafka.log.MergedLog;
import kafka.log.remote.quota.RLMQuotaManager;
import kafka.log.remote.quota.RLMQuotaManagerConfig;
import kafka.server.BrokerTopicStats;
import kafka.server.KafkaConfig;
import kafka.server.QuotaType$RLMCopy$;
import kafka.server.QuotaType$RLMFetch$;
import kafka.server.StopPartition;
import kafka.tier.domain.TierObjectMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Quota;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.RemoteLogInputStream;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.utils.ChildFirstClassLoader;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.common.CheckpointFile;
import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager;
import org.apache.kafka.server.log.remote.storage.LogSegmentData;
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
import org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
import org.apache.kafka.storage.internals.log.AbortedTxn;
import org.apache.kafka.storage.internals.log.EpochEntry;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.FetchIsolation;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.LogSegment;
import org.apache.kafka.storage.internals.log.OffsetPosition;
import org.apache.kafka.storage.internals.log.RemoteIndexCache;
import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
import org.apache.kafka.storage.internals.log.RemoteStorageThreadPool;
import org.apache.kafka.storage.internals.log.TransactionIndex;
import org.apache.kafka.storage.internals.log.TxnIndexSearchResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kafka/log/remote/RemoteLogManager.class */
public class RemoteLogManager implements Closeable {
    private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogManager.class);
    private static final String REMOTE_LOG_READER_THREAD_NAME_PREFIX = "remote-log-reader";
    private final KafkaConfig config;
    private final int brokerId;
    private final String logDir;
    private final Time time;
    private final Function<TopicPartition, Optional<MergedLog>> fetchLog;
    private final BiConsumer<TopicPartition, Long> updateRemoteLogStartOffset;
    private final BrokerTopicStats brokerTopicStats;
    private final Metrics metrics;
    private final RemoteIndexCache indexCache;
    private final RemoteStorageThreadPool remoteStorageReaderThreadPool;
    private final RLMScheduledThreadPool rlmScheduledThreadPool;
    private final long delayInMs;
    private final String clusterId;
    private final Timer remoteReadTimer;
    private final ReentrantLock copyQuotaManagerLock = new ReentrantLock(true);
    private final Condition copyQuotaManagerLockCondition = this.copyQuotaManagerLock.newCondition();
    private final ConcurrentHashMap<TopicIdPartition, RLMTaskWithFuture> leaderOrFollowerTasks = new ConcurrentHashMap<>();
    private final ConcurrentMap<TopicPartition, Uuid> topicIdByPartitionMap = new ConcurrentHashMap();
    private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup(getClass());
    private Optional<EndPoint> endpoint = Optional.empty();
    private boolean closed = false;
    private volatile boolean remoteLogManagerConfigured = false;
    private final RemoteStorageManager remoteLogStorageManager = createRemoteStorageManager();
    private final RemoteLogMetadataManager remoteLogMetadataManager = createRemoteLogMetadataManager();
    private final RLMQuotaManager rlmCopyQuotaManager = createRLMCopyQuotaManager();
    private final RLMQuotaManager rlmFetchQuotaManager = createRLMFetchQuotaManager();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:kafka/log/remote/RemoteLogManager$CancellableRunnable.class */
    public static abstract class CancellableRunnable implements Runnable {
        private volatile boolean cancelled;

        private CancellableRunnable() {
            this.cancelled = false;
        }

        public void cancel() {
            this.cancelled = true;
        }

        public boolean isCancelled() {
            return this.cancelled;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:kafka/log/remote/RemoteLogManager$EnrichedLogSegment.class */
    public static class EnrichedLogSegment {
        private final LogSegment logSegment;
        private final long nextSegmentOffset;

        public EnrichedLogSegment(LogSegment logSegment, long j) {
            this.logSegment = logSegment;
            this.nextSegmentOffset = j;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            EnrichedLogSegment enrichedLogSegment = (EnrichedLogSegment) obj;
            return this.nextSegmentOffset == enrichedLogSegment.nextSegmentOffset && Objects.equals(this.logSegment, enrichedLogSegment.logSegment);
        }

        public int hashCode() {
            return Objects.hash(this.logSegment, Long.valueOf(this.nextSegmentOffset));
        }

        public String toString() {
            return "EnrichedLogSegment{logSegment=" + this.logSegment + ", nextSegmentOffset=" + this.nextSegmentOffset + '}';
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:kafka/log/remote/RemoteLogManager$RLMScheduledThreadPool.class */
    public static class RLMScheduledThreadPool {
        private static final Logger LOGGER = LoggerFactory.getLogger(RLMScheduledThreadPool.class);
        private final int poolSize;
        private final ScheduledThreadPoolExecutor scheduledThreadPool = createPool();

        public RLMScheduledThreadPool(int i) {
            this.poolSize = i;
        }

        private ScheduledThreadPoolExecutor createPool() {
            ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(this.poolSize);
            scheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true);
            scheduledThreadPoolExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
            scheduledThreadPoolExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
            scheduledThreadPoolExecutor.setThreadFactory(new ThreadFactory() { // from class: kafka.log.remote.RemoteLogManager.RLMScheduledThreadPool.1
                private final AtomicInteger sequence = new AtomicInteger();

                @Override // java.util.concurrent.ThreadFactory
                public Thread newThread(Runnable runnable) {
                    return KafkaThread.daemon("kafka-rlm-thread-pool-" + this.sequence.incrementAndGet(), runnable);
                }
            });
            return scheduledThreadPoolExecutor;
        }

        public Double getIdlePercent() {
            return Double.valueOf(1.0d - (this.scheduledThreadPool.getActiveCount() / this.scheduledThreadPool.getCorePoolSize()));
        }

        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable runnable, long j, long j2, TimeUnit timeUnit) {
            LOGGER.info("Scheduling runnable {} with initial delay: {}, fixed delay: {}", new Object[]{runnable, Long.valueOf(j), Long.valueOf(j2)});
            return this.scheduledThreadPool.scheduleWithFixedDelay(runnable, j, j2, timeUnit);
        }

        public void close() {
            RemoteLogManager.shutdownAndAwaitTermination(this.scheduledThreadPool, "RLMScheduledThreadPool", 10L, TimeUnit.SECONDS);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:kafka/log/remote/RemoteLogManager$RLMTask.class */
    public class RLMTask extends CancellableRunnable {
        private final TopicIdPartition topicIdPartition;
        private final int customMetadataSizeLimit;
        private final Logger logger;
        private volatile int leaderEpoch;
        private volatile Optional<OffsetAndEpoch> copiedOffsetOption;
        private volatile boolean isLogStartOffsetUpdatedOnBecomingLeader;
        private volatile Optional<String> logDirectory;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:kafka/log/remote/RemoteLogManager$RLMTask$RemoteLogRetentionHandler.class */
        public class RemoteLogRetentionHandler {
            private final Optional<RetentionSizeData> retentionSizeData;
            private final Optional<RetentionTimeData> retentionTimeData;
            private long remainingBreachedSize;
            private OptionalLong logStartOffset = OptionalLong.empty();

            public RemoteLogRetentionHandler(Optional<RetentionSizeData> optional, Optional<RetentionTimeData> optional2) {
                this.retentionSizeData = optional;
                this.retentionTimeData = optional2;
                this.remainingBreachedSize = ((Long) optional.map(retentionSizeData -> {
                    return Long.valueOf(retentionSizeData.remainingBreachedSize);
                }).orElse(0L)).longValue();
            }

            /* JADX INFO: Access modifiers changed from: private */
            public boolean isSegmentBreachedByRetentionSize(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
                boolean z = false;
                if (!this.retentionSizeData.isPresent()) {
                    return false;
                }
                if (this.remainingBreachedSize > 0) {
                    long segmentSizeInBytes = this.remainingBreachedSize - remoteLogSegmentMetadata.segmentSizeInBytes();
                    if (segmentSizeInBytes >= 0) {
                        this.remainingBreachedSize = segmentSizeInBytes;
                        z = true;
                    }
                }
                if (z) {
                    if (!this.logStartOffset.isPresent() || this.logStartOffset.getAsLong() < remoteLogSegmentMetadata.endOffset() + 1) {
                        this.logStartOffset = OptionalLong.of(remoteLogSegmentMetadata.endOffset() + 1);
                    }
                    RLMTask.this.logger.info("About to delete remote log segment {} due to retention size {} breach. Log size after deletion will be {}.", new Object[]{remoteLogSegmentMetadata.remoteLogSegmentId(), Long.valueOf(this.retentionSizeData.get().retentionSize), Long.valueOf(this.remainingBreachedSize + this.retentionSizeData.get().retentionSize)});
                }
                return z;
            }

            public boolean isSegmentBreachedByRetentionTime(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
                if (!this.retentionTimeData.isPresent()) {
                    return false;
                }
                boolean z = remoteLogSegmentMetadata.maxTimestampMs() <= this.retentionTimeData.get().cleanupUntilMs;
                if (z) {
                    this.remainingBreachedSize = Math.max(0L, this.remainingBreachedSize - remoteLogSegmentMetadata.segmentSizeInBytes());
                    if (!this.logStartOffset.isPresent() || this.logStartOffset.getAsLong() < remoteLogSegmentMetadata.endOffset() + 1) {
                        this.logStartOffset = OptionalLong.of(remoteLogSegmentMetadata.endOffset() + 1);
                    }
                    RLMTask.this.logger.info("About to delete remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment", remoteLogSegmentMetadata.remoteLogSegmentId(), Long.valueOf(this.retentionTimeData.get().retentionMs));
                }
                return z;
            }

            /* JADX INFO: Access modifiers changed from: private */
            public boolean isSegmentBreachByLogStartOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long j, NavigableMap<Integer, Long> navigableMap) {
                boolean z = false;
                if (!navigableMap.isEmpty()) {
                    Integer firstKey = navigableMap.firstKey();
                    z = remoteLogSegmentMetadata.segmentLeaderEpochs().keySet().stream().allMatch(num -> {
                        return num.intValue() <= firstKey.intValue();
                    }) && remoteLogSegmentMetadata.endOffset() < j;
                }
                if (z) {
                    RLMTask.this.logger.info("About to delete remote log segment {} due to log-start-offset {} breach. Current earliest-epoch-entry: {}, segment-end-offset: {} and segment-epochs: {}", new Object[]{remoteLogSegmentMetadata.remoteLogSegmentId(), Long.valueOf(j), navigableMap.firstEntry(), Long.valueOf(remoteLogSegmentMetadata.endOffset()), remoteLogSegmentMetadata.segmentLeaderEpochs()});
                }
                return z;
            }

            /* JADX INFO: Access modifiers changed from: private */
            public boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry epochEntry, RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException, ExecutionException, InterruptedException {
                boolean deleteRemoteLogSegment = deleteRemoteLogSegment(remoteLogSegmentMetadata, remoteLogSegmentMetadata2 -> {
                    return remoteLogSegmentMetadata.segmentLeaderEpochs().keySet().stream().allMatch(num -> {
                        return num.intValue() < epochEntry.epoch;
                    });
                });
                if (deleteRemoteLogSegment) {
                    RLMTask.this.logger.info("Deleted remote log segment {} due to leader-epoch-cache truncation. Current earliest-epoch-entry: {}, segment-end-offset: {} and segment-epochs: {}", new Object[]{remoteLogSegmentMetadata.remoteLogSegmentId(), epochEntry, Long.valueOf(remoteLogSegmentMetadata.endOffset()), remoteLogSegmentMetadata.segmentLeaderEpochs().keySet()});
                }
                return deleteRemoteLogSegment;
            }

            /* JADX INFO: Access modifiers changed from: private */
            public boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate) throws RemoteStorageException, ExecutionException, InterruptedException {
                if (!predicate.test(remoteLogSegmentMetadata)) {
                    return false;
                }
                RLMTask.this.logger.debug("Deleting remote log segment {}", remoteLogSegmentMetadata.remoteLogSegmentId());
                String str = remoteLogSegmentMetadata.topicIdPartition().topic();
                RemoteLogManager.this.remoteLogMetadataManager.updateRemoteLogSegmentMetadata(new RemoteLogSegmentMetadataUpdate(remoteLogSegmentMetadata.remoteLogSegmentId(), RemoteLogManager.this.time.milliseconds(), remoteLogSegmentMetadata.customMetadata(), RemoteLogSegmentState.DELETE_SEGMENT_STARTED, RemoteLogManager.this.brokerId)).get();
                RemoteLogManager.this.brokerTopicStats.topicStats(str).remoteDeleteRequestRate().mark();
                RemoteLogManager.this.brokerTopicStats.allTopicsStats().remoteDeleteRequestRate().mark();
                try {
                    RemoteLogManager.this.remoteLogStorageManager.deleteLogSegmentData(remoteLogSegmentMetadata);
                    RemoteLogManager.this.remoteLogMetadataManager.updateRemoteLogSegmentMetadata(new RemoteLogSegmentMetadataUpdate(remoteLogSegmentMetadata.remoteLogSegmentId(), RemoteLogManager.this.time.milliseconds(), remoteLogSegmentMetadata.customMetadata(), RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, RemoteLogManager.this.brokerId)).get();
                    RLMTask.this.logger.debug("Deleted remote log segment {}", remoteLogSegmentMetadata.remoteLogSegmentId());
                    return true;
                } catch (RemoteStorageException e) {
                    RemoteLogManager.this.brokerTopicStats.topicStats(str).failedRemoteDeleteRequestRate().mark();
                    RemoteLogManager.this.brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().mark();
                    throw e;
                }
            }
        }

        public RLMTask(TopicIdPartition topicIdPartition, int i) {
            super();
            this.leaderEpoch = -1;
            this.copiedOffsetOption = Optional.empty();
            this.isLogStartOffsetUpdatedOnBecomingLeader = false;
            this.logDirectory = Optional.empty();
            this.topicIdPartition = topicIdPartition;
            this.customMetadataSizeLimit = i;
            this.logger = new LogContext("[RemoteLogManager=" + RemoteLogManager.this.brokerId + " partition=" + topicIdPartition + "] ").logger(RLMTask.class);
        }

        boolean isLeader() {
            return this.leaderEpoch >= 0;
        }

        public void convertToLeader(int i) {
            if (i < 0) {
                throw new KafkaException("leaderEpoch value for topic partition " + this.topicIdPartition + " can not be negative");
            }
            if (this.leaderEpoch != i) {
                this.leaderEpoch = i;
            }
            this.copiedOffsetOption = Optional.empty();
            this.isLogStartOffsetUpdatedOnBecomingLeader = false;
        }

        public void convertToFollower() {
            this.leaderEpoch = -1;
        }

        private void maybeUpdateLogStartOffsetOnBecomingLeader(MergedLog mergedLog) throws RemoteStorageException {
            if (this.isLogStartOffsetUpdatedOnBecomingLeader) {
                return;
            }
            long findLogStartOffset = RemoteLogManager.this.findLogStartOffset(this.topicIdPartition, mergedLog);
            RemoteLogManager.this.updateRemoteLogStartOffset.accept(this.topicIdPartition.topicPartition(), Long.valueOf(findLogStartOffset));
            this.isLogStartOffsetUpdatedOnBecomingLeader = true;
            this.logger.info("Found the logStartOffset: {} for partition: {} after becoming leader, leaderEpoch: {}", new Object[]{Long.valueOf(findLogStartOffset), this.topicIdPartition, Integer.valueOf(this.leaderEpoch)});
        }

        private void maybeUpdateCopiedOffset(MergedLog mergedLog) throws RemoteStorageException {
            if (this.copiedOffsetOption.isPresent()) {
                return;
            }
            this.copiedOffsetOption = Optional.of(RemoteLogManager.this.findHighestRemoteOffset(this.topicIdPartition, mergedLog));
            this.logger.info("Found the highest copiedRemoteOffset: {} for partition: {} after becoming leader, leaderEpoch: {}", new Object[]{this.copiedOffsetOption, this.topicIdPartition, Integer.valueOf(this.leaderEpoch)});
            this.copiedOffsetOption.ifPresent(offsetAndEpoch -> {
                mergedLog.updateHighestOffsetInRemoteStorage(offsetAndEpoch.offset());
            });
        }

        List<EnrichedLogSegment> candidateLogSegments(MergedLog mergedLog, Long l, Long l2) {
            ArrayList arrayList = new ArrayList();
            ArrayList arrayList2 = new ArrayList(mergedLog.localLogSegments(l.longValue(), TierObjectMetadata.DEFAULT_STATE_CHANGE_TIMESTAMP));
            if (!arrayList2.isEmpty()) {
                for (int i = 1; i < arrayList2.size(); i++) {
                    LogSegment logSegment = (LogSegment) arrayList2.get(i - 1);
                    LogSegment logSegment2 = (LogSegment) arrayList2.get(i);
                    if (logSegment2.baseOffset() <= l2.longValue()) {
                        arrayList.add(new EnrichedLogSegment(logSegment, logSegment2.baseOffset()));
                    }
                }
            }
            return arrayList;
        }

        public void copyLogSegmentsToRemote(MergedLog mergedLog) throws InterruptedException {
            if (isCancelled()) {
                return;
            }
            try {
                try {
                    maybeUpdateLogStartOffsetOnBecomingLeader(mergedLog);
                    maybeUpdateCopiedOffset(mergedLog);
                    long offset = this.copiedOffsetOption.get().offset();
                    long lastStableOffset = mergedLog.lastStableOffset();
                    if (lastStableOffset < 0) {
                        this.logger.warn("lastStableOffset for partition {} is {}, which should not be negative.", this.topicIdPartition, Long.valueOf(lastStableOffset));
                    } else if (lastStableOffset <= 0 || offset >= lastStableOffset) {
                        this.logger.debug("Skipping copying segments, current read-offset:{}, and LSO:{}", Long.valueOf(offset), Long.valueOf(lastStableOffset));
                    } else {
                        long max = Math.max(offset + 1, mergedLog.logStartOffset());
                        List<EnrichedLogSegment> candidateLogSegments = candidateLogSegments(mergedLog, Long.valueOf(max), Long.valueOf(lastStableOffset));
                        this.logger.debug("Candidate log segments, logStartOffset: {}, copiedOffset: {}, fromOffset: {}, lso: {} and candidateLogSegments: {}", new Object[]{Long.valueOf(mergedLog.logStartOffset()), Long.valueOf(offset), Long.valueOf(max), Long.valueOf(lastStableOffset), candidateLogSegments});
                        if (candidateLogSegments.isEmpty()) {
                            this.logger.debug("No segments found to be copied for partition {} with copiedOffset: {} and active segment's base-offset: {}", new Object[]{this.topicIdPartition, Long.valueOf(offset), Long.valueOf(mergedLog.activeSegment().baseOffset())});
                        } else {
                            for (EnrichedLogSegment enrichedLogSegment : candidateLogSegments) {
                                if (isCancelled() || !isLeader()) {
                                    this.logger.info("Skipping copying log segments as the current task state is changed, cancelled: {} leader:{}", Boolean.valueOf(isCancelled()), Boolean.valueOf(isLeader()));
                                    return;
                                }
                                RemoteLogManager.this.copyQuotaManagerLock.lock();
                                while (RemoteLogManager.this.rlmCopyQuotaManager.isQuotaExceeded()) {
                                    try {
                                        this.logger.debug("Quota exceeded for copying log segments, waiting for the quota to be available.");
                                        RemoteLogManager.this.copyQuotaManagerLockCondition.await(RemoteLogManager.this.quotaTimeout().toMillis(), TimeUnit.MILLISECONDS);
                                    } catch (Throwable th) {
                                        RemoteLogManager.this.copyQuotaManagerLock.unlock();
                                        throw th;
                                    }
                                }
                                RemoteLogManager.this.rlmCopyQuotaManager.record(enrichedLogSegment.logSegment.log().sizeInBytes());
                                RemoteLogManager.this.copyQuotaManagerLockCondition.signalAll();
                                RemoteLogManager.this.copyQuotaManagerLock.unlock();
                                copyLogSegment(mergedLog, enrichedLogSegment.logSegment, enrichedLogSegment.nextSegmentOffset);
                            }
                        }
                    }
                } catch (InterruptedException | RetriableException e) {
                    throw e;
                }
            } catch (CustomMetadataSizeLimitExceededException e2) {
                RemoteLogManager.this.brokerTopicStats.topicStats(mergedLog.topicPartition().topic()).failedRemoteCopyRequestRate().mark();
                RemoteLogManager.this.brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().mark();
                cancel();
            } catch (Exception e3) {
                if (isCancelled()) {
                    return;
                }
                RemoteLogManager.this.brokerTopicStats.topicStats(mergedLog.topicPartition().topic()).failedRemoteCopyRequestRate().mark();
                RemoteLogManager.this.brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().mark();
                this.logger.error("Error occurred while copying log segments of partition: {}", this.topicIdPartition, e3);
            }
        }

        private void copyLogSegment(MergedLog mergedLog, LogSegment logSegment, long j) throws InterruptedException, ExecutionException, RemoteStorageException, IOException, CustomMetadataSizeLimitExceededException {
            File file = logSegment.log().file();
            String name = file.getName();
            this.logger.info("Copying {} to remote storage.", name);
            RemoteLogSegmentId generateNew = RemoteLogSegmentId.generateNew(this.topicIdPartition);
            long j2 = j - 1;
            File file2 = (File) mergedLog.producerStateManager().fetchSnapshot(j).orElse(null);
            List<EpochEntry> leaderEpochEntries = RemoteLogManager.this.getLeaderEpochEntries(mergedLog, logSegment.baseOffset(), j);
            HashMap hashMap = new HashMap(leaderEpochEntries.size());
            leaderEpochEntries.forEach(epochEntry -> {
            });
            RemoteLogSegmentMetadata remoteLogSegmentMetadata = new RemoteLogSegmentMetadata(generateNew, logSegment.baseOffset(), j2, logSegment.largestTimestamp(), RemoteLogManager.this.brokerId, RemoteLogManager.this.time.milliseconds(), logSegment.log().sizeInBytes(), hashMap);
            RemoteLogManager.this.remoteLogMetadataManager.addRemoteLogSegmentMetadata(remoteLogSegmentMetadata).get();
            LogSegmentData logSegmentData = new LogSegmentData(file.toPath(), toPathIfExists(logSegment.offsetIndex().file()), toPathIfExists(logSegment.timeIndex().file()), Optional.ofNullable(toPathIfExists(logSegment.txnIndex().file())), file2.toPath(), RemoteLogManager.epochEntriesAsByteBuffer(RemoteLogManager.this.getLeaderEpochEntries(mergedLog, -1L, j)));
            RemoteLogManager.this.brokerTopicStats.topicStats(mergedLog.topicPartition().topic()).remoteCopyRequestRate().mark();
            RemoteLogManager.this.brokerTopicStats.allTopicsStats().remoteCopyRequestRate().mark();
            Optional.empty();
            try {
                Optional copyLogSegmentData = RemoteLogManager.this.remoteLogStorageManager.copyLogSegmentData(remoteLogSegmentMetadata, logSegmentData);
                RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(generateNew, RemoteLogManager.this.time.milliseconds(), copyLogSegmentData, RemoteLogSegmentState.COPY_SEGMENT_FINISHED, RemoteLogManager.this.brokerId);
                if (copyLogSegmentData.isPresent()) {
                    long length = ((RemoteLogSegmentMetadata.CustomMetadata) copyLogSegmentData.get()).value().length;
                    if (length > this.customMetadataSizeLimit) {
                        CustomMetadataSizeLimitExceededException customMetadataSizeLimitExceededException = new CustomMetadataSizeLimitExceededException();
                        this.logger.error("Custom metadata size {} exceeds configured limit {}. Copying will be stopped and copied segment will be attempted to clean. Original metadata: {}", new Object[]{Long.valueOf(length), Integer.valueOf(this.customMetadataSizeLimit), remoteLogSegmentMetadata, customMetadataSizeLimitExceededException});
                        try {
                            RemoteLogManager.this.remoteLogStorageManager.deleteLogSegmentData(remoteLogSegmentMetadata.createWithUpdates(remoteLogSegmentMetadataUpdate));
                            this.logger.info("Successfully cleaned segment after custom metadata size exceeded");
                        } catch (RemoteStorageException e) {
                            this.logger.error("Error while cleaning segment after custom metadata size exceeded, consider cleaning manually", e);
                        }
                        throw customMetadataSizeLimitExceededException;
                    }
                }
                RemoteLogManager.this.remoteLogMetadataManager.updateRemoteLogSegmentMetadata(remoteLogSegmentMetadataUpdate).get();
                RemoteLogManager.this.brokerTopicStats.topicStats(mergedLog.topicPartition().topic()).remoteCopyBytesRate().mark(remoteLogSegmentMetadata.segmentSizeInBytes());
                RemoteLogManager.this.brokerTopicStats.allTopicsStats().remoteCopyBytesRate().mark(remoteLogSegmentMetadata.segmentSizeInBytes());
                this.copiedOffsetOption = Optional.of(new OffsetAndEpoch(j2, leaderEpochEntries.get(leaderEpochEntries.size() - 1).epoch));
                mergedLog.updateHighestOffsetInRemoteStorage(j2);
                this.logger.info("Copied {} to remote storage with segment-id: {}", name, remoteLogSegmentMetadataUpdate.remoteLogSegmentId());
                recordLagStats(mergedLog.onlyLocalLogSegmentsSize() - mergedLog.activeSegment().size(), -1L);
            } catch (RemoteStorageException e2) {
                try {
                    RemoteLogManager.this.remoteLogStorageManager.deleteLogSegmentData(remoteLogSegmentMetadata);
                    this.logger.info("Successfully cleaned segment {} after failing to copy segment", generateNew);
                } catch (RemoteStorageException e3) {
                    this.logger.error("Error while cleaning segment {}, consider cleaning manually", generateNew, e3);
                }
                throw e2;
            }
        }

        void recordLagStats(long j, long j2) {
            if (isLeader()) {
                String str = this.topicIdPartition.topic();
                int partition = this.topicIdPartition.partition();
                RemoteLogManager.this.brokerTopicStats.recordRemoteCopyLagBytes(str, partition, j);
                RemoteLogManager.this.brokerTopicStats.recordRemoteCopyLagSegments(str, partition, j2);
            }
        }

        private Path toPathIfExists(File file) {
            if (file.exists()) {
                return file.toPath();
            }
            return null;
        }

        @Override // java.lang.Runnable
        public void run() {
            if (isCancelled()) {
                return;
            }
            try {
                Optional optional = (Optional) RemoteLogManager.this.fetchLog.apply(this.topicIdPartition.topicPartition());
                if (optional.isPresent()) {
                    MergedLog mergedLog = (MergedLog) optional.get();
                    if (!mergedLog.parentDir().equals(this.logDirectory.orElse(null))) {
                        this.copiedOffsetOption = Optional.empty();
                        this.isLogStartOffsetUpdatedOnBecomingLeader = false;
                        this.logDirectory = Optional.of(mergedLog.parentDir());
                    }
                    if (isLeader()) {
                        copyLogSegmentsToRemote((MergedLog) optional.get());
                        cleanupExpiredRemoteLogSegments();
                    } else {
                        mergedLog.updateHighestOffsetInRemoteStorage(RemoteLogManager.this.findHighestRemoteOffset(this.topicIdPartition, mergedLog).offset());
                    }
                }
            } catch (InterruptedException e) {
                if (isCancelled()) {
                    return;
                }
                this.logger.warn("Current thread for topic-partition-id {} is interrupted", this.topicIdPartition, e);
            } catch (RetriableException e2) {
                this.logger.debug("Encountered a retryable error while executing current task for topic-partition {}", this.topicIdPartition, e2);
            } catch (Exception e3) {
                if (isCancelled()) {
                    return;
                }
                this.logger.warn("Current task for topic-partition {} received error but it will be scheduled", this.topicIdPartition, e3);
            }
        }

        public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long j) {
            if (isLeader()) {
                this.logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, Long.valueOf(j));
                RemoteLogManager.this.updateRemoteLogStartOffset.accept(topicPartition, Long.valueOf(j));
            }
        }

        private void updateMetadataCountAndLogSizeWith(int i, long j) {
            int partition = this.topicIdPartition.partition();
            String str = this.topicIdPartition.topic();
            RemoteLogManager.this.brokerTopicStats.recordRemoteLogMetadataCount(str, partition, i);
            RemoteLogManager.this.brokerTopicStats.recordRemoteLogSizeBytes(str, partition, j);
        }

        private void updateRemoteDeleteLagWith(int i, long j) {
            String str = this.topicIdPartition.topic();
            int partition = this.topicIdPartition.partition();
            RemoteLogManager.this.brokerTopicStats.recordRemoteDeleteLagSegments(str, partition, i);
            RemoteLogManager.this.brokerTopicStats.recordRemoteDeleteLagBytes(str, partition, j);
        }

        void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException {
            long j;
            if (isCancelled() || !isLeader()) {
                this.logger.info("Returning from remote log segments cleanup as the task state is changed");
                return;
            }
            Optional optional = (Optional) RemoteLogManager.this.fetchLog.apply(this.topicIdPartition.topicPartition());
            if (!optional.isPresent()) {
                this.logger.debug("No UnifiedLog instance available for partition: {}", this.topicIdPartition);
                return;
            }
            MergedLog mergedLog = (MergedLog) optional.get();
            Optional<LeaderEpochFileCache> leaderEpochCache = mergedLog.leaderEpochCache();
            if (!leaderEpochCache.isPresent()) {
                this.logger.debug("No leader epoch cache available for partition: {}", this.topicIdPartition);
                return;
            }
            Iterator listRemoteLogSegments = RemoteLogManager.this.remoteLogMetadataManager.listRemoteLogSegments(this.topicIdPartition);
            if (!listRemoteLogSegments.hasNext()) {
                updateMetadataCountAndLogSizeWith(0, 0L);
                this.logger.debug("No remote log segments available on remote storage for partition: {}", this.topicIdPartition);
                return;
            }
            HashSet hashSet = new HashSet();
            int i = 0;
            long j2 = 0;
            while (true) {
                j = j2;
                if (!listRemoteLogSegments.hasNext()) {
                    break;
                }
                hashSet.addAll(((RemoteLogSegmentMetadata) listRemoteLogSegments.next()).segmentLeaderEpochs().keySet());
                i++;
                j2 = j + r0.segmentSizeInBytes();
            }
            updateMetadataCountAndLogSizeWith(i, j);
            ArrayList arrayList = new ArrayList(hashSet);
            Collections.sort(arrayList);
            LeaderEpochFileCache leaderEpochFileCache = leaderEpochCache.get();
            NavigableMap<Integer, Long> buildFilteredLeaderEpochMap = RemoteLogManager.buildFilteredLeaderEpochMap(leaderEpochFileCache.epochWithOffsets());
            long logStartOffset = mergedLog.logStartOffset();
            long logEndOffset = mergedLog.logEndOffset();
            RemoteLogRetentionHandler remoteLogRetentionHandler = new RemoteLogRetentionHandler(buildRetentionSizeData(mergedLog.config().retentionSize, mergedLog.onlyLocalLogSegmentsSize(), logEndOffset, buildFilteredLeaderEpochMap), buildRetentionTimeData(mergedLog.config().retentionMs));
            Iterator<Integer> it = buildFilteredLeaderEpochMap.navigableKeySet().iterator();
            boolean z = true;
            ArrayList<RemoteLogSegmentMetadata> arrayList2 = new ArrayList();
            long j3 = 0;
            while (z && it.hasNext()) {
                Iterator listRemoteLogSegments2 = RemoteLogManager.this.remoteLogMetadataManager.listRemoteLogSegments(this.topicIdPartition, it.next().intValue());
                while (z && listRemoteLogSegments2.hasNext()) {
                    if (isCancelled() || !isLeader()) {
                        this.logger.info("Returning from remote log segments cleanup for the remaining segments as the task state is changed.");
                        return;
                    }
                    RemoteLogSegmentMetadata remoteLogSegmentMetadata = (RemoteLogSegmentMetadata) listRemoteLogSegments2.next();
                    if (!RemoteLogSegmentState.DELETE_SEGMENT_FINISHED.equals(remoteLogSegmentMetadata.state()) && !arrayList2.contains(remoteLogSegmentMetadata)) {
                        boolean isSegmentBreachByLogStartOffset = remoteLogRetentionHandler.isSegmentBreachByLogStartOffset(remoteLogSegmentMetadata, logStartOffset, buildFilteredLeaderEpochMap);
                        boolean z2 = false;
                        if (!isSegmentBreachByLogStartOffset) {
                            z2 = RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(remoteLogSegmentMetadata, logEndOffset, buildFilteredLeaderEpochMap);
                            if (z2) {
                                isSegmentBreachByLogStartOffset = remoteLogRetentionHandler.isSegmentBreachedByRetentionTime(remoteLogSegmentMetadata) || remoteLogRetentionHandler.isSegmentBreachedByRetentionSize(remoteLogSegmentMetadata);
                            }
                        }
                        if (isSegmentBreachByLogStartOffset) {
                            arrayList2.add(remoteLogSegmentMetadata);
                            j3 += remoteLogSegmentMetadata.segmentSizeInBytes();
                        }
                        z = isSegmentBreachByLogStartOffset || !z2;
                    }
                }
            }
            remoteLogRetentionHandler.logStartOffset.ifPresent(j4 -> {
                handleLogStartOffsetUpdate(this.topicIdPartition.topicPartition(), j4);
            });
            int size = arrayList2.size();
            updateRemoteDeleteLagWith(size, j3);
            ArrayList arrayList3 = new ArrayList();
            for (RemoteLogSegmentMetadata remoteLogSegmentMetadata2 : arrayList2) {
                if (remoteLogRetentionHandler.deleteRemoteLogSegment(remoteLogSegmentMetadata2, remoteLogSegmentMetadata3 -> {
                    return !isCancelled() && isLeader();
                })) {
                    j3 -= remoteLogSegmentMetadata2.segmentSizeInBytes();
                    size--;
                    updateRemoteDeleteLagWith(size, j3);
                } else {
                    arrayList3.add(remoteLogSegmentMetadata2.remoteLogSegmentId().toString());
                }
            }
            if (!arrayList3.isEmpty()) {
                this.logger.info("The following remote segments could not be deleted: {}", String.join(",", arrayList3));
            }
            Optional earliestEntry = leaderEpochFileCache.earliestEntry();
            if (earliestEntry.isPresent()) {
                EpochEntry epochEntry = (EpochEntry) earliestEntry.get();
                Iterator it2 = arrayList.stream().filter(num -> {
                    return num.intValue() < epochEntry.epoch;
                }).iterator();
                ArrayList<RemoteLogSegmentMetadata> arrayList4 = new ArrayList();
                while (it2.hasNext()) {
                    Iterator listRemoteLogSegments3 = RemoteLogManager.this.remoteLogMetadataManager.listRemoteLogSegments(this.topicIdPartition, ((Integer) it2.next()).intValue());
                    while (listRemoteLogSegments3.hasNext()) {
                        if (!isCancelled() && isLeader()) {
                            j3 += r0.segmentSizeInBytes();
                            arrayList4.add((RemoteLogSegmentMetadata) listRemoteLogSegments3.next());
                        }
                    }
                }
                int size2 = size + arrayList4.size();
                updateRemoteDeleteLagWith(size2, j3);
                for (RemoteLogSegmentMetadata remoteLogSegmentMetadata4 : arrayList4) {
                    if (!isCancelled() && isLeader() && remoteLogRetentionHandler.deleteLogSegmentsDueToLeaderEpochCacheTruncation(epochEntry, remoteLogSegmentMetadata4)) {
                        j3 -= remoteLogSegmentMetadata4.segmentSizeInBytes();
                        size2--;
                        updateRemoteDeleteLagWith(size2, j3);
                    }
                }
            }
        }

        private Optional<RetentionTimeData> buildRetentionTimeData(long j) {
            return j > -1 ? Optional.of(new RetentionTimeData(j, RemoteLogManager.this.time.milliseconds() - j)) : Optional.empty();
        }

        private Optional<RetentionSizeData> buildRetentionSizeData(long j, long j2, long j3, NavigableMap<Integer, Long> navigableMap) throws RemoteStorageException {
            if (j > -1) {
                long milliseconds = RemoteLogManager.this.time.milliseconds();
                long j4 = 0;
                HashSet hashSet = new HashSet();
                Iterator<Integer> it = navigableMap.navigableKeySet().iterator();
                while (it.hasNext()) {
                    Iterator listRemoteLogSegments = RemoteLogManager.this.remoteLogMetadataManager.listRemoteLogSegments(this.topicIdPartition, it.next().intValue());
                    while (listRemoteLogSegments.hasNext()) {
                        RemoteLogSegmentMetadata remoteLogSegmentMetadata = (RemoteLogSegmentMetadata) listRemoteLogSegments.next();
                        if (remoteLogSegmentMetadata.state().equals(RemoteLogSegmentState.COPY_SEGMENT_FINISHED) || remoteLogSegmentMetadata.state().equals(RemoteLogSegmentState.DELETE_SEGMENT_STARTED)) {
                            RemoteLogSegmentId remoteLogSegmentId = remoteLogSegmentMetadata.remoteLogSegmentId();
                            if (!hashSet.contains(remoteLogSegmentId) && RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(remoteLogSegmentMetadata, j3, navigableMap)) {
                                j4 += remoteLogSegmentMetadata.segmentSizeInBytes();
                                hashSet.add(remoteLogSegmentId);
                            }
                        }
                    }
                }
                RemoteLogManager.this.brokerTopicStats.recordRemoteLogSizeComputationTime(this.topicIdPartition.topic(), this.topicIdPartition.partition(), RemoteLogManager.this.time.milliseconds() - milliseconds);
                long j5 = j2 + j4;
                if (j5 > j) {
                    return Optional.of(new RetentionSizeData(j, j5 - j));
                }
            }
            return Optional.empty();
        }

        public String toString() {
            return getClass() + "[" + this.topicIdPartition + "]";
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:kafka/log/remote/RemoteLogManager$RLMTaskWithFuture.class */
    public static class RLMTaskWithFuture {
        private final RLMTask rlmTask;
        private final Future<?> future;

        RLMTaskWithFuture(RLMTask rLMTask, Future<?> future) {
            this.rlmTask = rLMTask;
            this.future = future;
        }

        public void cancel() {
            this.rlmTask.cancel();
            try {
                this.future.cancel(true);
            } catch (Exception e) {
                RemoteLogManager.LOGGER.error("Error occurred while canceling the task: {}", this.rlmTask, e);
            }
        }
    }

    /* loaded from: input_file:kafka/log/remote/RemoteLogManager$RetentionSizeData.class */
    public static class RetentionSizeData {
        private final long retentionSize;
        private final long remainingBreachedSize;

        public RetentionSizeData(long j, long j2) {
            if (j < 0) {
                throw new IllegalArgumentException("retentionSize should be non negative, but it is " + j);
            }
            if (j2 <= 0) {
                throw new IllegalArgumentException("remainingBreachedSize should be more than zero, but it is " + j2);
            }
            this.retentionSize = j;
            this.remainingBreachedSize = j2;
        }
    }

    /* loaded from: input_file:kafka/log/remote/RemoteLogManager$RetentionTimeData.class */
    public static class RetentionTimeData {
        private final long retentionMs;
        private final long cleanupUntilMs;

        public RetentionTimeData(long j, long j2) {
            if (j < 0) {
                throw new IllegalArgumentException("retentionMs should be non negative, but it is " + j);
            }
            if (j2 < 0) {
                throw new IllegalArgumentException("cleanupUntilMs should be non negative, but it is " + j2);
            }
            this.retentionMs = j;
            this.cleanupUntilMs = j2;
        }
    }

    public RemoteLogManager(KafkaConfig kafkaConfig, int i, String str, String str2, Time time, Function<TopicPartition, Optional<MergedLog>> function, BiConsumer<TopicPartition, Long> biConsumer, BrokerTopicStats brokerTopicStats, Metrics metrics) throws IOException {
        this.config = kafkaConfig;
        this.brokerId = i;
        this.logDir = str;
        this.clusterId = str2;
        this.time = time;
        this.fetchLog = function;
        this.updateRemoteLogStartOffset = biConsumer;
        this.brokerTopicStats = brokerTopicStats;
        this.metrics = metrics;
        RemoteLogManagerConfig remoteLogManagerConfig = kafkaConfig.remoteLogManagerConfig();
        this.indexCache = new RemoteIndexCache(kafkaConfig.remoteLogIndexFileCacheTotalSizeBytes(), this.remoteLogStorageManager, str);
        this.delayInMs = remoteLogManagerConfig.remoteLogManagerTaskIntervalMs();
        this.rlmScheduledThreadPool = new RLMScheduledThreadPool(remoteLogManagerConfig.remoteLogManagerThreadPoolSize());
        this.metricsGroup.newGauge(RemoteStorageMetrics.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC, new Gauge<Double>() { // from class: kafka.log.remote.RemoteLogManager.1
            /* renamed from: value, reason: merged with bridge method [inline-methods] */
            public Double m824value() {
                return RemoteLogManager.this.rlmScheduledThreadPool.getIdlePercent();
            }
        });
        this.remoteReadTimer = this.metricsGroup.newTimer(RemoteStorageMetrics.REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC, TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
        this.remoteStorageReaderThreadPool = new RemoteStorageThreadPool(REMOTE_LOG_READER_THREAD_NAME_PREFIX, remoteLogManagerConfig.remoteLogReaderThreads(), remoteLogManagerConfig.remoteLogReaderMaxPendingTasks());
    }

    public void resizeCacheSize(long j) {
        this.indexCache.resizeCacheSize(j);
    }

    public void updateCopyQuota(long j) {
        LOGGER.info("Updating remote copy quota to {} bytes per second", Long.valueOf(j));
        this.rlmCopyQuotaManager.updateQuota(new Quota(j, true));
    }

    public void updateFetchQuota(long j) {
        LOGGER.info("Updating remote fetch quota to {} bytes per second", Long.valueOf(j));
        this.rlmFetchQuotaManager.updateQuota(new Quota(j, true));
    }

    private void removeMetrics() {
        this.metricsGroup.removeMetric(RemoteStorageMetrics.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC);
        this.metricsGroup.removeMetric(RemoteStorageMetrics.REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC);
        this.remoteStorageReaderThreadPool.removeMetrics();
    }

    Duration quotaTimeout() {
        return Duration.ofSeconds(1L);
    }

    RLMQuotaManager createRLMCopyQuotaManager() {
        return new RLMQuotaManager(copyQuotaManagerConfig(this.config), this.metrics, QuotaType$RLMCopy$.MODULE$, "Tracking copy byte-rate for Remote Log Manager", this.time);
    }

    RLMQuotaManager createRLMFetchQuotaManager() {
        return new RLMQuotaManager(fetchQuotaManagerConfig(this.config), this.metrics, QuotaType$RLMFetch$.MODULE$, "Tracking fetch byte-rate for Remote Log Manager", this.time);
    }

    public boolean isRemoteLogFetchQuotaExceeded() {
        return this.rlmFetchQuotaManager.isQuotaExceeded();
    }

    static RLMQuotaManagerConfig copyQuotaManagerConfig(KafkaConfig kafkaConfig) {
        RemoteLogManagerConfig remoteLogManagerConfig = kafkaConfig.remoteLogManagerConfig();
        return new RLMQuotaManagerConfig(kafkaConfig.remoteLogManagerCopyMaxBytesPerSecond(), remoteLogManagerConfig.remoteLogManagerCopyNumQuotaSamples(), remoteLogManagerConfig.remoteLogManagerCopyQuotaWindowSizeSeconds());
    }

    static RLMQuotaManagerConfig fetchQuotaManagerConfig(KafkaConfig kafkaConfig) {
        RemoteLogManagerConfig remoteLogManagerConfig = kafkaConfig.remoteLogManagerConfig();
        return new RLMQuotaManagerConfig(kafkaConfig.remoteLogManagerFetchMaxBytesPerSecond(), remoteLogManagerConfig.remoteLogManagerFetchNumQuotaSamples(), remoteLogManagerConfig.remoteLogManagerFetchQuotaWindowSizeSeconds());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public <T> T createDelegate(ClassLoader classLoader, String str) {
        try {
            return (T) classLoader.loadClass(str).getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
        } catch (ClassNotFoundException | IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
            throw new KafkaException(e);
        }
    }

    RemoteStorageManager createRemoteStorageManager() {
        final RemoteLogManagerConfig remoteLogManagerConfig = this.config.remoteLogManagerConfig();
        return (RemoteStorageManager) AccessController.doPrivileged(new PrivilegedAction<RemoteStorageManager>() { // from class: kafka.log.remote.RemoteLogManager.2
            private final String classPath;

            {
                this.classPath = remoteLogManagerConfig.remoteStorageManagerClassPath();
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.security.PrivilegedAction
            public RemoteStorageManager run() {
                if (this.classPath == null || this.classPath.trim().isEmpty()) {
                    return (RemoteStorageManager) RemoteLogManager.this.createDelegate(getClass().getClassLoader(), remoteLogManagerConfig.remoteStorageManagerClassName());
                }
                ChildFirstClassLoader childFirstClassLoader = new ChildFirstClassLoader(this.classPath, getClass().getClassLoader());
                return new ClassLoaderAwareRemoteStorageManager((RemoteStorageManager) RemoteLogManager.this.createDelegate(childFirstClassLoader, remoteLogManagerConfig.remoteStorageManagerClassName()), childFirstClassLoader);
            }
        });
    }

    private void configureRSM() {
        HashMap hashMap = new HashMap(this.config.remoteLogManagerConfig().remoteStorageManagerProps());
        hashMap.put("broker.id", Integer.valueOf(this.brokerId));
        this.remoteLogStorageManager.configure(hashMap);
    }

    RemoteLogMetadataManager createRemoteLogMetadataManager() {
        final RemoteLogManagerConfig remoteLogManagerConfig = this.config.remoteLogManagerConfig();
        return (RemoteLogMetadataManager) AccessController.doPrivileged(new PrivilegedAction<RemoteLogMetadataManager>() { // from class: kafka.log.remote.RemoteLogManager.3
            private final String classPath;

            {
                this.classPath = remoteLogManagerConfig.remoteLogMetadataManagerClassPath();
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.security.PrivilegedAction
            public RemoteLogMetadataManager run() {
                if (this.classPath == null || this.classPath.trim().isEmpty()) {
                    return (RemoteLogMetadataManager) RemoteLogManager.this.createDelegate(getClass().getClassLoader(), remoteLogManagerConfig.remoteLogMetadataManagerClassName());
                }
                ChildFirstClassLoader childFirstClassLoader = new ChildFirstClassLoader(this.classPath, getClass().getClassLoader());
                return new ClassLoaderAwareRemoteLogMetadataManager((RemoteLogMetadataManager) RemoteLogManager.this.createDelegate(childFirstClassLoader, remoteLogManagerConfig.remoteLogMetadataManagerClassName()), childFirstClassLoader);
            }
        });
    }

    public void onEndPointCreated(EndPoint endPoint) {
        this.endpoint = Optional.of(endPoint);
    }

    private void configureRLMM() {
        HashMap hashMap = new HashMap();
        this.endpoint.ifPresent(endPoint -> {
            hashMap.put("remote.log.metadata.common.client.bootstrap.servers", endPoint.host() + ":" + endPoint.port());
            hashMap.put("remote.log.metadata.common.client.security.protocol", endPoint.securityProtocol().name);
        });
        hashMap.putAll(this.config.remoteLogManagerConfig().remoteLogMetadataManagerProps());
        hashMap.put("broker.id", Integer.valueOf(this.brokerId));
        hashMap.put("log.dir", this.logDir);
        hashMap.put("cluster.id", this.clusterId);
        this.remoteLogMetadataManager.configure(hashMap);
    }

    public void startup() {
        configureRSM();
        configureRLMM();
        this.remoteLogManagerConfigured = true;
    }

    private boolean isRemoteLogManagerConfigured() {
        return this.remoteLogManagerConfigured;
    }

    public RemoteStorageManager storageManager() {
        return this.remoteLogStorageManager;
    }

    private Stream<Partition> filterPartitions(Set<Partition> set) {
        return set.stream().filter(partition -> {
            return partition.log().exists((v0) -> {
                return v0.remoteLogEnabled();
            });
        });
    }

    private void cacheTopicPartitionIds(TopicIdPartition topicIdPartition) {
        Uuid put = this.topicIdByPartitionMap.put(topicIdPartition.topicPartition(), topicIdPartition.topicId());
        if (put == null || put.equals(topicIdPartition.topicId())) {
            return;
        }
        LOGGER.info("Previous cached topic id {} for {} does not match updated topic id {}", new Object[]{put, topicIdPartition.topicPartition(), topicIdPartition.topicId()});
    }

    public void onLeadershipChange(Set<Partition> set, Set<Partition> set2, Map<String, Uuid> map) {
        LOGGER.debug("Received leadership changes for leaders: {} and followers: {}", set, set2);
        if (this.config.remoteLogManagerConfig().isRemoteStorageSystemEnabled() && !isRemoteLogManagerConfigured()) {
            throw new KafkaException("RemoteLogManager is not configured when remote storage system is enabled");
        }
        Map map2 = (Map) filterPartitions(set).collect(Collectors.toMap(partition -> {
            return new TopicIdPartition((Uuid) map.get(partition.topic()), partition.topicPartition());
        }, (v0) -> {
            return v0.getLeaderEpoch();
        }));
        Set keySet = map2.keySet();
        Set set3 = (Set) filterPartitions(set2).map(partition2 -> {
            return new TopicIdPartition((Uuid) map.get(partition2.topic()), partition2.topicPartition());
        }).collect(Collectors.toSet());
        if (keySet.isEmpty() && set3.isEmpty()) {
            return;
        }
        LOGGER.debug("Effective topic partitions after filtering compact and internal topics, leaders: {} and followers: {}", keySet, set3);
        keySet.forEach(this::cacheTopicPartitionIds);
        set3.forEach(this::cacheTopicPartitionIds);
        this.remoteLogMetadataManager.onPartitionLeadershipChanges(keySet, set3);
        set3.forEach(topicIdPartition -> {
            doHandleLeaderOrFollowerPartitions(topicIdPartition, (v0) -> {
                v0.convertToFollower();
            });
        });
        set3.forEach(this::removeRemoteTopicPartitionMetrics);
        map2.forEach((topicIdPartition2, num) -> {
            doHandleLeaderOrFollowerPartitions(topicIdPartition2, rLMTask -> {
                rLMTask.convertToLeader(num.intValue());
            });
        });
    }

    public void stopPartitions(Set<StopPartition> set, BiConsumer<TopicPartition, Throwable> biConsumer) {
        LOGGER.debug("Stop partitions: {}", set);
        for (StopPartition stopPartition : set) {
            TopicPartition topicPartition = stopPartition.topicPartition();
            try {
                if (this.topicIdByPartitionMap.containsKey(topicPartition)) {
                    TopicIdPartition topicIdPartition = new TopicIdPartition(this.topicIdByPartitionMap.get(topicPartition), topicPartition);
                    RLMTaskWithFuture remove = this.leaderOrFollowerTasks.remove(topicIdPartition);
                    if (remove != null) {
                        LOGGER.info("Cancelling the RLM task for tpId: {}", topicIdPartition);
                        remove.cancel();
                    }
                    removeRemoteTopicPartitionMetrics(topicIdPartition);
                    if (stopPartition.deleteRemoteLog()) {
                        LOGGER.info("Deleting the remote log segments task for partition: {}", topicIdPartition);
                        deleteRemoteLogPartition(topicIdPartition);
                    }
                } else {
                    LOGGER.warn("StopPartition call is not expected for partition: {}", topicPartition);
                }
            } catch (Exception e) {
                biConsumer.accept(topicPartition, e);
                LOGGER.error("Error while stopping the partition: {}", stopPartition, e);
            }
        }
        Set set2 = (Set) set.stream().filter(stopPartition2 -> {
            return stopPartition2.deleteLocalLog() && this.topicIdByPartitionMap.containsKey(stopPartition2.topicPartition());
        }).map(stopPartition3 -> {
            return new TopicIdPartition(this.topicIdByPartitionMap.get(stopPartition3.topicPartition()), stopPartition3.topicPartition());
        }).collect(Collectors.toSet());
        if (set2.isEmpty()) {
            return;
        }
        this.remoteLogMetadataManager.onStopPartitions(set2);
        set2.forEach(topicIdPartition2 -> {
            this.topicIdByPartitionMap.remove(topicIdPartition2.topicPartition());
        });
    }

    private void deleteRemoteLogPartition(TopicIdPartition topicIdPartition) throws RemoteStorageException, ExecutionException, InterruptedException {
        ArrayList<RemoteLogSegmentMetadata> arrayList = new ArrayList();
        Iterator listRemoteLogSegments = this.remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition);
        arrayList.getClass();
        listRemoteLogSegments.forEachRemaining((v1) -> {
            r1.add(v1);
        });
        publishEvents((List) arrayList.stream().map(remoteLogSegmentMetadata -> {
            return new RemoteLogSegmentMetadataUpdate(remoteLogSegmentMetadata.remoteLogSegmentId(), this.time.milliseconds(), remoteLogSegmentMetadata.customMetadata(), RemoteLogSegmentState.DELETE_SEGMENT_STARTED, this.brokerId);
        }).collect(Collectors.toList())).get();
        ArrayList arrayList2 = new ArrayList();
        for (RemoteLogSegmentMetadata remoteLogSegmentMetadata2 : arrayList) {
            arrayList2.add(remoteLogSegmentMetadata2.remoteLogSegmentId().id());
            this.remoteLogStorageManager.deleteLogSegmentData(remoteLogSegmentMetadata2);
        }
        this.indexCache.removeAll(arrayList2);
        publishEvents((List) arrayList.stream().map(remoteLogSegmentMetadata3 -> {
            return new RemoteLogSegmentMetadataUpdate(remoteLogSegmentMetadata3.remoteLogSegmentId(), this.time.milliseconds(), remoteLogSegmentMetadata3.customMetadata(), RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, this.brokerId);
        }).collect(Collectors.toList())).get();
    }

    private CompletableFuture<Void> publishEvents(List<RemoteLogSegmentMetadataUpdate> list) throws RemoteStorageException {
        ArrayList arrayList = new ArrayList();
        Iterator<RemoteLogSegmentMetadataUpdate> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(this.remoteLogMetadataManager.updateRemoteLogSegmentMetadata(it.next()));
        }
        return CompletableFuture.allOf((CompletableFuture[]) arrayList.toArray(new CompletableFuture[0]));
    }

    public Optional<RemoteLogSegmentMetadata> fetchRemoteLogSegmentMetadata(TopicPartition topicPartition, int i, long j) throws RemoteStorageException {
        Uuid uuid = this.topicIdByPartitionMap.get(topicPartition);
        if (uuid == null) {
            throw new KafkaException("No topic id registered for topic partition: " + topicPartition);
        }
        return this.remoteLogMetadataManager.remoteLogSegmentMetadata(new TopicIdPartition(uuid, topicPartition), i, j);
    }

    /* JADX WARN: Code restructure failed: missing block: B:19:0x008f, code lost:
    
        r0 = java.util.Optional.of(new org.apache.kafka.common.record.FileRecords.FileTimestampAndOffset(r0.timestamp(), r0.offset(), maybeLeaderEpoch(r0.partitionLeaderEpoch())));
     */
    /* JADX WARN: Code restructure failed: missing block: B:20:0x00b6, code lost:
    
        if (r0 == null) goto L26;
     */
    /* JADX WARN: Code restructure failed: missing block: B:22:0x00bb, code lost:
    
        if (0 == 0) goto L25;
     */
    /* JADX WARN: Code restructure failed: missing block: B:23:0x00d4, code lost:
    
        r0.close();
     */
    /* JADX WARN: Code restructure failed: missing block: B:25:0x00be, code lost:
    
        r0.close();
     */
    /* JADX WARN: Code restructure failed: missing block: B:27:0x00c8, code lost:
    
        r23 = move-exception;
     */
    /* JADX WARN: Code restructure failed: missing block: B:28:0x00ca, code lost:
    
        r20.addSuppressed(r23);
     */
    /* JADX WARN: Code restructure failed: missing block: B:39:0x00ea, code lost:
    
        if (r0 == null) goto L74;
     */
    /* JADX WARN: Code restructure failed: missing block: B:41:0x00ef, code lost:
    
        if (0 == 0) goto L37;
     */
    /* JADX WARN: Code restructure failed: missing block: B:42:0x0108, code lost:
    
        r0.close();
     */
    /* JADX WARN: Code restructure failed: missing block: B:46:0x00f2, code lost:
    
        r0.close();
     */
    /* JADX WARN: Code restructure failed: missing block: B:49:0x00fc, code lost:
    
        r21 = move-exception;
     */
    /* JADX WARN: Code restructure failed: missing block: B:50:0x00fe, code lost:
    
        r20.addSuppressed(r21);
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private java.util.Optional<org.apache.kafka.common.record.FileRecords.FileTimestampAndOffset> lookupTimestamp(org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata r10, long r11, long r13) throws org.apache.kafka.server.log.remote.storage.RemoteStorageException, java.io.IOException {
        /*
            Method dump skipped, instructions count: 357
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: kafka.log.remote.RemoteLogManager.lookupTimestamp(org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata, long, long):java.util.Optional");
    }

    private Optional<Integer> maybeLeaderEpoch(int i) {
        return i == -1 ? Optional.empty() : Optional.of(Integer.valueOf(i));
    }

    public Optional<FileRecords.FileTimestampAndOffset> findOffsetByTimestamp(TopicPartition topicPartition, long j, long j2, LeaderEpochFileCache leaderEpochFileCache) throws RemoteStorageException, IOException {
        Uuid uuid = this.topicIdByPartitionMap.get(topicPartition);
        if (uuid == null) {
            throw new KafkaException("Topic id does not exist for topic partition: " + topicPartition);
        }
        Optional<MergedLog> apply = this.fetchLog.apply(topicPartition);
        if (!apply.isPresent()) {
            throw new KafkaException("UnifiedLog does not exist for topic partition: " + topicPartition);
        }
        MergedLog mergedLog = apply.get();
        OptionalInt epochForOffset = leaderEpochFileCache.epochForOffset(j2);
        TopicIdPartition topicIdPartition = new TopicIdPartition(uuid, topicPartition);
        NavigableMap<Integer, Long> buildFilteredLeaderEpochMap = buildFilteredLeaderEpochMap(leaderEpochFileCache.epochWithOffsets());
        while (epochForOffset.isPresent()) {
            int asInt = epochForOffset.getAsInt();
            Iterator listRemoteLogSegments = this.remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, asInt);
            while (listRemoteLogSegments.hasNext()) {
                RemoteLogSegmentMetadata remoteLogSegmentMetadata = (RemoteLogSegmentMetadata) listRemoteLogSegments.next();
                if (remoteLogSegmentMetadata.maxTimestampMs() >= j && remoteLogSegmentMetadata.endOffset() >= j2 && isRemoteSegmentWithinLeaderEpochs(remoteLogSegmentMetadata, mergedLog.logEndOffset(), buildFilteredLeaderEpochMap) && remoteLogSegmentMetadata.state().equals(RemoteLogSegmentState.COPY_SEGMENT_FINISHED)) {
                    return lookupTimestamp(remoteLogSegmentMetadata, j, j2);
                }
            }
            epochForOffset = leaderEpochFileCache.nextEpoch(asInt);
        }
        return Optional.empty();
    }

    List<EpochEntry> getLeaderEpochEntries(MergedLog mergedLog, long j, long j2) {
        return mergedLog.leaderEpochCache().isPresent() ? mergedLog.leaderEpochCache().get().epochEntriesInRange(j, j2) : Collections.emptyList();
    }

    RLMTask rlmTask(TopicIdPartition topicIdPartition) {
        RLMTaskWithFuture rLMTaskWithFuture = this.leaderOrFollowerTasks.get(topicIdPartition);
        if (rLMTaskWithFuture != null) {
            return rLMTaskWithFuture.rlmTask;
        }
        return null;
    }

    static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long j, NavigableMap<Integer, Long> navigableMap) {
        Map.Entry<Integer, Long> higherEntry;
        long endOffset = remoteLogSegmentMetadata.endOffset();
        NavigableMap<Integer, Long> buildFilteredLeaderEpochMap = buildFilteredLeaderEpochMap(remoteLogSegmentMetadata.segmentLeaderEpochs());
        Integer lastKey = buildFilteredLeaderEpochMap.lastKey();
        if (lastKey.intValue() < navigableMap.firstKey().intValue() || lastKey.intValue() > navigableMap.lastKey().intValue()) {
            LOGGER.debug("Segment {} is not within the partition leader epoch lineage. Remote segment epochs: {} and partition leader epochs: {}", new Object[]{remoteLogSegmentMetadata.remoteLogSegmentId(), buildFilteredLeaderEpochMap, navigableMap});
            return false;
        }
        Integer ceilingKey = buildFilteredLeaderEpochMap.ceilingKey(navigableMap.firstKey());
        if (ceilingKey == null) {
            LOGGER.debug("Segment {} is not within the partition leader epoch lineage. Remote segment epochs: {} and partition leader epochs: {}", new Object[]{remoteLogSegmentMetadata.remoteLogSegmentId(), buildFilteredLeaderEpochMap, navigableMap});
            return false;
        }
        for (Map.Entry<Integer, Long> entry : buildFilteredLeaderEpochMap.entrySet()) {
            int intValue = entry.getKey().intValue();
            long longValue = entry.getValue().longValue();
            if (intValue >= ceilingKey.intValue()) {
                if (!navigableMap.containsKey(Integer.valueOf(intValue))) {
                    LOGGER.debug("Segment {} epoch {} is not within the leader epoch lineage. Remote segment epochs: {} and partition leader epochs: {}", new Object[]{remoteLogSegmentMetadata.remoteLogSegmentId(), Integer.valueOf(intValue), buildFilteredLeaderEpochMap, navigableMap});
                    return false;
                }
                if (intValue == ceilingKey.intValue() && navigableMap.lowerKey(Integer.valueOf(intValue)) != null && longValue < ((Long) navigableMap.get(Integer.valueOf(intValue))).longValue()) {
                    LOGGER.debug("Segment {} first-valid epoch {} offset is less than first leader epoch offset {}.Remote segment epochs: {} and partition leader epochs: {}", new Object[]{remoteLogSegmentMetadata.remoteLogSegmentId(), Integer.valueOf(intValue), navigableMap.get(Integer.valueOf(intValue)), buildFilteredLeaderEpochMap, navigableMap});
                    return false;
                }
                if (intValue == lastKey.intValue() && (higherEntry = navigableMap.higherEntry(Integer.valueOf(intValue))) != null && endOffset > higherEntry.getValue().longValue() - 1) {
                    LOGGER.debug("Segment {} end offset {} is more than leader epoch offset {}.Remote segment epochs: {} and partition leader epochs: {}", new Object[]{remoteLogSegmentMetadata.remoteLogSegmentId(), Long.valueOf(endOffset), Long.valueOf(higherEntry.getValue().longValue() - 1), buildFilteredLeaderEpochMap, navigableMap});
                    return false;
                }
                if (intValue != lastKey.intValue() && !navigableMap.higherEntry(Integer.valueOf(intValue)).equals(buildFilteredLeaderEpochMap.higherEntry(Integer.valueOf(intValue)))) {
                    LOGGER.debug("Segment {} epoch {} is not within the leader epoch lineage. Remote segment epochs: {} and partition leader epochs: {}", new Object[]{remoteLogSegmentMetadata.remoteLogSegmentId(), Integer.valueOf(intValue), buildFilteredLeaderEpochMap, navigableMap});
                    return false;
                }
            }
        }
        if (endOffset < j) {
            return true;
        }
        LOGGER.debug("Segment {} end offset {} is more than log end offset {}.", new Object[]{remoteLogSegmentMetadata.remoteLogSegmentId(), Long.valueOf(endOffset), Long.valueOf(j)});
        return false;
    }

    static NavigableMap<Integer, Long> buildFilteredLeaderEpochMap(NavigableMap<Integer, Long> navigableMap) {
        ArrayList arrayList = new ArrayList();
        Map.Entry<Integer, Long> entry = null;
        for (Map.Entry<Integer, Long> entry2 : navigableMap.entrySet()) {
            if (entry != null && entry.getValue().equals(entry2.getValue())) {
                arrayList.add(entry.getKey());
            }
            entry = entry2;
        }
        if (arrayList.isEmpty()) {
            return navigableMap;
        }
        TreeMap treeMap = new TreeMap((SortedMap) navigableMap);
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            treeMap.remove((Integer) it.next());
        }
        return treeMap;
    }

    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
        Optional<LeaderEpochFileCache> leaderEpochCache;
        int i = remoteStorageFetchInfo.fetchMaxBytes;
        TopicPartition topicPartition = remoteStorageFetchInfo.topicPartition;
        FetchRequest.PartitionData partitionData = remoteStorageFetchInfo.fetchInfo;
        boolean z = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
        long j = partitionData.fetchOffset;
        int min = Math.min(i, partitionData.maxBytes);
        Optional<MergedLog> apply = this.fetchLog.apply(topicPartition);
        OptionalInt empty = OptionalInt.empty();
        if (apply.isPresent() && (leaderEpochCache = apply.get().leaderEpochCache()) != null && leaderEpochCache.isPresent()) {
            empty = leaderEpochCache.get().epochForOffset(j);
        }
        Optional<RemoteLogSegmentMetadata> fetchRemoteLogSegmentMetadata = empty.isPresent() ? fetchRemoteLogSegmentMetadata(topicPartition, empty.getAsInt(), j) : Optional.empty();
        if (!fetchRemoteLogSegmentMetadata.isPresent()) {
            throw new OffsetOutOfRangeException("Received request for offset " + j + " for leader epoch " + (empty.isPresent() ? Integer.toString(empty.getAsInt()) : "NOT AVAILABLE") + " and partition " + topicPartition + " which does not exist in remote tier.");
        }
        RemoteLogSegmentMetadata remoteLogSegmentMetadata = fetchRemoteLogSegmentMetadata.get();
        InputStream inputStream = null;
        int i2 = 0;
        RecordBatch recordBatch = null;
        while (recordBatch == null) {
            try {
                if (!fetchRemoteLogSegmentMetadata.isPresent()) {
                    break;
                }
                remoteLogSegmentMetadata = fetchRemoteLogSegmentMetadata.get();
                i2 = lookupPositionForOffset(remoteLogSegmentMetadata, j);
                inputStream = this.remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, i2);
                recordBatch = findFirstBatch(getRemoteLogInputStream(inputStream), j);
                if (recordBatch == null) {
                    fetchRemoteLogSegmentMetadata = findNextSegmentMetadata(fetchRemoteLogSegmentMetadata.get(), apply.get().leaderEpochCache());
                }
            } finally {
                Utils.closeQuietly(inputStream, "RemoteLogSegmentInputStream");
            }
        }
        if (recordBatch == null) {
            FetchDataInfo fetchDataInfo = new FetchDataInfo(new LogOffsetMetadata(j), MemoryRecords.EMPTY, false, z ? Optional.of(Collections.emptyList()) : Optional.empty());
            Utils.closeQuietly(inputStream, "RemoteLogSegmentInputStream");
            return fetchDataInfo;
        }
        int sizeInBytes = recordBatch.sizeInBytes();
        if (!remoteStorageFetchInfo.minOneMessage && !remoteStorageFetchInfo.hardMaxBytesLimit && sizeInBytes > min) {
            FetchDataInfo fetchDataInfo2 = new FetchDataInfo(new LogOffsetMetadata(j), MemoryRecords.EMPTY);
            Utils.closeQuietly(inputStream, "RemoteLogSegmentInputStream");
            return fetchDataInfo2;
        }
        int i3 = (!remoteStorageFetchInfo.minOneMessage || sizeInBytes <= min) ? min : sizeInBytes;
        ByteBuffer allocate = ByteBuffer.allocate(i3);
        recordBatch.writeTo(allocate);
        if (i3 - sizeInBytes > 0) {
            Utils.readFully(inputStream, allocate, false);
        }
        allocate.flip();
        FetchDataInfo fetchDataInfo3 = new FetchDataInfo(new LogOffsetMetadata(j, remoteLogSegmentMetadata.startOffset(), i2), MemoryRecords.readableRecords(allocate));
        if (z) {
            fetchDataInfo3 = addAbortedTransactions(recordBatch.baseOffset(), remoteLogSegmentMetadata, fetchDataInfo3, apply.get());
        }
        return fetchDataInfo3;
    }

    RemoteLogInputStream getRemoteLogInputStream(InputStream inputStream) {
        return new RemoteLogInputStream(inputStream);
    }

    int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long j) {
        return this.indexCache.lookupOffset(remoteLogSegmentMetadata, j);
    }

    private FetchDataInfo addAbortedTransactions(long j, RemoteLogSegmentMetadata remoteLogSegmentMetadata, FetchDataInfo fetchDataInfo, MergedLog mergedLog) throws RemoteStorageException {
        long longValue = ((Long) this.indexCache.getIndexEntry(remoteLogSegmentMetadata).offsetIndex().fetchUpperBoundOffset(new OffsetPosition(fetchDataInfo.fetchOffsetMetadata.messageOffset, fetchDataInfo.fetchOffsetMetadata.relativePositionInSegment), fetchDataInfo.records.sizeInBytes()).map(offsetPosition -> {
            return Long.valueOf(offsetPosition.offset);
        }).orElse(Long.valueOf(remoteLogSegmentMetadata.endOffset() + 1))).longValue();
        HashSet hashSet = new HashSet();
        collectAbortedTransactions(j, longValue, remoteLogSegmentMetadata, list -> {
            hashSet.addAll((Collection) list.stream().map((v0) -> {
                return v0.asAbortedTransaction();
            }).collect(Collectors.toList()));
        }, mergedLog);
        return new FetchDataInfo(fetchDataInfo.fetchOffsetMetadata, fetchDataInfo.records, fetchDataInfo.firstEntryIncomplete, Optional.of(hashSet.isEmpty() ? Collections.emptyList() : new ArrayList(hashSet)));
    }

    private void collectAbortedTransactions(long j, long j2, RemoteLogSegmentMetadata remoteLogSegmentMetadata, Consumer<List<AbortedTxn>> consumer, MergedLog mergedLog) throws RemoteStorageException {
        Optional<RemoteLogSegmentMetadata> of = Optional.of(remoteLogSegmentMetadata);
        while (true) {
            Optional<RemoteLogSegmentMetadata> optional = of;
            if (!optional.isPresent()) {
                collectAbortedTransactionInLocalSegments(j, j2, consumer, mergedLog.localLogSegments().iterator());
                return;
            }
            Optional<U> map = optional.map(remoteLogSegmentMetadata2 -> {
                return this.indexCache.getIndexEntry(remoteLogSegmentMetadata2).txnIndex();
            });
            if (map.isPresent()) {
                TxnIndexSearchResult collectAbortedTxns = ((TransactionIndex) map.get()).collectAbortedTxns(j, j2);
                consumer.accept(collectAbortedTxns.abortedTransactions);
                if (collectAbortedTxns.isComplete) {
                    return;
                }
            }
            of = findNextSegmentMetadata(optional.get(), mergedLog.leaderEpochCache());
        }
    }

    private void collectAbortedTransactionInLocalSegments(long j, long j2, Consumer<List<AbortedTxn>> consumer, Iterator<LogSegment> it) {
        while (it.hasNext()) {
            TransactionIndex txnIndex = it.next().txnIndex();
            if (txnIndex != null) {
                TxnIndexSearchResult collectAbortedTxns = txnIndex.collectAbortedTxns(j, j2);
                consumer.accept(collectAbortedTxns.abortedTransactions);
                if (collectAbortedTxns.isComplete) {
                    return;
                }
            }
        }
    }

    Optional<RemoteLogSegmentMetadata> findNextSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata, Optional<LeaderEpochFileCache> optional) throws RemoteStorageException {
        if (!optional.isPresent()) {
            return Optional.empty();
        }
        long endOffset = remoteLogSegmentMetadata.endOffset() + 1;
        OptionalInt epochForOffset = optional.get().epochForOffset(endOffset);
        return epochForOffset.isPresent() ? fetchRemoteLogSegmentMetadata(remoteLogSegmentMetadata.topicIdPartition().topicPartition(), epochForOffset.getAsInt(), endOffset) : Optional.empty();
    }

    RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long j) throws IOException {
        RecordBatch nextBatch;
        do {
            nextBatch = remoteLogInputStream.nextBatch();
            if (nextBatch == null) {
                break;
            }
        } while (nextBatch.lastOffset() < j);
        return nextBatch;
    }

    OffsetAndEpoch findHighestRemoteOffset(TopicIdPartition topicIdPartition, MergedLog mergedLog) throws RemoteStorageException {
        OffsetAndEpoch offsetAndEpoch = null;
        Optional<LeaderEpochFileCache> leaderEpochCache = mergedLog.leaderEpochCache();
        if (leaderEpochCache.isPresent()) {
            LeaderEpochFileCache leaderEpochFileCache = leaderEpochCache.get();
            Optional latestEntry = leaderEpochFileCache.latestEntry();
            while (true) {
                Optional optional = latestEntry;
                if (offsetAndEpoch != null || !optional.isPresent()) {
                    break;
                }
                int i = ((EpochEntry) optional.get()).epoch;
                Optional highestOffsetForEpoch = this.remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, i);
                if (highestOffsetForEpoch.isPresent()) {
                    Map.Entry endOffsetFor = leaderEpochFileCache.endOffsetFor(i, mergedLog.logEndOffset());
                    int intValue = ((Integer) endOffsetFor.getKey()).intValue();
                    long longValue = ((Long) endOffsetFor.getValue()).longValue();
                    long longValue2 = ((Long) highestOffsetForEpoch.get()).longValue();
                    if (longValue <= longValue2) {
                        LOGGER.info("The end-offset for epoch {}: ({}, {}) is less than or equal to the highest-remote-offset: {} for partition: {}", new Object[]{Integer.valueOf(i), Integer.valueOf(intValue), Long.valueOf(longValue), Long.valueOf(longValue2), topicIdPartition});
                        offsetAndEpoch = new OffsetAndEpoch(longValue - 1, intValue);
                    } else {
                        offsetAndEpoch = new OffsetAndEpoch(longValue2, i);
                    }
                }
                latestEntry = leaderEpochFileCache.previousEntry(i);
            }
        }
        if (offsetAndEpoch == null) {
            offsetAndEpoch = new OffsetAndEpoch(-1L, -1);
        }
        return offsetAndEpoch;
    }

    long findLogStartOffset(TopicIdPartition topicIdPartition, MergedLog mergedLog) throws RemoteStorageException {
        Optional empty = Optional.empty();
        Optional<LeaderEpochFileCache> leaderEpochCache = mergedLog.leaderEpochCache();
        if (leaderEpochCache.isPresent()) {
            LeaderEpochFileCache leaderEpochFileCache = leaderEpochCache.get();
            OptionalInt optionalInt = (OptionalInt) leaderEpochFileCache.earliestEntry().map(epochEntry -> {
                return OptionalInt.of(epochEntry.epoch);
            }).orElseGet(OptionalInt::empty);
            while (true) {
                OptionalInt optionalInt2 = optionalInt;
                if (empty.isPresent() || !optionalInt2.isPresent()) {
                    break;
                }
                Iterator listRemoteLogSegments = this.remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, optionalInt2.getAsInt());
                if (listRemoteLogSegments.hasNext()) {
                    empty = Optional.of(Long.valueOf(((RemoteLogSegmentMetadata) listRemoteLogSegments.next()).startOffset()));
                }
                optionalInt = leaderEpochFileCache.nextEpoch(optionalInt2.getAsInt());
            }
        }
        mergedLog.getClass();
        return ((Long) empty.orElseGet(mergedLog::localLogStartOffset)).longValue();
    }

    public Future<Void> asyncRead(RemoteStorageFetchInfo remoteStorageFetchInfo, Consumer<RemoteLogReadResult> consumer) {
        return this.remoteStorageReaderThreadPool.submit(new RemoteLogReader(remoteStorageFetchInfo, this, consumer, this.brokerTopicStats, this.rlmFetchQuotaManager, this.remoteReadTimer));
    }

    void doHandleLeaderOrFollowerPartitions(TopicIdPartition topicIdPartition, Consumer<RLMTask> consumer) {
        RemoteLogManagerConfig remoteLogManagerConfig = this.config.remoteLogManagerConfig();
        consumer.accept(this.leaderOrFollowerTasks.computeIfAbsent(topicIdPartition, topicIdPartition2 -> {
            RLMTask rLMTask = new RLMTask(topicIdPartition2, remoteLogManagerConfig.remoteLogMetadataCustomMetadataMaxBytes());
            consumer.accept(rLMTask);
            LOGGER.info("Created a new task: {} and getting scheduled", rLMTask);
            return new RLMTaskWithFuture(rLMTask, this.rlmScheduledThreadPool.scheduleWithFixedDelay(rLMTask, 0L, this.delayInMs, TimeUnit.MILLISECONDS));
        }).rlmTask);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        synchronized (this) {
            if (!this.closed) {
                this.leaderOrFollowerTasks.values().forEach((v0) -> {
                    v0.cancel();
                });
                Utils.closeQuietly(this.remoteLogStorageManager, "RemoteLogStorageManager");
                Utils.closeQuietly(this.remoteLogMetadataManager, "RemoteLogMetadataManager");
                Utils.closeQuietly(this.indexCache, "RemoteIndexCache");
                this.rlmScheduledThreadPool.close();
                try {
                    shutdownAndAwaitTermination(this.remoteStorageReaderThreadPool, "RemoteStorageReaderThreadPool", 10L, TimeUnit.SECONDS);
                    removeMetrics();
                    this.leaderOrFollowerTasks.clear();
                    this.closed = true;
                } catch (Throwable th) {
                    removeMetrics();
                    throw th;
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void shutdownAndAwaitTermination(ExecutorService executorService, String str, long j, TimeUnit timeUnit) {
        LOGGER.info("Shutting down of thread pool {} is started", str);
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(j, timeUnit)) {
                LOGGER.info("Shutting down of thread pool {} could not be completed. It will retry cancelling the tasks using shutdownNow.", str);
                executorService.shutdownNow();
                if (!executorService.awaitTermination(j, timeUnit)) {
                    LOGGER.warn("Shutting down of thread pool {} could not be completed even after retrying cancellation of the tasks using shutdownNow.", str);
                }
            }
        } catch (InterruptedException e) {
            LOGGER.warn("Encountered InterruptedException while shutting down thread pool {}. It will retry cancelling the tasks using shutdownNow.", str);
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
        LOGGER.info("Shutting down of thread pool {} is completed", str);
    }

    static ByteBuffer epochEntriesAsByteBuffer(List<EpochEntry> list) throws IOException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(byteArrayOutputStream, StandardCharsets.UTF_8));
        Throwable th = null;
        try {
            try {
                new CheckpointFile.CheckpointWriteBuffer(bufferedWriter, 0, LeaderEpochCheckpointFile.FORMATTER).write(list);
                bufferedWriter.flush();
                if (bufferedWriter != null) {
                    if (0 != 0) {
                        try {
                            bufferedWriter.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        bufferedWriter.close();
                    }
                }
                return ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
            } finally {
            }
        } catch (Throwable th3) {
            if (bufferedWriter != null) {
                if (th != null) {
                    try {
                        bufferedWriter.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    bufferedWriter.close();
                }
            }
            throw th3;
        }
    }

    private void removeRemoteTopicPartitionMetrics(TopicIdPartition topicIdPartition) {
        String str = topicIdPartition.topic();
        if (!this.brokerTopicStats.isTopicStatsExisted(topicIdPartition.topic())) {
            this.brokerTopicStats.removeBrokerLevelRemoteCopyLagBytes(str);
            this.brokerTopicStats.removeBrokerLevelRemoteCopyLagSegments(str);
            this.brokerTopicStats.removeBrokerLevelRemoteDeleteLagBytes(str);
            this.brokerTopicStats.removeBrokerLevelRemoteDeleteLagSegments(str);
            this.brokerTopicStats.removeBrokerLevelRemoteLogMetadataCount(str);
            this.brokerTopicStats.removeBrokerLevelRemoteLogSizeComputationTime(str);
            this.brokerTopicStats.removeBrokerLevelRemoteLogSizeBytes(str);
            return;
        }
        int partition = topicIdPartition.partition();
        this.brokerTopicStats.removeRemoteCopyLagBytes(str, partition);
        this.brokerTopicStats.removeRemoteCopyLagSegments(str, partition);
        this.brokerTopicStats.removeRemoteDeleteLagBytes(str, partition);
        this.brokerTopicStats.removeRemoteDeleteLagSegments(str, partition);
        this.brokerTopicStats.removeRemoteLogMetadataCount(str, partition);
        this.brokerTopicStats.removeRemoteLogSizeComputationTime(str, partition);
        this.brokerTopicStats.removeRemoteLogSizeBytes(str, partition);
    }

    RLMTaskWithFuture task(TopicIdPartition topicIdPartition) {
        return this.leaderOrFollowerTasks.get(topicIdPartition);
    }
}
