package org.apache.kafka.server.log.remote.metadata.storage;

import java.io.Closeable;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.class */
class ConsumerTask implements Runnable, Closeable {
    private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class);

    /** Consumer polled in {@link #run()} for remote log metadata records; woken up by {@link #close()}. */
    private final Consumer<byte[], byte[]> consumer;
    /** Receives deserialized metadata events as well as load / clear / mark-initialized callbacks. */
    private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler;
    /** Maps a user topic partition to the metadata partition number it is tracked under. */
    private final RemoteLogMetadataTopicPartitioner topicPartitioner;
    /** Timeout, in milliseconds, passed to every consumer poll. */
    private final long pollTimeoutMs;
    /** Clock used for all timestamps taken by this task. */
    private final Time time;
    /** Timestamp (ms) of construction or of the last processed assignment change; used to log initialization time. */
    private long uninitializedAt;
    /** True once every assigned user partition has been marked initialized; reset when an assignment change is processed. */
    private boolean isAllUserTopicPartitionsInitialized;
    /** Timestamp (ms) of the most recent failed start/end offset fetch. */
    private long lastFailedFetchOffsetsTimestamp;
    /** Minimum time, in milliseconds, to wait after a failed offset fetch before retrying it. */
    private final long offsetFetchRetryIntervalMs;
    /** Deserializer for the values of the consumed records. */
    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
    /** Set when the task is closed or hits a fatal error; terminates the loop in {@link #run()}. */
    private volatile boolean isClosed = false;
    /** Set whenever the user-partition assignment is replaced; starts as true so the first loop iteration processes it. */
    private volatile boolean hasAssignmentChanged = true;
    /** Monitor guarding replacement of {@link #assignedUserTopicIdPartitions}; also used for wait / notify. */
    private final Object assignPartitionsLock = new Object();
    /** Metadata partitions currently assigned to the consumer (replaced wholesale, never mutated in place). */
    private volatile Set<Integer> assignedMetadataPartitions = Collections.emptySet();
    /** Requested user-partition assignment, published as an unmodifiable map on every change. */
    private volatile Map<TopicIdPartition, UserTopicIdPartition> assignedUserTopicIdPartitions = Collections.emptyMap();
    /** User partitions whose assignment has already been applied to the consumer; only their events are processed. */
    private volatile Set<TopicIdPartition> processedAssignmentOfUserTopicIdPartitions = Collections.emptySet();
    /** Offset of the last record consumed from each metadata partition. */
    private final Map<Integer, Long> readOffsetsByMetadataPartition = new ConcurrentHashMap<>();
    /** Offset of the last event handed to the handler for each user partition. */
    private final Map<TopicIdPartition, Long> readOffsetsByUserTopicPartition = new HashMap<>();
    /** Start and end offsets fetched for the metadata partitions of not-yet-initialized user partitions. */
    private Map<TopicPartition, StartAndEndOffsetHolder> offsetHolderByMetadataPartition = new HashMap<>();
    /** True when the latest start/end offset fetch failed with a retriable error and should be retried. */
    private boolean hasLastOffsetsFetchFailed = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask$StartAndEndOffsetHolder.class */
    /**
     * Simple mutable pair holding the start and end offset fetched for one partition.
     * Either value may be {@code null}.
     */
    public static class StartAndEndOffsetHolder {
        Long startOffset;
        Long endOffset;

        /**
         * @param l  the start offset
         * @param l2 the end offset
         */
        public StartAndEndOffsetHolder(Long l, Long l2) {
            startOffset = l;
            endOffset = l2;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("StartAndEndOffsetHolder{startOffset=");
            text.append(startOffset);
            text.append(", endOffset=");
            text.append(endOffset);
            text.append('}');
            return text.toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask$UserTopicIdPartition.class */
    /**
     * A user topic partition together with the metadata partition number it maps to, plus two
     * mutable progress flags. Equality and hashing consider only the topic partition and the
     * metadata partition, never the flags.
     */
    public static class UserTopicIdPartition {
        private final TopicIdPartition topicIdPartition;
        private final Integer metadataPartition;
        // Set once the handler has been told that this partition is initialized.
        boolean isInitialized = false;
        // Set once the assignment of this partition has been applied to the consumer.
        boolean isAssigned = false;

        /**
         * @param topicIdPartition the user topic partition; must not be null
         * @param num              the metadata partition number; must not be null
         * @throws NullPointerException if either argument is null
         */
        public UserTopicIdPartition(TopicIdPartition topicIdPartition, Integer num) {
            this.topicIdPartition = Objects.requireNonNull(topicIdPartition);
            this.metadataPartition = Objects.requireNonNull(num);
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("UserTopicIdPartition{topicIdPartition=");
            text.append(topicIdPartition);
            text.append(", metadataPartition=").append(metadataPartition);
            text.append(", isInitialized=").append(isInitialized);
            text.append(", isAssigned=").append(isAssigned);
            text.append('}');
            return text.toString();
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null || obj.getClass() != getClass()) {
                return false;
            }
            UserTopicIdPartition other = (UserTopicIdPartition) obj;
            if (!topicIdPartition.equals(other.topicIdPartition)) {
                return false;
            }
            return metadataPartition.equals(other.metadataPartition);
        }

        @Override
        public int hashCode() {
            // Same value as Objects.hash(topicIdPartition, metadataPartition); both fields are non-null.
            int result = 1;
            result = 31 * result + topicIdPartition.hashCode();
            result = 31 * result + metadataPartition.hashCode();
            return result;
        }
    }

    /**
     * Creates the task. Nothing is polled until {@link #run()} is invoked.
     *
     * @param remotePartitionMetadataEventHandler handler notified of metadata events; must not be null
     * @param remoteLogMetadataTopicPartitioner   maps user partitions to metadata partitions; must not be null
     * @param consumer                            consumer used to read the metadata topic (not null-checked here)
     * @param j                                   poll timeout in milliseconds
     * @param j2                                  retry interval in milliseconds for failed offset fetches
     * @param time                                clock; must not be null
     * @throws NullPointerException if the handler, the partitioner or the clock is null
     */
    public ConsumerTask(RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler, RemoteLogMetadataTopicPartitioner remoteLogMetadataTopicPartitioner, Consumer<byte[], byte[]> consumer, long j, long j2, Time time) {
        // Validate up front, in the same order the fields are declared to be checked.
        Objects.requireNonNull(remotePartitionMetadataEventHandler);
        Objects.requireNonNull(remoteLogMetadataTopicPartitioner);
        Objects.requireNonNull(time);

        this.remotePartitionMetadataEventHandler = remotePartitionMetadataEventHandler;
        this.topicPartitioner = remoteLogMetadataTopicPartitioner;
        this.consumer = consumer;
        this.time = time;
        this.pollTimeoutMs = j;
        this.offsetFetchRetryIntervalMs = j2;
        this.uninitializedAt = this.time.milliseconds();
    }

    /**
     * Main loop of the task. Until the task is closed it applies pending assignment changes,
     * polls the consumer, processes each returned record and then tries to mark user partitions
     * as initialized. Retriable errors are logged and the loop continues; a wakeup or any other
     * exception ends the loop. The consumer is closed (30 second timeout) before returning.
     */
    @Override // java.lang.Runnable
    public void run() {
        log.info("Starting consumer task thread.");
        while (!isClosed) {
            try {
                if (hasAssignmentChanged) {
                    maybeWaitForPartitionAssignments();
                }
                log.trace("Polling consumer to receive remote log metadata topic records");
                for (ConsumerRecord<byte[], byte[]> metadataRecord : consumer.poll(Duration.ofMillis(pollTimeoutMs))) {
                    processConsumerRecord(metadataRecord);
                }
                maybeMarkUserPartitionsAsReady();
            } catch (WakeupException wakeup) {
                // Wakeup ends the loop quietly, without logging.
                isClosed = true;
            } catch (RetriableException retriable) {
                log.warn("Retriable error occurred while processing the records. Retrying...", retriable);
            } catch (Exception fatal) {
                isClosed = true;
                log.error("Error occurred while processing the records", fatal);
            }
        }
        try {
            consumer.close(Duration.ofSeconds(30L));
        } catch (Exception closeFailure) {
            log.error("Error encountered while closing the consumer", closeFailure);
        }
        log.info("Exited from consumer task thread");
    }

    /**
     * Deserializes one consumed record and, if it should be processed, forwards it to the event
     * handler and remembers its offset for the owning user partition. The offset consumed for the
     * record's metadata partition is recorded in every case.
     *
     * @param consumerRecord the record returned by the consumer
     */
    private void processConsumerRecord(ConsumerRecord<byte[], byte[]> consumerRecord) {
        final RemoteLogMetadata metadata = serde.deserialize(consumerRecord.value());
        final long offset = consumerRecord.offset();
        final int partition = consumerRecord.partition();

        if (!shouldProcess(metadata, offset)) {
            log.trace("The event {} is skipped because it is either already processed or not assigned to this consumer", metadata);
        } else {
            remotePartitionMetadataEventHandler.handleRemoteLogMetadata(metadata);
            readOffsetsByUserTopicPartition.put(metadata.topicIdPartition(), offset);
        }

        log.trace("Updating consumed offset: {} for partition {}", offset, partition);
        readOffsetsByMetadataPartition.put(partition, offset);
    }

    /**
     * Decides whether an event must be handed to the handler.
     *
     * @param remoteLogMetadata the deserialized event
     * @param j                 offset of the record carrying the event
     * @return true when the event's user partition has a processed assignment and no event at this
     *         or a later offset has been recorded for that partition yet
     */
    private boolean shouldProcess(RemoteLogMetadata remoteLogMetadata, long j) {
        final TopicIdPartition partition = remoteLogMetadata.topicIdPartition();
        final Long lastProcessedOffset = readOffsetsByUserTopicPartition.get(partition);
        if (!processedAssignmentOfUserTopicIdPartitions.contains(partition)) {
            return false;
        }
        return lastProcessedOffset == null || lastProcessedOffset < j;
    }

    /**
     * Marks assigned user partitions as initialized once their metadata partition has been read
     * up to the fetched end offset, or when that metadata partition is empty (start equals end).
     * Does nothing when all user partitions are already initialized. Retries a previously failed
     * offset fetch first, if it is due.
     */
    private void maybeMarkUserPartitionsAsReady() {
        if (!isAllUserTopicPartitionsInitialized) {
            maybeFetchStartAndEndOffsets();
            boolean everyPartitionReady = true;
            for (UserTopicIdPartition utp : assignedUserTopicIdPartitions.values()) {
                if (utp.isAssigned && !utp.isInitialized) {
                    final Integer metadataPartition = utp.metadataPartition;
                    final StartAndEndOffsetHolder offsets = offsetHolderByMetadataPartition.get(toRemoteLogPartition(metadataPartition));
                    if (offsets == null) {
                        log.debug("The offset-holder is null for the metadata-partition {}. The consumer may not have picked up the recent assignment", metadataPartition);
                    } else {
                        // -1 stands for "nothing read yet" from this metadata partition.
                        final Long readOffset = readOffsetsByMetadataPartition.getOrDefault(metadataPartition, -1L);
                        final boolean caughtUp = readOffset + 1 >= offsets.endOffset;
                        if (caughtUp || offsets.endOffset.equals(offsets.startOffset)) {
                            markInitialized(utp);
                        } else {
                            log.debug("The user-topic-partition {} could not be marked initialized since the read-offset is {} but the end-offset is {} for the metadata-partition {}", utp, readOffset, offsets.endOffset, metadataPartition);
                        }
                    }
                }
                // Evaluated after the attempt above so a partition initialized just now counts as ready.
                if (!utp.isAssigned || !utp.isInitialized) {
                    everyPartitionReady = false;
                }
            }
            if (everyPartitionReady) {
                log.info("Initialized for all the {} assigned user-partitions mapped to the {} meta-partitions in {} ms", assignedUserTopicIdPartitions.size(), assignedMetadataPartitions.size(), time.milliseconds() - uninitializedAt);
            }
            isAllUserTopicPartitionsInitialized = everyPartitionReady;
        }
    }

    /**
     * Blocks until at least one user partition is assigned (or the task is closed) and then, if
     * the assignment has changed, applies it to the consumer:
     * <ol>
     *   <li>assigns the consumer to the metadata partitions of all assigned user partitions;</li>
     *   <li>seeks metadata partitions that carry a newly assigned user partition to the beginning
     *       and forgets their previously read offset;</li>
     *   <li>seeks the remaining metadata partitions to their last read offset, when one is known;</li>
     *   <li>asks the handler to load every newly assigned user partition and flags it as assigned;</li>
     *   <li>clears state of user partitions that are no longer assigned and re-fetches the
     *       start/end offsets.</li>
     * </ol>
     * Only the snapshot of the assignment is taken while holding {@code assignPartitionsLock}; all
     * consumer calls happen outside of it, presumably so that threads updating the assignment are
     * not held up by consumer calls.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for an assignment
     */
    void maybeWaitForPartitionAssignments() throws InterruptedException {
        final Set<Integer> metadataPartitionSnapshot = new HashSet<>();
        final Set<UserTopicIdPartition> userPartitionSnapshot = new HashSet<>();
        synchronized (assignPartitionsLock) {
            while (!isClosed && assignedUserTopicIdPartitions.isEmpty()) {
                log.debug("Waiting for remote log metadata partitions to be assigned");
                assignPartitionsLock.wait();
            }
            if (!isClosed && hasAssignmentChanged) {
                for (UserTopicIdPartition utp : assignedUserTopicIdPartitions.values()) {
                    metadataPartitionSnapshot.add(utp.metadataPartition);
                    userPartitionSnapshot.add(utp);
                }
                hasAssignmentChanged = false;
            }
        }
        if (metadataPartitionSnapshot.isEmpty()) {
            // Closed while waiting, or nothing changed: leave the consumer untouched.
            return;
        }

        final Set<TopicPartition> remoteLogPartitions = toRemoteLogPartitions(metadataPartitionSnapshot);
        consumer.assign(remoteLogPartitions);
        assignedMetadataPartitions = Collections.unmodifiableSet(metadataPartitionSnapshot);

        // Newly assigned user partitions need the whole metadata partition replayed from the start.
        final Set<TopicPartition> seekToBeginningPartitions = new HashSet<>();
        for (UserTopicIdPartition utp : userPartitionSnapshot) {
            if (!utp.isAssigned) {
                readOffsetsByMetadataPartition.remove(utp.metadataPartition);
                seekToBeginningPartitions.add(toRemoteLogPartition(utp.metadataPartition));
            }
        }
        consumer.seekToBeginning(seekToBeginningPartitions);

        // Every other metadata partition resumes from the offset that was read last.
        for (TopicPartition remoteLogPartition : remoteLogPartitions) {
            if (seekToBeginningPartitions.contains(remoteLogPartition)) {
                continue;
            }
            final Long lastReadOffset = readOffsetsByMetadataPartition.get(remoteLogPartition.partition());
            if (lastReadOffset != null) {
                consumer.seek(remoteLogPartition, lastReadOffset);
            }
        }

        final Set<TopicIdPartition> processedAssignmentPartitions = new HashSet<>();
        for (UserTopicIdPartition utp : userPartitionSnapshot) {
            if (!utp.isAssigned) {
                remotePartitionMetadataEventHandler.maybeLoadPartition(utp.topicIdPartition);
                utp.isAssigned = true;
            }
            processedAssignmentPartitions.add(utp.topicIdPartition);
        }
        // Publish a private copy so later changes to the local set cannot leak into the field.
        processedAssignmentOfUserTopicIdPartitions = new HashSet<>(processedAssignmentPartitions);
        clearResourcesForUnassignedUserTopicPartitions(processedAssignmentPartitions);
        isAllUserTopicPartitionsInitialized = false;
        uninitializedAt = time.milliseconds();
        fetchStartAndEndOffsets();
    }

    /**
     * Clears handler state and the recorded read offset for every user partition that has a
     * recorded read offset but is not part of the given assignment.
     *
     * @param set the user partitions that are currently assigned
     */
    private void clearResourcesForUnassignedUserTopicPartitions(Set<TopicIdPartition> set) {
        // Collect first: entries are removed from the map below, so its key set cannot be iterated directly.
        final Set<TopicIdPartition> unassignedPartitions = readOffsetsByUserTopicPartition.keySet().stream()
                .filter(partition -> !set.contains(partition))
                .collect(Collectors.toSet());
        for (TopicIdPartition partition : unassignedPartitions) {
            remotePartitionMetadataEventHandler.clearTopicPartition(partition);
            readOffsetsByUserTopicPartition.remove(partition);
        }
        log.info("Unassigned user-topic-partitions: {}", unassignedPartitions.size());
    }

    /**
     * Adds the given user partitions to the assignment.
     *
     * @param set the user partitions to add; must not be null
     * @throws NullPointerException if {@code set} is null
     */
    public void addAssignmentsForPartitions(Set<TopicIdPartition> set) {
        final Set<TopicIdPartition> added = Objects.requireNonNull(set);
        final Set<TopicIdPartition> removed = Collections.emptySet();
        updateAssignments(added, removed);
    }

    /**
     * Removes the given user partitions from the assignment.
     *
     * @param set the user partitions to remove; must not be null
     * @throws NullPointerException if {@code set} is null
     */
    public void removeAssignmentsForPartitions(Set<TopicIdPartition> set) {
        final Set<TopicIdPartition> added = Collections.emptySet();
        final Set<TopicIdPartition> removed = Objects.requireNonNull(set);
        updateAssignments(added, removed);
    }

    /**
     * Applies additions and removals to a copy of the current user-partition assignment and, when
     * the result differs from the current assignment, publishes it, flags the change and wakes up
     * any thread waiting on {@code assignPartitionsLock}. Additions are applied before removals,
     * so a partition present in both sets ends up removed.
     *
     * @param set  user partitions to add; partitions that are already assigned keep their existing entry
     * @param set2 user partitions to remove
     */
    private void updateAssignments(Set<TopicIdPartition> set, Set<TopicIdPartition> set2) {
        final Set<TopicIdPartition> addedPartitions = set;
        final Set<TopicIdPartition> removedPartitions = set2;
        log.info("Updating assignments for partitions added: {} and removed: {}", addedPartitions, removedPartitions);
        if (addedPartitions.isEmpty() && removedPartitions.isEmpty()) {
            return;
        }
        synchronized (assignPartitionsLock) {
            // Work on a copy: the published map is unmodifiable and read without the lock elsewhere.
            final Map<TopicIdPartition, UserTopicIdPartition> updatedAssignment = new HashMap<>(assignedUserTopicIdPartitions);
            for (TopicIdPartition addedPartition : addedPartitions) {
                // Keep the existing entry (and its isAssigned / isInitialized flags) when already present.
                updatedAssignment.computeIfAbsent(addedPartition, this::newUserTopicIdPartition);
            }
            removedPartitions.forEach(updatedAssignment::remove);
            if (!updatedAssignment.equals(assignedUserTopicIdPartitions)) {
                assignedUserTopicIdPartitions = Collections.unmodifiableMap(updatedAssignment);
                hasAssignmentChanged = true;
                log.debug("Assigned user-topic-partitions: {}", assignedUserTopicIdPartitions);
                assignPartitionsLock.notifyAll();
            }
        }
    }

    /**
     * Returns the offset of the last record consumed from the given metadata partition.
     *
     * @param i the metadata partition number
     * @return the last read offset, or an empty optional when none is recorded
     */
    public Optional<Long> readOffsetForMetadataPartition(int i) {
        final Long lastReadOffset = readOffsetsByMetadataPartition.get(i);
        return Optional.ofNullable(lastReadOffset);
    }

    /**
     * @param i the metadata partition number
     * @return true when the given metadata partition is in the current set of assigned metadata partitions
     */
    public boolean isMetadataPartitionAssigned(int i) {
        final Set<Integer> currentlyAssigned = assignedMetadataPartitions;
        return currentlyAssigned.contains(i);
    }

    /**
     * @param topicIdPartition the user partition to look up
     * @return true when the partition is part of the assignment and has been flagged as assigned
     */
    public boolean isUserPartitionAssigned(TopicIdPartition topicIdPartition) {
        final UserTopicIdPartition entry = assignedUserTopicIdPartitions.get(topicIdPartition);
        if (entry == null) {
            return false;
        }
        return entry.isAssigned;
    }

    /**
     * Requests termination of the task. Under {@code assignPartitionsLock} it sets the closed
     * flag, marks the assigned user partitions as initialized, wakes up the consumer and notifies
     * threads waiting on the lock. Calling it again once closed is a no-op.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (!isClosed) {
            log.info("Closing the instance");
            synchronized (assignPartitionsLock) {
                isClosed = true;
                for (UserTopicIdPartition utp : assignedUserTopicIdPartitions.values()) {
                    markInitialized(utp);
                }
                consumer.wakeup();
                assignPartitionsLock.notifyAll();
            }
        }
    }

    /**
     * @return an unmodifiable view of the metadata partitions currently assigned to the consumer
     */
    public Set<Integer> metadataPartitionsAssigned() {
        final Set<Integer> currentlyAssigned = assignedMetadataPartitions;
        return Collections.unmodifiableSet(currentlyAssigned);
    }

    /**
     * Fetches the beginning and end offsets of the metadata partitions that carry an assigned but
     * not yet initialized user partition and stores them in {@code offsetHolderByMetadataPartition}.
     * Previously stored holders of those partitions are dropped first, so stale values are not
     * used if the fetch fails. A retriable failure is recorded (flag plus timestamp) so that
     * {@link #maybeFetchStartAndEndOffsets()} can retry later; it is not propagated.
     */
    private void fetchStartAndEndOffsets() {
        try {
            final Set<TopicPartition> uninitializedPartitions = assignedUserTopicIdPartitions.values().stream()
                    .filter(utp -> utp.isAssigned && !utp.isInitialized)
                    .map(utp -> toRemoteLogPartition(utp.metadataPartition))
                    .collect(Collectors.toSet());
            uninitializedPartitions.forEach(offsetHolderByMetadataPartition::remove);
            if (!uninitializedPartitions.isEmpty()) {
                final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(uninitializedPartitions);
                final Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(uninitializedPartitions);
                offsetHolderByMetadataPartition = endOffsets.entrySet().stream()
                        .collect(Collectors.toMap(
                                Map.Entry::getKey,
                                entry -> new StartAndEndOffsetHolder(beginningOffsets.get(entry.getKey()), entry.getValue())));
            }
            hasLastOffsetsFetchFailed = false;
        } catch (RetriableException e) {
            // Deliberately not rethrown: remember the failure so the fetch is retried after the retry interval.
            log.debug("Retriable error occurred while fetching the start and end offsets; the fetch will be retried", e);
            hasLastOffsetsFetchFailed = true;
            lastFailedFetchOffsetsTimestamp = time.milliseconds();
        }
    }

    /**
     * Retries the start/end offset fetch, but only when the previous attempt failed and more than
     * the configured retry interval has elapsed since that failure.
     */
    private void maybeFetchStartAndEndOffsets() {
        final boolean retryDue = hasLastOffsetsFetchFailed
                && lastFailedFetchOffsetsTimestamp + offsetFetchRetryIntervalMs < time.milliseconds();
        if (retryDue) {
            fetchStartAndEndOffsets();
        }
    }

    /**
     * Builds the tracking entry for a user partition, resolving its metadata partition through the
     * topic partitioner.
     *
     * @param topicIdPartition the user partition
     * @return a new entry that is neither assigned nor initialized
     */
    private UserTopicIdPartition newUserTopicIdPartition(TopicIdPartition topicIdPartition) {
        final int metadataPartition = topicPartitioner.metadataPartition(topicIdPartition);
        return new UserTopicIdPartition(topicIdPartition, metadataPartition);
    }

    /**
     * Notifies the handler that the given user partition is initialized and flags the entry
     * accordingly. Logs a warning and does nothing for entries that are not assigned yet; entries
     * that are already initialized are left alone.
     *
     * @param userTopicIdPartition the entry to mark
     */
    private void markInitialized(UserTopicIdPartition userTopicIdPartition) {
        if (!userTopicIdPartition.isAssigned) {
            log.warn("Tried to initialize a UTP: {} that was not yet assigned!", userTopicIdPartition);
            return;
        }
        if (!userTopicIdPartition.isInitialized) {
            remotePartitionMetadataEventHandler.markInitialized(userTopicIdPartition.topicIdPartition);
            userTopicIdPartition.isInitialized = true;
        }
    }

    /**
     * Converts metadata partition numbers into partitions of the remote log metadata topic.
     *
     * @param set the metadata partition numbers
     * @return a new set with one topic partition per distinct number
     */
    static Set<TopicPartition> toRemoteLogPartitions(Set<Integer> set) {
        final Set<TopicPartition> remoteLogPartitions = new HashSet<>();
        for (Integer metadataPartition : set) {
            remoteLogPartitions.add(toRemoteLogPartition(metadataPartition));
        }
        return remoteLogPartitions;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @param i the metadata partition number
     * @return the partition {@code i} of the remote log metadata topic
     */
    public static TopicPartition toRemoteLogPartition(int i) {
        final String metadataTopic = TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME;
        return new TopicPartition(metadataTopic, i);
    }
}
