package dev.responsive.kafka.api.async.internals.contexts;

import dev.responsive.kafka.api.async.internals.AsyncUtils;
import dev.responsive.kafka.api.async.internals.events.DelayedWrite;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.processor.api.ProcessingContext;
import org.slf4j.Logger;

/* loaded from: input_file:dev/responsive/kafka/api/async/internals/contexts/AsyncUserProcessorContext.class */
/**
 * Processor context whose delegate is resolved per thread: each thread that uses this
 * context must first install its own delegate through
 * {@link #setDelegateForStreamThread} or {@link #setDelegateForAsyncThread}, and
 * {@link #delegate()} then returns whatever the calling thread installed.
 *
 * <p>Delegates are kept in a {@link ThreadLocal}, so a delegate installed by one thread is
 * never visible to another. Threads are classified by name via {@link AsyncUtils}, using the
 * StreamThread name supplied at construction.
 *
 * <p>NOTE(review): presumably {@code DelegatingProcessorContext} forwards its context calls
 * to {@link #delegate()} — confirm against that class.
 *
 * @param <KOut> key type parameter passed through to the delegate context types
 * @param <VOut> value type parameter passed through to the delegate context types
 */
public class AsyncUserProcessorContext<KOut, VOut> extends DelegatingProcessorContext<KOut, VOut, MergedProcessorContext<KOut, VOut>> implements DelayedAsyncStoreWriter {
    private final Logger log;
    private final String streamThreadName;
    private final ProcessingContext taskContext;
    // One delegate per calling thread; empty until that thread calls one of the setDelegateFor* methods.
    private final ThreadLocal<MergedProcessorContext<KOut, VOut>> threadLocalDelegate = new ThreadLocal<>();

    /**
     * Creates a context with no delegate installed for any thread.
     *
     * @param str               name of the StreamThread; used by the thread-name checks in this class
     * @param processingContext the context returned unchanged by {@link #taskContext()}
     * @param str2              log prefix handed to {@link LogContext} when creating this class's logger
     */
    public AsyncUserProcessorContext(String str, ProcessingContext processingContext, String str2) {
        this.streamThreadName = str;
        this.taskContext = processingContext;
        this.log = new LogContext(str2).logger(AsyncUserProcessorContext.class);
    }

    /**
     * @return the {@link ProcessingContext} this instance was constructed with
     */
    public ProcessingContext taskContext() {
        return this.taskContext;
    }

    /**
     * Returns the delegate installed by the calling thread.
     *
     * @return the calling thread's delegate, never {@code null}
     * @throws IllegalStateException         if the caller is a StreamThread or AsyncThread (per
     *                                       {@link AsyncUtils#isStreamThreadOrAsyncThread}) that has
     *                                       not installed a delegate yet
     * @throws UnsupportedOperationException if the caller is any other kind of thread
     */
    @Override
    public MergedProcessorContext<KOut, VOut> delegate() {
        final MergedProcessorContext<KOut, VOut> current = this.threadLocalDelegate.get();
        if (current != null) {
            return current;
        }

        // No delegate for this thread: distinguish a known-but-uninitialized thread from a foreign one.
        final String threadName = Thread.currentThread().getName();
        if (AsyncUtils.isStreamThreadOrAsyncThread(threadName, this.streamThreadName)) {
            this.log.error("Thread {} attempted to access the context but it was not initialized", threadName);
            throw new IllegalStateException("Uninitialized thread requested context access");
        }
        this.log.error("Unexpected thread type attempted to access the context. Thread name: {}", threadName);
        throw new UnsupportedOperationException("Illegal external thread requested context access");
    }

    /**
     * Installs the delegate for the calling thread, which must be the StreamThread.
     *
     * @param streamThreadProcessorContext the delegate to use for calls made from the calling thread
     * @throws IllegalStateException if the calling thread is not recognised as the StreamThread
     */
    public void setDelegateForStreamThread(StreamThreadProcessorContext<KOut, VOut> streamThreadProcessorContext) {
        final String threadName = Thread.currentThread().getName();
        this.log.debug("Initializing thread-local context for StreamThread {}", threadName);
        if (!AsyncUtils.isStreamThread(threadName, this.streamThreadName)) {
            this.log.error("Attempted to set StreamThread context but thread name was {}", threadName);
            throw new IllegalStateException("Incorrect thread initializing StreamThread context");
        }
        this.threadLocalDelegate.set(streamThreadProcessorContext);
    }

    /**
     * Installs the delegate for the calling thread, which must be an AsyncThread.
     *
     * @param asyncThreadProcessorContext the delegate to use for calls made from the calling thread
     * @throws IllegalStateException if the calling thread is not recognised as an AsyncThread
     */
    public void setDelegateForAsyncThread(AsyncThreadProcessorContext<KOut, VOut> asyncThreadProcessorContext) {
        final String threadName = Thread.currentThread().getName();
        this.log.trace("Initializing thread-local context for AsyncThread {}", threadName);
        if (!AsyncUtils.isAsyncThread(threadName, this.streamThreadName)) {
            this.log.error("Attempted to set AsyncThread context but thread name was {}", threadName);
            throw new IllegalStateException("Incorrect thread initializing AsyncThread context");
        }
        this.threadLocalDelegate.set(asyncThreadProcessorContext);
    }

    /**
     * Records a delayed write on the async event currently associated with the calling
     * AsyncThread's delegate.
     *
     * @param delayedWrite the write to attach to the current async event
     * @throws IllegalStateException if the caller is not an AsyncThread, or if it is an AsyncThread
     *                               whose delegate has not been installed via
     *                               {@link #setDelegateForAsyncThread}
     */
    @Override
    public void acceptDelayedWriteToAsyncStore(DelayedWrite<?, ?> delayedWrite) {
        final String threadName = Thread.currentThread().getName();
        if (!AsyncUtils.isAsyncThread(threadName, this.streamThreadName)) {
            this.log.error("A non async thread invoked put on an async store. Caller thread name was {}", threadName);
            throw new IllegalStateException("Can only call #put on an async state store inside the #process method of an async processor");
        }

        // Fail with a descriptive error instead of a bare NullPointerException/ClassCastException
        // when this AsyncThread never installed an AsyncThreadProcessorContext.
        final MergedProcessorContext<KOut, VOut> current = this.threadLocalDelegate.get();
        if (!(current instanceof AsyncThreadProcessorContext<?, ?>)) {
            this.log.error("AsyncThread {} attempted to write to an async store but its async context was not initialized", threadName);
            throw new IllegalStateException("Uninitialized AsyncThread attempted to write to an async store");
        }

        final AsyncThreadProcessorContext<KOut, VOut> asyncContext = (AsyncThreadProcessorContext<KOut, VOut>) current;
        asyncContext.currentAsyncEvent().addWrittenRecord(delayedWrite);
    }
}
