package org.apache.kafka.streams.processor.internals;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;

/* loaded from: input_file:BOOT-INF/lib/kafka-streams-2.2.0.jar:org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.class */
/**
 * {@link GlobalStateMaintainer} implementation that feeds consumed records through the
 * source nodes of a {@link ProcessorTopology} and remembers, per topic partition, the
 * offset following the last record it was handed.
 *
 * <p>The class keeps its bookkeeping in plain {@link HashMap}s and performs no
 * synchronization of its own.
 */
public class GlobalStateUpdateTask implements GlobalStateMaintainer {
    private final ProcessorTopology topology;
    private final InternalProcessorContext processorContext;
    /** Per partition: offset of the last record passed to {@link #update} plus one. */
    private final Map<TopicPartition, Long> offsets = new HashMap<>();
    /** Deserializers keyed by the topic name they were registered for in {@link #initialize()}. */
    private final Map<String, RecordDeserializer> deserializers = new HashMap<>();
    private final GlobalStateManager stateMgr;
    private final DeserializationExceptionHandler deserializationExceptionHandler;
    private final LogContext logContext;

    /**
     * Stores the collaborators; no work is done until {@link #initialize()} is called.
     *
     * @param processorTopology               topology providing source nodes, processors and the
     *                                        store-to-changelog-topic mapping
     * @param internalProcessorContext        context handed to processor nodes and deserializers
     * @param globalStateManager              state manager that is initialized, flushed,
     *                                        checkpointed and closed by this task
     * @param deserializationExceptionHandler handler passed to every {@link RecordDeserializer}
     * @param logContext                      log context passed to every {@link RecordDeserializer}
     */
    public GlobalStateUpdateTask(ProcessorTopology processorTopology, InternalProcessorContext internalProcessorContext, GlobalStateManager globalStateManager, DeserializationExceptionHandler deserializationExceptionHandler, LogContext logContext) {
        this.topology = processorTopology;
        this.stateMgr = globalStateManager;
        this.processorContext = internalProcessorContext;
        this.deserializationExceptionHandler = deserializationExceptionHandler;
        this.logContext = logContext;
    }

    /**
     * Initializes the state manager, registers one {@link RecordDeserializer} per store name
     * it returns, initializes every processor node and finally the processor context.
     *
     * @return the map reported by {@code stateMgr.checkpointed()}
     */
    @Override
    public Map<TopicPartition, Long> initialize() {
        Set<String> storeNames = this.stateMgr.initialize();
        Map<String, String> storeToChangelogTopic = this.topology.storeToChangelogTopic();
        for (String storeName : storeNames) {
            // The deserializer is keyed by the store's changelog topic so that update()
            // can look it up by the topic of an incoming record.
            String sourceTopic = storeToChangelogTopic.get(storeName);
            this.deserializers.put(
                sourceTopic,
                new RecordDeserializer(
                    this.topology.source(sourceTopic),
                    this.deserializationExceptionHandler,
                    this.logContext,
                    this.processorContext.metrics().skippedRecordsSensor()));
        }
        initTopology();
        this.processorContext.initialize();
        return this.stateMgr.checkpointed();
    }

    /**
     * Deserializes the record with the deserializer registered for its topic and, when
     * deserialization yields a non-null record, runs it through that deserializer's source
     * node. The offset bookkeeping is updated in both cases.
     *
     * <p>NOTE(review): a record whose topic has no registered deserializer fails with a
     * {@link NullPointerException}, because the map lookup returns {@code null}; callers are
     * presumably expected to pass only records from topics seen in {@link #initialize()}.
     *
     * @param consumerRecord raw record to apply
     */
    @Override
    public void update(ConsumerRecord<byte[], byte[]> consumerRecord) {
        RecordDeserializer deserializer = this.deserializers.get(consumerRecord.topic());
        ConsumerRecord<Object, Object> deserialized = deserializer.deserialize(this.processorContext, consumerRecord);
        // A null result means there is nothing to process for this record; its offset is
        // still recorded below.
        if (deserialized != null) {
            ProcessorRecordContext recordContext = new ProcessorRecordContext(
                deserialized.timestamp(),
                deserialized.offset(),
                deserialized.partition(),
                deserialized.topic(),
                deserialized.headers());
            this.processorContext.setRecordContext(recordContext);
            this.processorContext.setCurrentNode(deserializer.sourceNode());
            deserializer.sourceNode().process(deserialized.key(), deserialized.value());
        }
        TopicPartition partition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
        // Stored value is the record's offset + 1 (presumably the next offset to consume).
        this.offsets.put(partition, consumerRecord.offset() + 1);
    }

    /**
     * Flushes the state manager and then checkpoints the offsets collected by
     * {@link #update}.
     */
    @Override
    public void flushState() {
        this.stateMgr.flush();
        this.stateMgr.checkpoint(this.offsets);
    }

    /**
     * Closes the state manager.
     *
     * @throws IOException declared by the interface; propagated from the state manager if raised
     */
    @Override
    public void close() throws IOException {
        // NOTE(review): the meaning of the 'true' flag is defined by GlobalStateManager#close
        // and is not visible from this class.
        this.stateMgr.close(true);
    }

    /**
     * Initializes every processor node of the topology, marking it as the context's current
     * node for the duration of its {@code init} call.
     */
    private void initTopology() {
        for (ProcessorNode node : this.topology.processors()) {
            this.processorContext.setCurrentNode(node);
            try {
                node.init(this.processorContext);
            } finally {
                // Always clear the current node, even when init throws.
                this.processorContext.setCurrentNode(null);
            }
        }
    }
}
