package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.slf4j.Logger;

/* loaded from: input_file:BOOT-INF/lib/kafka-streams-1.1.1.jar:org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.class */
public class GlobalStateManagerImpl extends AbstractStateManager implements GlobalStateManager {
    private final Logger log;
    private final ProcessorTopology topology;
    private final Consumer<byte[], byte[]> globalConsumer;
    private final StateDirectory stateDirectory;
    private final Set<String> globalStoreNames;
    private final StateRestoreListener stateRestoreListener;
    private InternalProcessorContext processorContext;
    private final int retries;
    private final long retryBackoffMs;
    private final Set<String> globalNonPersistentStoresTopics;

    public GlobalStateManagerImpl(LogContext logContext, ProcessorTopology processorTopology, Consumer<byte[], byte[]> consumer, StateDirectory stateDirectory, StateRestoreListener stateRestoreListener, StreamsConfig streamsConfig) {
        super(stateDirectory.globalStateDir(), StreamsConfig.EXACTLY_ONCE.equals(streamsConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)));
        this.globalStoreNames = new HashSet();
        this.globalNonPersistentStoresTopics = new HashSet();
        Map<String, String> storeToChangelogTopic = processorTopology.storeToChangelogTopic();
        for (StateStore stateStore : processorTopology.globalStateStores()) {
            if (!stateStore.persistent()) {
                this.globalNonPersistentStoresTopics.add(storeToChangelogTopic.get(stateStore.name()));
            }
        }
        this.log = logContext.logger(GlobalStateManagerImpl.class);
        this.topology = processorTopology;
        this.globalConsumer = consumer;
        this.stateDirectory = stateDirectory;
        this.stateRestoreListener = stateRestoreListener;
        this.retries = streamsConfig.getInt("retries").intValue();
        this.retryBackoffMs = streamsConfig.getLong("retry.backoff.ms").longValue();
    }

    @Override // org.apache.kafka.streams.processor.internals.GlobalStateManager
    public void setGlobalProcessorContext(InternalProcessorContext internalProcessorContext) {
        this.processorContext = internalProcessorContext;
    }

    @Override // org.apache.kafka.streams.processor.internals.GlobalStateManager
    public Set<String> initialize() {
        try {
            if (!this.stateDirectory.lockGlobalState()) {
                throw new LockException(String.format("Failed to lock the global state directory: %s", this.baseDir));
            }
            try {
                this.checkpointableOffsets.putAll(this.checkpoint.read());
                for (StateStore stateStore : this.topology.globalStateStores()) {
                    this.globalStoreNames.add(stateStore.name());
                    stateStore.init(this.processorContext, stateStore);
                }
                return Collections.unmodifiableSet(this.globalStoreNames);
            } catch (IOException e) {
                try {
                    this.stateDirectory.unlockGlobalState();
                } catch (IOException e2) {
                    this.log.error("Failed to unlock the global state directory", (Throwable) e);
                }
                throw new StreamsException("Failed to read checkpoints for global state globalStores", e);
            }
        } catch (IOException e3) {
            throw new LockException(String.format("Failed to lock the global state directory: %s", this.baseDir));
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public void reinitializeStateStoresForPartitions(Collection<TopicPartition> collection, InternalProcessorContext internalProcessorContext) {
        super.reinitializeStateStoresForPartitions(this.log, this.globalStores, this.topology.storeToChangelogTopic(), collection, internalProcessorContext);
        this.globalConsumer.assign(collection);
        this.globalConsumer.seekToBeginning(collection);
    }

    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public StateStore getGlobalStore(String str) {
        return this.globalStores.get(str);
    }

    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public StateStore getStore(String str) {
        return getGlobalStore(str);
    }

    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public File baseDir() {
        return this.baseDir;
    }

    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public void register(StateStore stateStore, StateRestoreCallback stateRestoreCallback) {
        if (this.globalStores.containsKey(stateStore.name())) {
            throw new IllegalArgumentException(String.format("Global Store %s has already been registered", stateStore.name()));
        }
        if (!this.globalStoreNames.contains(stateStore.name())) {
            throw new IllegalArgumentException(String.format("Trying to register store %s that is not a known global store", stateStore.name()));
        }
        if (stateRestoreCallback == null) {
            throw new IllegalArgumentException(String.format("The stateRestoreCallback provided for store %s was null", stateStore.name()));
        }
        this.log.info("Restoring state for global store {}", stateStore.name());
        List<TopicPartition> list = topicPartitionsForStore(stateStore);
        Map<TopicPartition, Long> map = null;
        int i = 0;
        while (map == null) {
            try {
                map = this.globalConsumer.endOffsets(list);
            } catch (TimeoutException e) {
                i++;
                if (i > this.retries) {
                    this.log.error("Failed to get end offsets for topic partitions of global store {} after {} retry attempts. You can increase the number of retries via configuration parameter `retries`.", stateStore.name(), Integer.valueOf(this.retries), e);
                    throw new StreamsException(String.format("Failed to get end offsets for topic partitions of global store %s after %d retry attempts. You can increase the number of retries via configuration parameter `retries`.", stateStore.name(), Integer.valueOf(this.retries)), e);
                }
                this.log.debug("Failed to get end offsets for partitions {}, backing off for {} ms to retry (attempt {} of {})", list, Long.valueOf(this.retryBackoffMs), Integer.valueOf(i), Integer.valueOf(this.retries), e);
                Utils.sleep(this.retryBackoffMs);
            }
        }
        try {
            restoreState(stateRestoreCallback, list, map, stateStore.name());
            this.globalStores.put(stateStore.name(), stateStore);
            this.globalConsumer.unsubscribe();
        } catch (Throwable th) {
            this.globalConsumer.unsubscribe();
            throw th;
        }
    }

    private List<TopicPartition> topicPartitionsForStore(StateStore stateStore) {
        String str = this.topology.storeToChangelogTopic().get(stateStore.name());
        int i = 0;
        while (true) {
            try {
                List<PartitionInfo> partitionsFor = this.globalConsumer.partitionsFor(str);
                if (partitionsFor == null || partitionsFor.isEmpty()) {
                    throw new StreamsException(String.format("There are no partitions available for topic %s when initializing global store %s", str, stateStore.name()));
                }
                ArrayList arrayList = new ArrayList();
                for (PartitionInfo partitionInfo : partitionsFor) {
                    arrayList.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
                }
                return arrayList;
            } catch (TimeoutException e) {
                i++;
                if (i > this.retries) {
                    this.log.error("Failed to get partitions for topic {} after {} retry attempts due to timeout. The broker may be transiently unavailable at the moment. You can increase the number of retries via configuration parameter `retries`.", str, Integer.valueOf(this.retries), e);
                    throw new StreamsException(String.format("Failed to get partitions for topic %s after %d retry attempts due to timeout. The broker may be transiently unavailable at the moment. You can increase the number of retries via configuration parameter `retries`.", str, Integer.valueOf(this.retries)), e);
                }
                this.log.debug("Failed to get partitions for topic {} due to timeout. The broker may be transiently unavailable at the moment. Backing off for {} ms to retry (attempt {} of {})", str, Long.valueOf(this.retryBackoffMs), Integer.valueOf(i), Integer.valueOf(this.retries), e);
                Utils.sleep(this.retryBackoffMs);
            }
        }
    }

    private void restoreState(StateRestoreCallback stateRestoreCallback, List<TopicPartition> list, Map<TopicPartition, Long> map, String str) {
        for (TopicPartition topicPartition : list) {
            this.globalConsumer.assign(Collections.singletonList(topicPartition));
            Long l = this.checkpointableOffsets.get(topicPartition);
            if (l != null) {
                this.globalConsumer.seek(topicPartition, l.longValue());
            } else {
                this.globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
            }
            long position = this.globalConsumer.position(topicPartition);
            Long l2 = map.get(topicPartition);
            BatchingStateRestoreCallback batchingStateRestoreCallback = (BatchingStateRestoreCallback) (stateRestoreCallback instanceof BatchingStateRestoreCallback ? stateRestoreCallback : new WrappedBatchingStateRestoreCallback(stateRestoreCallback));
            this.stateRestoreListener.onRestoreStart(topicPartition, str, position, l2.longValue());
            long j = 0;
            while (position < l2.longValue()) {
                try {
                    ConsumerRecords<byte[], byte[]> poll = this.globalConsumer.poll(100L);
                    ArrayList arrayList = new ArrayList();
                    Iterator<ConsumerRecord<byte[], byte[]>> it = poll.iterator();
                    while (it.hasNext()) {
                        ConsumerRecord<byte[], byte[]> next = it.next();
                        if (next.key() != null) {
                            arrayList.add(KeyValue.pair(next.key(), next.value()));
                        }
                    }
                    position = this.globalConsumer.position(topicPartition);
                    batchingStateRestoreCallback.restoreAll(arrayList);
                    this.stateRestoreListener.onBatchRestored(topicPartition, str, position, arrayList.size());
                    j += arrayList.size();
                } catch (InvalidOffsetException e) {
                    this.log.warn("Restoring GlobalStore {} failed due to: {}. Deleting global store to recreate from scratch.", str, e.getMessage());
                    reinitializeStateStoresForPartitions(e.partitions(), this.processorContext);
                    this.stateRestoreListener.onRestoreStart(topicPartition, str, position, l2.longValue());
                    j = 0;
                }
            }
            this.stateRestoreListener.onRestoreEnd(topicPartition, str, j);
            this.checkpointableOffsets.put(topicPartition, Long.valueOf(position));
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public void flush() {
        this.log.debug("Flushing all global globalStores registered in the state manager");
        for (StateStore stateStore : this.globalStores.values()) {
            try {
                this.log.trace("Flushing global store={}", stateStore.name());
                stateStore.flush();
            } catch (Exception e) {
                throw new ProcessorStateException(String.format("Failed to flush global state store %s", stateStore.name()), e);
            }
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public void close(Map<TopicPartition, Long> map) throws IOException {
        try {
            if (this.globalStores.isEmpty()) {
                return;
            }
            StringBuilder sb = new StringBuilder();
            for (Map.Entry<String, StateStore> entry : this.globalStores.entrySet()) {
                this.log.debug("Closing global storage engine {}", entry.getKey());
                try {
                    entry.getValue().close();
                } catch (Exception e) {
                    this.log.error("Failed to close global state store {}", entry.getKey(), e);
                    sb.append("Failed to close global state store:").append(entry.getKey()).append(". Reason: ").append(e.getMessage()).append("\n");
                }
            }
            this.globalStores.clear();
            if (sb.length() > 0) {
                throw new ProcessorStateException("Exceptions caught during close of 1 or more global state globalStores\n" + ((Object) sb));
            }
            checkpoint(map);
            this.stateDirectory.unlockGlobalState();
        } finally {
            this.stateDirectory.unlockGlobalState();
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.Checkpointable
    public void checkpoint(Map<TopicPartition, Long> map) {
        this.checkpointableOffsets.putAll(map);
        HashMap hashMap = new HashMap();
        for (Map.Entry<TopicPartition, Long> entry : this.checkpointableOffsets.entrySet()) {
            if (!this.globalNonPersistentStoresTopics.contains(entry.getKey().topic())) {
                hashMap.put(entry.getKey(), entry.getValue());
            }
        }
        try {
            this.checkpoint.write(hashMap);
        } catch (IOException e) {
            this.log.warn("Failed to write offset checkpoint file to {} for global stores: {}", this.checkpoint, e);
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.Checkpointable
    public Map<TopicPartition, Long> checkpointed() {
        return Collections.unmodifiableMap(this.checkpointableOffsets);
    }

    @Override // org.apache.kafka.streams.processor.internals.AbstractStateManager
    public /* bridge */ /* synthetic */ void reinitializeStateStoresForPartitions(Logger logger, Map map, Map map2, Collection collection, InternalProcessorContext internalProcessorContext) {
        super.reinitializeStateStoresForPartitions(logger, map, map2, collection, internalProcessorContext);
    }
}
