package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.util.Map;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.state.internals.ThreadCache;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorContextImpl.class */
public class ProcessorContextImpl implements InternalProcessorContext, RecordCollector.Supplier {
    public static final String NONEXIST_TOPIC = "__null_topic__";
    private final TaskId id;
    private final StreamTask task;
    private final StreamsMetrics metrics;
    private final RecordCollector collector;
    private final ProcessorStateManager stateMgr;
    private final StreamsConfig config;
    private final Serde<?> keySerde;
    private final Serde<?> valSerde;
    private final ThreadCache cache;
    private boolean initialized = false;
    private RecordContext recordContext;
    private ProcessorNode currentNode;

    public ProcessorContextImpl(TaskId taskId, StreamTask streamTask, StreamsConfig streamsConfig, RecordCollector recordCollector, ProcessorStateManager processorStateManager, StreamsMetrics streamsMetrics, ThreadCache threadCache) {
        this.id = taskId;
        this.task = streamTask;
        this.metrics = streamsMetrics;
        this.collector = recordCollector;
        this.stateMgr = processorStateManager;
        this.config = streamsConfig;
        this.keySerde = streamsConfig.keySerde();
        this.valSerde = streamsConfig.valueSerde();
        this.cache = threadCache;
    }

    public void initialized() {
        this.initialized = true;
    }

    public ProcessorStateManager getStateMgr() {
        return this.stateMgr;
    }

    @Override // org.apache.kafka.streams.processor.ProcessorContext
    public TaskId taskId() {
        return this.id;
    }

    @Override // org.apache.kafka.streams.processor.ProcessorContext
    public String applicationId() {
        return this.task.applicationId();
    }

    @Override // org.apache.kafka.streams.processor.internals.RecordCollector.Supplier
    public RecordCollector recordCollector() {
        return this.collector;
    }

    @Override // org.apache.kafka.streams.processor.ProcessorContext
    public Serde<?> keySerde() {
        return this.keySerde;
    }

    @Override // org.apache.kafka.streams.processor.ProcessorContext
    public Serde<?> valueSerde() {
        return this.valSerde;
    }

    @Override // org.apache.kafka.streams.processor.ProcessorContext
    public File stateDir() {
        return this.stateMgr.baseDir();
    }

    @Override // org.apache.kafka.streams.processor.ProcessorContext
    public StreamsMetrics metrics() {
        return this.metrics;
    }

    @Override // org.apache.kafka.streams.processor.ProcessorContext
    public void register(StateStore stateStore, boolean z, StateRestoreCallback stateRestoreCallback) {
        if (this.initialized) {
            throw new IllegalStateException("Can only create state stores during initialization.");
        }
        this.stateMgr.register(stateStore, z, stateRestoreCallback);
    }

    @Override // org.apache.kafka.streams.processor.ProcessorContext
    public StateStore getStateStore(String str) {
        if (this.task.node() == null) {
            throw new TopologyBuilderException("Accessing from an unknown node");
        }
        return this.stateMgr.getStore(str);
    }

    @Override // org.apache.kafka.streams.processor.internals.InternalProcessorContext
    public ThreadCache getCache() {
        return this.cache;
    }

    @Override // org.apache.kafka.streams.processor.ProcessorContext
    public String topic() {
        if (this.recordContext == null) {
            throw new IllegalStateException("This should not happen as topic() should only be called while a record is processed");
        }
        String str = this.recordContext.topic();
        if (str.equals(NONEXIST_TOPIC)) {
            return null;
        }
        return str;
    }

    @Override // org.apache.kafka.streams.processor.ProcessorContext
    public int partition() {
        if (this.recordContext == null) {
            throw new IllegalStateException("This should not happen as partition() should only be called while a record is processed");
        }
        return this.recordContext.partition();
    }

    @Override // org.apache.kafka.streams.processor.ProcessorContext
    public long offset() {
        if (this.recordContext == null) {
            throw new IllegalStateException("This should not happen as offset() should only be called while a record is processed");
        }
        return this.recordContext.offset();
    }

    @Override // org.apache.kafka.streams.processor.ProcessorContext
    public long timestamp() {
        if (this.recordContext == null) {
            throw new IllegalStateException("This should not happen as timestamp() should only be called while a record is processed");
        }
        return this.recordContext.timestamp();
    }

    @Override // org.apache.kafka.streams.processor.ProcessorContext
    public <K, V> void forward(K k, V v) {
        ProcessorNode processorNode = this.currentNode;
        try {
            for (ProcessorNode<?, ?> processorNode2 : this.currentNode.children()) {
                this.currentNode = processorNode2;
                processorNode2.process(k, v);
            }
        } finally {
            this.currentNode = processorNode;
        }
    }

    @Override // org.apache.kafka.streams.processor.ProcessorContext
    public <K, V> void forward(K k, V v, int i) {
        ProcessorNode processorNode = this.currentNode;
        ProcessorNode<?, ?> processorNode2 = this.currentNode.children().get(i);
        this.currentNode = processorNode2;
        try {
            processorNode2.process(k, v);
            this.currentNode = processorNode;
        } catch (Throwable th) {
            this.currentNode = processorNode;
            throw th;
        }
    }

    @Override // org.apache.kafka.streams.processor.ProcessorContext
    public <K, V> void forward(K k, V v, String str) {
        for (ProcessorNode<?, ?> processorNode : this.currentNode.children()) {
            if (processorNode.name().equals(str)) {
                ProcessorNode processorNode2 = this.currentNode;
                this.currentNode = processorNode;
                try {
                    processorNode.process(k, v);
                    this.currentNode = processorNode2;
                    return;
                } catch (Throwable th) {
                    this.currentNode = processorNode2;
                    throw th;
                }
            }
        }
    }

    @Override // org.apache.kafka.streams.processor.ProcessorContext
    public void commit() {
        this.task.needCommit();
    }

    @Override // org.apache.kafka.streams.processor.ProcessorContext
    public void schedule(long j) {
        this.task.schedule(j);
    }

    @Override // org.apache.kafka.streams.processor.ProcessorContext
    public Map<String, Object> appConfigs() {
        return this.config.originals();
    }

    @Override // org.apache.kafka.streams.processor.ProcessorContext
    public Map<String, Object> appConfigsWithPrefix(String str) {
        return this.config.originalsWithPrefix(str);
    }

    @Override // org.apache.kafka.streams.processor.internals.InternalProcessorContext
    public void setRecordContext(RecordContext recordContext) {
        this.recordContext = recordContext;
    }

    @Override // org.apache.kafka.streams.processor.internals.InternalProcessorContext
    public RecordContext recordContext() {
        return this.recordContext;
    }

    @Override // org.apache.kafka.streams.processor.internals.InternalProcessorContext
    public void setCurrentNode(ProcessorNode processorNode) {
        this.currentNode = processorNode;
    }
}
