package dev.responsive.kafka.api.async.internals.contexts;

import dev.responsive.kafka.api.async.internals.events.AsyncEvent;
import dev.responsive.kafka.api.async.internals.events.DelayedForward;
import java.time.Duration;
import java.util.Optional;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;

/* loaded from: input_file:dev/responsive/kafka/api/async/internals/contexts/AsyncThreadProcessorContext.class */
/**
 * A processor context bound to a single {@link AsyncEvent}.
 *
 * <p>Rather than passing forwarded records on to the wrapped task context, every
 * {@code forward} overload wraps the record in a {@link DelayedForward} and attaches it
 * to the bound event. Record metadata, system time and stream time are likewise read
 * from the event instead of the wrapped context. State store lookup and punctuation
 * scheduling are rejected unconditionally; the exception messages direct callers to do
 * that work in {@code #init}.
 *
 * <p>NOTE(review): judging by the class name this context is the one exposed to code
 * running on an async thread -- confirm against the callers.
 *
 * @param <KOut> key type of records forwarded through this context
 * @param <VOut> value type of records forwarded through this context
 */
public class AsyncThreadProcessorContext<KOut, VOut> extends DelegatingProcessorContext<KOut, VOut, InternalProcessorContext<KOut, VOut>> {
    /** The wrapped context; returned by {@link #delegate()} and used for {@link #commit()}. */
    private final InternalProcessorContext<KOut, VOut> taskContext;

    /** The event that supplies metadata/time values and collects forwarded records. */
    private final AsyncEvent currentAsyncEvent;

    /**
     * Creates a context tied to one task context and one event.
     *
     * @param taskContext the context to wrap
     * @param event       the event this context reads from and buffers forwards on
     */
    public AsyncThreadProcessorContext(final InternalProcessorContext<KOut, VOut> taskContext, final AsyncEvent event) {
        this.taskContext = taskContext;
        this.currentAsyncEvent = event;
    }

    /**
     * @return the event this context was constructed with
     */
    public AsyncEvent currentAsyncEvent() {
        return currentAsyncEvent;
    }

    /**
     * @return the wrapped task context
     */
    @Override
    public InternalProcessorContext<KOut, VOut> delegate() {
        return taskContext;
    }

    // ---------------------------------------------------------------------
    // Forwarding: nothing is sent to the wrapped context here; each record
    // is stored on the bound event instead.
    // ---------------------------------------------------------------------

    /** Buffers the record on the event, passing {@code null} as the child name. */
    @Override
    public <K extends KOut, V extends VOut> void forward(final Record<K, V> record) {
        bufferForward(DelayedForward.ofRecord(record, null));
    }

    /** Buffers the record on the event together with the given child name. */
    @Override
    public <K extends KOut, V extends VOut> void forward(final Record<K, V> record, final String childName) {
        bufferForward(DelayedForward.ofRecord(record, childName));
    }

    /** Buffers the fixed-key record on the event, passing {@code null} as the child name. */
    @Override
    public <K extends KOut, V extends VOut> void forward(final FixedKeyRecord<K, V> fixedKeyRecord) {
        bufferForward(DelayedForward.ofFixedKeyRecord(fixedKeyRecord, null));
    }

    /** Buffers the fixed-key record on the event together with the given child name. */
    @Override
    public <K extends KOut, V extends VOut> void forward(final FixedKeyRecord<K, V> fixedKeyRecord, final String childName) {
        bufferForward(DelayedForward.ofFixedKeyRecord(fixedKeyRecord, childName));
    }

    /** Single funnel for all {@code forward} overloads: adds the forward to the bound event. */
    private <K extends KOut, V extends VOut> void bufferForward(final DelayedForward<K, V> pendingForward) {
        currentAsyncEvent.addForwardedRecord(pendingForward);
    }

    // ---------------------------------------------------------------------
    // Values served from the bound event
    // ---------------------------------------------------------------------

    /**
     * @return the event's record context, or an empty {@link Optional} when the event
     *     reports {@code null}
     */
    @Override
    public Optional<RecordMetadata> recordMetadata() {
        return Optional.ofNullable(currentAsyncEvent.recordContext());
    }

    /**
     * @return the system time reported by the bound event
     */
    @Override
    public long currentSystemTimeMs() {
        return currentAsyncEvent.systemTime();
    }

    /**
     * @return the stream time reported by the bound event
     */
    @Override
    public long currentStreamTimeMs() {
        return currentAsyncEvent.streamTime();
    }

    // ---------------------------------------------------------------------
    // Operations that are always rejected by this context
    // ---------------------------------------------------------------------

    /**
     * Always fails.
     *
     * @throws UnsupportedOperationException on every call
     */
    @Override
    public <S extends StateStore> S getStateStore(final String storeName) {
        throw new UnsupportedOperationException("Must call #getStateStore during the Processor's #init method");
    }

    /**
     * Always fails.
     *
     * @throws UnsupportedOperationException on every call
     */
    @Override
    public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) {
        throw new UnsupportedOperationException("Please initialize any punctuations during #init");
    }

    // ---------------------------------------------------------------------
    // Pass-through
    // ---------------------------------------------------------------------

    /** Hands the commit call straight to the wrapped task context. */
    @Override
    public void commit() {
        taskContext.commit();
    }
}
