package org.apache.kafka.streams.processor.internals;

import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;

/* loaded from: input_file:BOOT-INF/lib/kafka-streams-2.6.0.jar:org/apache/kafka/streams/processor/internals/ProcessorContextImpl.class */
/**
 * Processor context that can back either an active or a standby task.
 *
 * <p>The context is switched between the two modes with
 * {@link #transitionToActive(StreamTask, RecordCollector, ThreadCache)} and
 * {@link #transitionToStandby(ThreadCache)}. Most record-level operations (forwarding, committing,
 * scheduling, record metadata access, state store lookup) first check the task type and throw
 * {@link UnsupportedOperationException} when it is {@code STANDBY}.
 */
public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
    /** Task that owns this context; reset to {@code null} by {@link #transitionToStandby(ThreadCache)}. */
    private StreamTask streamTask;
    /** Collector used to send changelog records; reset to {@code null} by {@link #transitionToStandby(ThreadCache)}. */
    private RecordCollector collector;
    /** Reusable holder that exposes the child name / timestamp carried by a {@link To}. */
    private final ToInternal toInternal;
    private static final To SEND_TO_ALL = To.all();
    /** Maps a registered store name to the changelog topic computed for it in {@link #register}. */
    final Map<String, String> storeToChangelogTopic;
    /** Flush listeners by cache name, kept so they can be re-attached whenever the cache is swapped. */
    final Map<String, ThreadCache.DirtyEntryFlushListener> cacheNameToFlushListener;

    /**
     * Creates a context; the stream task and record collector stay unset until
     * {@link #transitionToActive(StreamTask, RecordCollector, ThreadCache)} is called.
     *
     * @param taskId                id of the task this context belongs to
     * @param streamsConfig         streams configuration handed to the superclass
     * @param processorStateManager state manager handed to the superclass
     * @param streamsMetricsImpl    metrics handed to the superclass
     * @param threadCache           initial cache handed to the superclass
     */
    public ProcessorContextImpl(TaskId taskId, StreamsConfig streamsConfig, ProcessorStateManager processorStateManager, StreamsMetricsImpl streamsMetricsImpl, ThreadCache threadCache) {
        super(taskId, streamsConfig, streamsMetricsImpl, processorStateManager, threadCache);
        this.toInternal = new ToInternal();
        this.storeToChangelogTopic = new HashMap<>();
        this.cacheNameToFlushListener = new HashMap<>();
    }

    /**
     * Switches this context to active mode: stores the task, collector and cache, then re-attaches
     * every registered flush listener to the new cache.
     *
     * @throws IllegalStateException if the state manager's task type is not {@code ACTIVE}
     */
    @Override // org.apache.kafka.streams.processor.internals.InternalProcessorContext
    public void transitionToActive(StreamTask streamTask, RecordCollector recordCollector, ThreadCache threadCache) {
        if (this.stateManager.taskType() != Task.TaskType.ACTIVE) {
            throw new IllegalStateException("Tried to transition processor context to active but the state manager's type was " + this.stateManager.taskType());
        }
        this.streamTask = streamTask;
        this.collector = recordCollector;
        this.cache = threadCache;
        addAllFlushListenersToNewCache();
    }

    /**
     * Switches this context to standby mode: clears the task and collector, stores the cache, then
     * re-attaches every registered flush listener to the new cache.
     *
     * @throws IllegalStateException if the state manager's task type is not {@code STANDBY}
     */
    @Override // org.apache.kafka.streams.processor.internals.InternalProcessorContext
    public void transitionToStandby(ThreadCache threadCache) {
        if (this.stateManager.taskType() != Task.TaskType.STANDBY) {
            throw new IllegalStateException("Tried to transition processor context to standby but the state manager's type was " + this.stateManager.taskType());
        }
        this.streamTask = null;
        this.collector = null;
        this.cache = threadCache;
        addAllFlushListenersToNewCache();
    }

    /**
     * Remembers the listener under the given cache name and attaches it to the current cache.
     *
     * @param str                     cache name the listener is registered for
     * @param dirtyEntryFlushListener listener to attach
     */
    @Override // org.apache.kafka.streams.processor.internals.InternalProcessorContext
    public void registerCacheFlushListener(String str, ThreadCache.DirtyEntryFlushListener dirtyEntryFlushListener) {
        this.cacheNameToFlushListener.put(str, dirtyEntryFlushListener);
        this.cache.addDirtyEntryFlushListener(str, dirtyEntryFlushListener);
    }

    /** Attaches every remembered flush listener to the cache currently held by this context. */
    private void addAllFlushListenersToNewCache() {
        for (final Map.Entry<String, ThreadCache.DirtyEntryFlushListener> registration : this.cacheNameToFlushListener.entrySet()) {
            this.cache.addDirtyEntryFlushListener(registration.getKey(), registration.getValue());
        }
    }

    /** @return the inherited state manager, cast to {@link ProcessorStateManager} */
    public ProcessorStateManager stateManager() {
        return (ProcessorStateManager) this.stateManager;
    }

    /**
     * Records the changelog topic for the store (derived from the application id and store name)
     * and then delegates registration to the superclass.
     */
    @Override // org.apache.kafka.streams.processor.internals.AbstractProcessorContext, org.apache.kafka.streams.processor.ProcessorContext
    public void register(StateStore stateStore, StateRestoreCallback stateRestoreCallback) {
        final String storeName = stateStore.name();
        this.storeToChangelogTopic.put(storeName, ProcessorStateManager.storeChangelogTopic(applicationId(), storeName));
        super.register(stateStore, stateRestoreCallback);
    }

    /** @return the collector set by the last transition; {@code null} after a transition to standby */
    @Override // org.apache.kafka.streams.processor.internals.RecordCollector.Supplier
    public RecordCollector recordCollector() {
        return this.collector;
    }

    /**
     * Sends a store change to the changelog topic recorded for the store, using this task's
     * partition, the given timestamp and {@code null} headers.
     *
     * @param str   name of the store whose changelog topic is looked up
     * @param bytes record key
     * @param bArr  record value
     * @param j     record timestamp
     * @throws UnsupportedOperationException if this context belongs to a standby task
     */
    @Override // org.apache.kafka.streams.processor.internals.InternalProcessorContext
    public void logChange(String str, Bytes bytes, byte[] bArr, long j) {
        throwUnsupportedOperationExceptionIfStandby("logChange");
        // Key and value are passed with their real types; the inherited serializer constants are
        // presumed to be Serializer<Bytes> / Serializer<byte[]> - confirm against their declaration.
        this.collector.send(
            this.storeToChangelogTopic.get(str),
            bytes,
            bArr,
            (Headers) null,
            Integer.valueOf(taskId().partition),
            Long.valueOf(j),
            BYTES_KEY_SERIALIZER,
            BYTEARRAY_VALUE_SERIALIZER);
    }

    /**
     * Looks up a state store for the current processor node. A global store is returned wrapped
     * read-only; a store listed in the current node's {@code stateStores} is returned wrapped
     * read-write.
     *
     * @param str store name
     * @return the wrapped store
     * @throws UnsupportedOperationException if this context belongs to a standby task
     * @throws StreamsException if there is no current node, or the store is neither global nor
     *                          connected to the current node
     */
    @Override // org.apache.kafka.streams.processor.ProcessorContext
    public StateStore getStateStore(String str) {
        throwUnsupportedOperationExceptionIfStandby("getStateStore");
        if (currentNode() == null) {
            throw new StreamsException("Accessing from an unknown node");
        }
        // Global stores take precedence over stores connected to the node.
        final StateStore globalStore = this.stateManager.getGlobalStore(str);
        if (globalStore != null) {
            return AbstractReadOnlyDecorator.getReadOnlyStore(globalStore);
        }
        if (currentNode().stateStores.contains(str)) {
            return AbstractReadWriteDecorator.getReadWriteStore(this.stateManager.getStore(str));
        }
        throw new StreamsException("Processor " + currentNode().name() + " has no access to StateStore " + str + " as the store is not connected to the processor. If you add stores manually via '.addStateStore()' make sure to connect the added store to the processor by providing the processor name to '.addStateStore()' or connect them via '.connectProcessorAndStateStores()'. DSL users need to provide the store name to '.process()', '.transform()', or '.transformValues()' to connect the store to the corresponding operator, or they can provide a StoreBuilder by implementing the stores() method on the Supplier itself. If you do not add stores manually, please file a bug report at https://issues.apache.org/jira/projects/KAFKA.");
    }

    /**
     * Forwards the record to all children of the current node.
     *
     * @throws UnsupportedOperationException if this context belongs to a standby task
     */
    @Override // org.apache.kafka.streams.processor.ProcessorContext
    public <K, V> void forward(K k, V v) {
        throwUnsupportedOperationExceptionIfStandby("forward");
        forward(k, v, SEND_TO_ALL);
    }

    /**
     * Forwards the record to the child at the given index of the current node's children.
     *
     * @param i index into the current node's child list
     * @throws UnsupportedOperationException if this context belongs to a standby task
     */
    @Override // org.apache.kafka.streams.processor.ProcessorContext
    @Deprecated
    public <K, V> void forward(K k, V v, int i) {
        throwUnsupportedOperationExceptionIfStandby("forward");
        forward(k, v, To.child(currentNode().children().get(i).name()));
    }

    /**
     * Forwards the record to the child with the given name.
     *
     * @param str name of the child node
     * @throws UnsupportedOperationException if this context belongs to a standby task
     */
    @Override // org.apache.kafka.streams.processor.ProcessorContext
    @Deprecated
    public <K, V> void forward(K k, V v, String str) {
        throwUnsupportedOperationExceptionIfStandby("forward");
        forward(k, v, To.child(str));
    }

    /**
     * Forwards the record according to {@code to}: to every child of the current node when no child
     * name is set, otherwise only to the named child. If {@code to} carries a timestamp, a record
     * context with that timestamp (and the current offset, partition, topic and headers) is used
     * while forwarding. The previous record context and current node are restored afterwards, even
     * when forwarding throws.
     *
     * @throws UnsupportedOperationException if this context belongs to a standby task
     * @throws StreamsException if a child name is given but the current node has no such child
     */
    @Override // org.apache.kafka.streams.processor.ProcessorContext
    @SuppressWarnings("unchecked") // child nodes are typed with wildcards; K/V are trusted to match
    public <K, V> void forward(K k, V v, To to) {
        throwUnsupportedOperationExceptionIfStandby("forward");
        final ProcessorNode<?, ?> previousNode = currentNode();
        final ProcessorRecordContext previousContext = this.recordContext;
        try {
            this.toInternal.update(to);
            if (this.toInternal.hasTimestamp()) {
                this.recordContext = new ProcessorRecordContext(
                    this.toInternal.timestamp(),
                    this.recordContext.offset(),
                    this.recordContext.partition(),
                    this.recordContext.topic(),
                    this.recordContext.headers());
            }
            final String sendTo = this.toInternal.child();
            if (sendTo == null) {
                // The child list is obtained once, before forwarding changes the current node.
                for (final ProcessorNode<?, ?> downstream : currentNode().children()) {
                    forward((ProcessorNode<K, V>) downstream, k, v);
                }
            } else {
                final ProcessorNode<K, V> downstream = (ProcessorNode<K, V>) currentNode().getChild(sendTo);
                if (downstream == null) {
                    throw new StreamsException("Unknown downstream node: " + sendTo + " either does not exist or is not connected to this processor.");
                }
                forward(downstream, k, v);
            }
        } finally {
            this.recordContext = previousContext;
            setCurrentNode(previousNode);
        }
    }

    /**
     * Makes the given node current, lets it process the record and, for terminal nodes, asks the
     * stream task to record end-to-end latency.
     */
    private <K, V> void forward(ProcessorNode<K, V> processorNode, K k, V v) {
        setCurrentNode(processorNode);
        processorNode.process(k, v);
        if (processorNode.isTerminalNode()) {
            this.streamTask.maybeRecordE2ELatency(timestamp(), currentSystemTimeMs(), processorNode.name());
        }
    }

    /**
     * Requests a commit from the owning stream task.
     *
     * @throws UnsupportedOperationException if this context belongs to a standby task
     */
    @Override // org.apache.kafka.streams.processor.ProcessorContext
    public void commit() {
        throwUnsupportedOperationExceptionIfStandby("commit");
        this.streamTask.requestCommit();
    }

    /**
     * Schedules a punctuator on the owning stream task.
     *
     * @param j interval in milliseconds; must be at least 1
     * @throws UnsupportedOperationException if this context belongs to a standby task
     * @throws IllegalArgumentException if {@code j} is smaller than 1
     */
    @Override // org.apache.kafka.streams.processor.ProcessorContext
    @Deprecated
    public Cancellable schedule(long j, PunctuationType punctuationType, Punctuator punctuator) {
        throwUnsupportedOperationExceptionIfStandby("schedule");
        if (j < 1) {
            throw new IllegalArgumentException("The minimum supported scheduling interval is 1 millisecond.");
        }
        return this.streamTask.schedule(j, punctuationType, punctuator);
    }

    /**
     * Validates the duration via {@link ApiUtils#validateMillisecondDuration} and delegates to the
     * millisecond-based {@code schedule} overload.
     *
     * @throws UnsupportedOperationException if this context belongs to a standby task
     * @throws IllegalArgumentException declared by this method; raised by the millisecond overload
     *                                  for intervals below 1 ms
     */
    @Override // org.apache.kafka.streams.processor.ProcessorContext
    public Cancellable schedule(Duration duration, PunctuationType punctuationType, Punctuator punctuator) throws IllegalArgumentException {
        throwUnsupportedOperationExceptionIfStandby("schedule");
        final String messagePrefix = ApiUtils.prepareMillisCheckFailMsgPrefix(duration, "interval");
        return schedule(ApiUtils.validateMillisecondDuration(duration, messagePrefix), punctuationType, punctuator);
    }

    /** @throws UnsupportedOperationException if this context belongs to a standby task */
    @Override // org.apache.kafka.streams.processor.internals.AbstractProcessorContext, org.apache.kafka.streams.processor.ProcessorContext
    public String topic() {
        // The argument is this method's name, as in the sibling accessors below.
        throwUnsupportedOperationExceptionIfStandby("topic");
        return super.topic();
    }

    /** @throws UnsupportedOperationException if this context belongs to a standby task */
    @Override // org.apache.kafka.streams.processor.internals.AbstractProcessorContext, org.apache.kafka.streams.processor.ProcessorContext
    public int partition() {
        throwUnsupportedOperationExceptionIfStandby("partition");
        return super.partition();
    }

    /** @throws UnsupportedOperationException if this context belongs to a standby task */
    @Override // org.apache.kafka.streams.processor.internals.AbstractProcessorContext, org.apache.kafka.streams.processor.ProcessorContext
    public long offset() {
        throwUnsupportedOperationExceptionIfStandby("offset");
        return super.offset();
    }

    /** @throws UnsupportedOperationException if this context belongs to a standby task */
    @Override // org.apache.kafka.streams.processor.internals.AbstractProcessorContext, org.apache.kafka.streams.processor.ProcessorContext
    public long timestamp() {
        throwUnsupportedOperationExceptionIfStandby("timestamp");
        return super.timestamp();
    }

    /** @throws UnsupportedOperationException if this context belongs to a standby task */
    @Override // org.apache.kafka.streams.processor.internals.AbstractProcessorContext, org.apache.kafka.streams.processor.internals.InternalProcessorContext
    public ProcessorNode<?, ?> currentNode() {
        throwUnsupportedOperationExceptionIfStandby("currentNode");
        return super.currentNode();
    }

    /** @throws UnsupportedOperationException if this context belongs to a standby task */
    @Override // org.apache.kafka.streams.processor.internals.AbstractProcessorContext, org.apache.kafka.streams.processor.internals.InternalProcessorContext
    public void setRecordContext(ProcessorRecordContext processorRecordContext) {
        throwUnsupportedOperationExceptionIfStandby("setRecordContext");
        super.setRecordContext(processorRecordContext);
    }

    /** @throws UnsupportedOperationException if this context belongs to a standby task */
    @Override // org.apache.kafka.streams.processor.internals.AbstractProcessorContext, org.apache.kafka.streams.processor.internals.InternalProcessorContext
    public ProcessorRecordContext recordContext() {
        throwUnsupportedOperationExceptionIfStandby("recordContext");
        return super.recordContext();
    }

    /**
     * Guards operations that are only valid for active tasks.
     *
     * @param str name of the calling method, used in the exception message
     * @throws UnsupportedOperationException if {@code taskType()} is {@code STANDBY}
     */
    private void throwUnsupportedOperationExceptionIfStandby(String str) {
        if (taskType() == Task.TaskType.STANDBY) {
            throw new UnsupportedOperationException("this should not happen: " + str + "() is not supported in standby tasks.");
        }
    }
}
