package dev.responsive.kafka.api.async.internals.contexts;

import dev.responsive.kafka.api.async.internals.events.AsyncEvent;
import dev.responsive.kafka.api.async.internals.events.DelayedForward;
import dev.responsive.kafka.api.async.internals.events.DelayedWrite;
import dev.responsive.kafka.api.async.internals.stores.AsyncKeyValueStore;
import dev.responsive.kafka.api.async.internals.stores.AsyncTimestampedKeyValueStore;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.slf4j.Logger;

/* loaded from: input_file:dev/responsive/kafka/api/async/internals/contexts/StreamThreadProcessorContext.class */
public class StreamThreadProcessorContext<KOut, VOut> extends DelegatingProcessorContext<KOut, VOut, InternalProcessorContext<KOut, VOut>> {
    private final Logger log;
    private final Map<String, AsyncKeyValueStore<?, ?>> storeNameToAsyncStore = new HashMap();
    private final ProcessorNode<?, ?, ?, ?> asyncProcessorNode;
    private final InternalProcessorContext<KOut, VOut> originalContext;
    private final DelayedAsyncStoreWriter delayedStoreWriter;

    /* loaded from: input_file:dev/responsive/kafka/api/async/internals/contexts/StreamThreadProcessorContext$PreviousRecordContextAndNode.class */
    public static class PreviousRecordContextAndNode implements AutoCloseable {
        private final ProcessorRecordContext context;
        private final ProcessorNode<?, ?, ?, ?> node;
        private final InternalProcessorContext<?, ?> previousContext;

        public PreviousRecordContextAndNode(ProcessorRecordContext processorRecordContext, ProcessorNode<?, ?, ?, ?> processorNode, InternalProcessorContext<?, ?> internalProcessorContext) {
            this.context = processorRecordContext;
            this.node = processorNode;
            this.previousContext = internalProcessorContext;
        }

        @Override // java.lang.AutoCloseable
        public void close() {
            this.previousContext.setRecordContext(this.context);
            this.previousContext.setCurrentNode(this.node);
        }
    }

    public StreamThreadProcessorContext(String str, InternalProcessorContext<KOut, VOut> internalProcessorContext, DelayedAsyncStoreWriter delayedAsyncStoreWriter) {
        this.log = new LogContext((String) Objects.requireNonNull(str)).logger(StreamThreadProcessorContext.class);
        this.originalContext = (InternalProcessorContext) Objects.requireNonNull(internalProcessorContext);
        this.asyncProcessorNode = internalProcessorContext.currentNode();
        this.delayedStoreWriter = (DelayedAsyncStoreWriter) Objects.requireNonNull(delayedAsyncStoreWriter);
    }

    @Override // dev.responsive.kafka.api.async.internals.contexts.DelegatingProcessorContext
    public <S extends StateStore> S getStateStore(String str) {
        if (this.storeNameToAsyncStore.containsKey(str)) {
            return this.storeNameToAsyncStore.get(str);
        }
        StateStore stateStore = super.getStateStore(str);
        if (stateStore instanceof TimestampedKeyValueStore) {
            AsyncTimestampedKeyValueStore asyncTimestampedKeyValueStore = new AsyncTimestampedKeyValueStore(str, taskId().partition(), (KeyValueStore) stateStore, this.delayedStoreWriter);
            this.storeNameToAsyncStore.put(str, asyncTimestampedKeyValueStore);
            return asyncTimestampedKeyValueStore;
        }
        if (!(stateStore instanceof KeyValueStore)) {
            this.log.error("Attempted to connect window/session store with async processor");
            throw new UnsupportedOperationException("Window and Session stores are not yet supported with async processing");
        }
        AsyncKeyValueStore<?, ?> asyncKeyValueStore = new AsyncKeyValueStore<>(str, this.originalContext.partition(), (KeyValueStore) stateStore, this.delayedStoreWriter);
        this.storeNameToAsyncStore.put(str, asyncKeyValueStore);
        return asyncKeyValueStore;
    }

    public PreviousRecordContextAndNode prepareToFinalizeEvent(AsyncEvent asyncEvent) {
        if (!asyncEvent.currentState().equals(AsyncEvent.State.TO_FINALIZE)) {
            this.log.error("Attempted to prepare event for finalization but currentState was {}", asyncEvent.currentState());
            throw new IllegalStateException("Must prepare event for finalization while it's in the TO_FINALIZE state");
        }
        ProcessorRecordContext recordContext = asyncEvent.recordContext();
        PreviousRecordContextAndNode previousRecordContextAndNode = new PreviousRecordContextAndNode(this.originalContext.recordContext(), this.originalContext.currentNode(), this.originalContext);
        this.originalContext.setRecordContext(recordContext);
        this.originalContext.setCurrentNode(this.asyncProcessorNode);
        return previousRecordContextAndNode;
    }

    public <KS, VS> void executeDelayedWrite(DelayedWrite<KS, VS> delayedWrite) {
        getAsyncStore(delayedWrite.storeName()).executeDelayedWrite(delayedWrite);
    }

    public <K extends KOut, V extends VOut> void executeDelayedForward(DelayedForward<K, V> delayedForward) {
        if (delayedForward.isFixedKey()) {
            super.forward(delayedForward.fixedKeyRecord(), delayedForward.childName());
        } else {
            super.forward(delayedForward.record(), delayedForward.childName());
        }
    }

    @Override // dev.responsive.kafka.api.async.internals.contexts.DelegatingProcessorContext
    /* renamed from: delegate, reason: merged with bridge method [inline-methods] */
    public InternalProcessorContext<KOut, VOut> mo7delegate() {
        return this.originalContext;
    }

    public <KS, VS> AsyncKeyValueStore<KS, VS> getAsyncStore(String str) {
        return (AsyncKeyValueStore) this.storeNameToAsyncStore.get(str);
    }

    public Map<String, AsyncKeyValueStore<?, ?>> getAllAsyncStores() {
        return this.storeNameToAsyncStore;
    }
}
