package dev.responsive.kafka.api.async.internals;

import dev.responsive.kafka.api.async.internals.contexts.AsyncUserProcessorContext;
import dev.responsive.kafka.api.async.internals.contexts.StreamThreadProcessorContext;
import dev.responsive.kafka.api.async.internals.events.AsyncEvent;
import dev.responsive.kafka.api.async.internals.events.DelayedForward;
import dev.responsive.kafka.api.async.internals.events.DelayedWrite;
import dev.responsive.kafka.api.async.internals.metrics.AsyncProcessorMetricsRecorder;
import dev.responsive.kafka.api.async.internals.queues.FinalizingQueue;
import dev.responsive.kafka.api.async.internals.queues.KeyOrderPreservingQueue;
import dev.responsive.kafka.api.async.internals.queues.MeteredSchedulingQueue;
import dev.responsive.kafka.api.async.internals.queues.SchedulingQueue;
import dev.responsive.kafka.api.async.internals.stores.AbstractAsyncStoreBuilder;
import dev.responsive.kafka.api.async.internals.stores.AsyncKeyValueStore;
import dev.responsive.kafka.api.async.internals.stores.StreamThreadFlushListeners;
import dev.responsive.kafka.api.config.ResponsiveConfig;
import dev.responsive.kafka.internal.config.InternalSessionConfigs;
import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
import java.time.Duration;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;
import org.apache.kafka.streams.processor.api.ProcessingContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.slf4j.Logger;

/* loaded from: input_file:dev/responsive/kafka/api/async/internals/AsyncProcessor.class */
public class AsyncProcessor<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut>, FixedKeyProcessor<KIn, VIn, VOut> {
    private final Processor<KIn, VIn, KOut, VOut> userProcessor;
    private final FixedKeyProcessor<KIn, VIn, VOut> userFixedKeyProcessor;
    private final Map<String, AbstractAsyncStoreBuilder<?, ?, ?>> connectedStoreBuilders;
    private String logPrefix;
    private Logger log;
    private String streamThreadName;
    private String asyncProcessorName;
    private TaskId taskId;
    private AsyncThreadPool threadPool;
    private SchedulingQueue<KIn> schedulingQueue;
    private FinalizingQueue finalizingQueue;
    private Cancellable punctuator;
    private ProcessingContext taskContext;
    private StreamThreadProcessorContext<KOut, VOut> streamThreadContext;
    private AsyncUserProcessorContext<KOut, VOut> userContext;
    private AsyncProcessorMetricsRecorder metricsRecorder;
    private final Map<AsyncEvent, Object> pendingEvents = new ConcurrentHashMap();
    private FatalAsyncException fatalException = null;
    private boolean hasProcessedSomething = false;

    public static <KIn, VIn, KOut, VOut> AsyncProcessor<KIn, VIn, KOut, VOut> createAsyncProcessor(Processor<KIn, VIn, KOut, VOut> processor, Map<String, AbstractAsyncStoreBuilder<?, ?, ?>> map) {
        return new AsyncProcessor<>(processor, null, map);
    }

    public static <KIn, VIn, VOut> AsyncProcessor<KIn, VIn, KIn, VOut> createAsyncFixedKeyProcessor(FixedKeyProcessor<KIn, VIn, VOut> fixedKeyProcessor, Map<String, AbstractAsyncStoreBuilder<?, ?, ?>> map) {
        return new AsyncProcessor<>(null, fixedKeyProcessor, map);
    }

    private AsyncProcessor(Processor<KIn, VIn, KOut, VOut> processor, FixedKeyProcessor<KIn, VIn, VOut> fixedKeyProcessor, Map<String, AbstractAsyncStoreBuilder<?, ?, ?>> map) {
        this.userProcessor = processor;
        this.userFixedKeyProcessor = fixedKeyProcessor;
        this.connectedStoreBuilders = map;
        if (processor == null && fixedKeyProcessor == null) {
            throw new IllegalStateException("Both the Processor and FixedKeyProcessor were null");
        }
        if (processor != null && fixedKeyProcessor != null) {
            throw new IllegalStateException("Both the Processor and FixedKeyProcessor were non-null");
        }
    }

    public void init(ProcessorContext<KOut, VOut> processorContext) {
        initFields((InternalProcessorContext) processorContext);
        this.userProcessor.init(this.userContext);
        completeInitialization();
    }

    public void init(FixedKeyProcessorContext<KIn, VOut> fixedKeyProcessorContext) {
        initFields((InternalProcessorContext) fixedKeyProcessorContext);
        this.userFixedKeyProcessor.init(this.userContext);
        completeInitialization();
    }

    private void initFields(InternalProcessorContext<KOut, VOut> internalProcessorContext) {
        this.taskContext = internalProcessorContext;
        this.streamThreadName = Thread.currentThread().getName();
        this.taskId = internalProcessorContext.taskId();
        this.asyncProcessorName = internalProcessorContext.currentNode().name();
        this.logPrefix = String.format("stream-thread [%s] %s[%d] ", this.streamThreadName, this.asyncProcessorName, Integer.valueOf(this.taskId.partition()));
        this.log = new LogContext(this.logPrefix).logger(AsyncProcessor.class);
        this.userContext = new AsyncUserProcessorContext<>(this.streamThreadName, this.taskContext, this.logPrefix);
        this.streamThreadContext = new StreamThreadProcessorContext<>(this.logPrefix, internalProcessorContext, this.userContext);
        this.userContext.setDelegateForStreamThread(this.streamThreadContext);
        Map<String, Object> appConfigs = this.userContext.appConfigs();
        ResponsiveMetrics loadMetrics = InternalSessionConfigs.loadMetrics(appConfigs);
        String str = this.streamThreadName;
        TaskId taskId = this.taskId;
        String str2 = this.asyncProcessorName;
        Map<AsyncEvent, Object> map = this.pendingEvents;
        Objects.requireNonNull(map);
        this.metricsRecorder = new AsyncProcessorMetricsRecorder(str, taskId, str2, loadMetrics, map::size);
        ResponsiveConfig responsiveConfig = ResponsiveConfig.responsiveConfig(appConfigs);
        long longValue = responsiveConfig.getLong(ResponsiveConfig.ASYNC_FLUSH_INTERVAL_MS_CONFIG).longValue();
        int intValue = responsiveConfig.getInt(ResponsiveConfig.ASYNC_MAX_EVENTS_QUEUED_PER_KEY_CONFIG).intValue();
        this.threadPool = getAsyncThreadPool(this.taskContext, this.streamThreadName);
        this.threadPool.maybeInitThreadPoolMetrics();
        this.schedulingQueue = new MeteredSchedulingQueue(this.metricsRecorder, new KeyOrderPreservingQueue(this.logPrefix, intValue));
        this.finalizingQueue = new FinalizingQueue(this.logPrefix, this.taskId.partition());
        this.punctuator = this.taskContext.schedule(Duration.ofMillis(longValue), PunctuationType.WALL_CLOCK_TIME, this::punctuate);
    }

    private void completeInitialization() {
        verifyConnectedStateStores(this.streamThreadContext.getAllAsyncStores(), this.connectedStoreBuilders);
        registerFlushListenerForStoreBuilders(this.streamThreadName, this.taskId.partition(), this.connectedStoreBuilders.values(), this::flushAndAwaitPendingEvents);
    }

    void assertQueuesEmptyOnFirstProcess() {
        if (this.hasProcessedSomething) {
            return;
        }
        assertQueuesEmpty();
        this.hasProcessedSomething = true;
    }

    void assertQueuesEmpty() {
        if (!this.schedulingQueue.isEmpty()) {
            this.log.error("the scheduling queue for {} was expected to be empty", this.taskId);
            throw new IllegalStateException("scheduling queue expected to be empty");
        }
        if (!this.threadPool.isEmpty(this.asyncProcessorName, this.taskId.partition())) {
            this.log.error("the thread pool for {} was expected to be empty", this.taskId);
            throw new IllegalStateException("thread pool expected to be empty");
        }
        if (this.finalizingQueue.isEmpty()) {
            return;
        }
        this.log.error("the finalizing queue for {} was expected to be empty", this.taskId);
        throw new IllegalStateException("finalizing queue expected to be empty");
    }

    public void process(Record<KIn, VIn> record) {
        assertQueuesEmptyOnFirstProcess();
        String str = this.logPrefix;
        String str2 = this.asyncProcessorName;
        TaskId taskId = this.taskId;
        ProcessorRecordContext extractRecordContext = extractRecordContext(this.taskContext);
        long currentStreamTimeMs = this.taskContext.currentStreamTimeMs();
        long currentSystemTimeMs = this.taskContext.currentSystemTimeMs();
        Runnable runnable = () -> {
            this.userProcessor.process(record);
        };
        AsyncProcessorMetricsRecorder asyncProcessorMetricsRecorder = this.metricsRecorder;
        Objects.requireNonNull(asyncProcessorMetricsRecorder);
        processNewAsyncEvent(new AsyncEvent(str, (Record<?, ?>) record, str2, taskId, extractRecordContext, currentStreamTimeMs, currentSystemTimeMs, runnable, (List<AsyncEvent.StateTransitionListener>) List.of(asyncProcessorMetricsRecorder::recordStateTransition)));
    }

    public void process(FixedKeyRecord<KIn, VIn> fixedKeyRecord) {
        assertQueuesEmptyOnFirstProcess();
        String str = this.logPrefix;
        String str2 = this.asyncProcessorName;
        TaskId taskId = this.taskId;
        ProcessorRecordContext extractRecordContext = extractRecordContext(this.taskContext);
        long currentStreamTimeMs = this.taskContext.currentStreamTimeMs();
        long currentSystemTimeMs = this.taskContext.currentSystemTimeMs();
        Runnable runnable = () -> {
            this.userFixedKeyProcessor.process(fixedKeyRecord);
        };
        AsyncProcessorMetricsRecorder asyncProcessorMetricsRecorder = this.metricsRecorder;
        Objects.requireNonNull(asyncProcessorMetricsRecorder);
        processNewAsyncEvent(new AsyncEvent(str, (FixedKeyRecord<?, ?>) fixedKeyRecord, str2, taskId, extractRecordContext, currentStreamTimeMs, currentSystemTimeMs, runnable, (List<AsyncEvent.StateTransitionListener>) List.of(asyncProcessorMetricsRecorder::recordStateTransition)));
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void processNewAsyncEvent(AsyncEvent asyncEvent) {
        if (this.fatalException != null) {
            this.log.error("process called when processor already hit fatal exception", this.fatalException);
            throw new IllegalStateException("process called when already hit exception: " + this.fatalException);
        }
        try {
            this.pendingEvents.put(asyncEvent, new Object());
            maybeBackOffEnqueuingNewEventWithKey(asyncEvent.inputRecordKey());
            this.schedulingQueue.offer(asyncEvent);
            executeAvailableEvents();
        } catch (StreamsException e) {
            if (e instanceof TaskMigratedException) {
                throw e;
            }
            this.fatalException = new FatalAsyncException(e);
            throw this.fatalException;
        } catch (FatalAsyncException e2) {
            this.fatalException = e2;
            throw this.fatalException;
        }
    }

    private ProcessorRecordContext extractRecordContext(ProcessingContext processingContext) {
        return ((InternalProcessorContext) processingContext).recordContext();
    }

    public void close() {
        if (!isCleared()) {
            this.log.warn("Closing async processor with {} in-flight events, this should only happen if the task was shut down dirty and not flushed/committed prior to being closed", Integer.valueOf(this.pendingEvents.size()));
        }
        this.metricsRecorder.close();
        this.punctuator.cancel();
        this.threadPool.removeProcessor(this.asyncProcessorName, this.taskId.partition());
        if (this.userProcessor != null) {
            this.userProcessor.close();
        } else {
            this.userFixedKeyProcessor.close();
        }
    }

    private static void registerFlushListenerForStoreBuilders(String str, int i, Collection<AbstractAsyncStoreBuilder<?, ?, ?>> collection, StreamThreadFlushListeners.AsyncFlushListener asyncFlushListener) {
        Iterator<AbstractAsyncStoreBuilder<?, ?, ?>> it = collection.iterator();
        while (it.hasNext()) {
            it.next().registerFlushListenerWithAsyncStore(str, i, asyncFlushListener);
        }
    }

    public void flushAndAwaitPendingEvents() {
        if (this.fatalException != null) {
            this.log.error("exit flush early due to previous fatal exception", this.fatalException);
            throw this.fatalException;
        }
        try {
            drainFinalizingQueue();
            while (!isCleared()) {
                this.log.trace("Scheduled {} events and finalized {} events", Integer.valueOf(drainSchedulingQueue()), Integer.valueOf(finalizeAtLeastOneEvent()));
            }
            assertQueuesEmpty();
        } catch (StreamsException e) {
            if (e instanceof TaskMigratedException) {
                throw e;
            }
            this.fatalException = new FatalAsyncException(e);
            throw this.fatalException;
        } catch (FatalAsyncException e2) {
            this.fatalException = e2;
            throw this.fatalException;
        }
    }

    private void checkFatalExceptionsFromAsyncThreadPool() {
        Optional<Throwable> checkUncaughtExceptions = this.threadPool.checkUncaughtExceptions(this.asyncProcessorName, this.taskId.partition());
        if (checkUncaughtExceptions.isPresent()) {
            this.log.error("Detected uncaught fatal exception from the async thread pool", checkUncaughtExceptions.get());
            throw new FatalAsyncException("Fatal exception in AsyncThreadPool", checkUncaughtExceptions.get());
        }
    }

    private int finalizeAtLeastOneEvent() {
        checkFatalExceptionsFromAsyncThreadPool();
        int drainFinalizingQueue = drainFinalizingQueue();
        if (drainFinalizingQueue > 0) {
            return drainFinalizingQueue;
        }
        while (true) {
            AsyncEvent waitForNextFinalizableEvent = this.finalizingQueue.waitForNextFinalizableEvent(100L, TimeUnit.MILLISECONDS);
            if (waitForNextFinalizableEvent != null) {
                completePendingEvent(waitForNextFinalizableEvent);
                return 1;
            }
            checkFatalExceptionsFromAsyncThreadPool();
        }
    }

    private void maybeBackOffEnqueuingNewEventWithKey(KIn kin) {
        while (this.schedulingQueue.keyQueueIsFull(kin)) {
            drainSchedulingQueue();
            if (this.schedulingQueue.keyQueueIsFull(kin)) {
                finalizeAtLeastOneEvent();
            }
        }
    }

    private void punctuate(long j) {
        try {
            this.log.debug("Flushing async events during punctuation at {}", Long.valueOf(j));
            executeAvailableEvents();
        } catch (FatalAsyncException e) {
            this.fatalException = e;
            throw this.fatalException;
        } catch (StreamsException e2) {
            if (e2 instanceof TaskMigratedException) {
                throw e2;
            }
            this.fatalException = new FatalAsyncException(e2);
            throw this.fatalException;
        }
    }

    private void executeAvailableEvents() {
        this.log.trace("Finalized {} events", Integer.valueOf(drainFinalizingQueue()));
        this.log.trace("Scheduled {} events", Integer.valueOf(drainSchedulingQueue()));
    }

    private int drainSchedulingQueue() {
        LinkedList linkedList = new LinkedList();
        while (this.schedulingQueue.hasProcessableRecord()) {
            AsyncEvent poll = this.schedulingQueue.poll();
            poll.transitionToToProcess();
            linkedList.add(poll);
        }
        int size = linkedList.size();
        if (size > 0) {
            this.threadPool.scheduleForProcessing(this.asyncProcessorName, this.taskId, linkedList, this.finalizingQueue, this.taskContext, this.userContext, this.metricsRecorder);
        }
        return size;
    }

    private int drainFinalizingQueue() {
        checkFatalExceptionsFromAsyncThreadPool();
        int i = 0;
        while (!this.finalizingQueue.isEmpty()) {
            completePendingEvent(this.finalizingQueue.nextFinalizableEvent());
            i++;
        }
        return i;
    }

    private void completePendingEvent(AsyncEvent asyncEvent) {
        try {
            StreamThreadProcessorContext.PreviousRecordContextAndNode preFinalize = preFinalize(asyncEvent);
            try {
                doFinalize(asyncEvent);
                if (preFinalize != null) {
                    preFinalize.close();
                }
            } finally {
            }
        } finally {
            postFinalize(asyncEvent);
        }
    }

    private StreamThreadProcessorContext.PreviousRecordContextAndNode preFinalize(AsyncEvent asyncEvent) {
        if (!this.pendingEvents.containsKey(asyncEvent)) {
            this.log.error("routed event from {} to the wrong processor for {}", Integer.valueOf(asyncEvent.partition()), this.taskId.toString());
            throw new IllegalStateException(String.format("routed event from %d to the wrong processor for %s", Integer.valueOf(asyncEvent.partition()), this.taskId.toString()));
        }
        Optional<RuntimeException> processingException = asyncEvent.processingException();
        if (processingException.isPresent()) {
            completeAsyncEvent(asyncEvent);
            throw processingException.get();
        }
        StreamThreadProcessorContext.PreviousRecordContextAndNode prepareToFinalizeEvent = this.streamThreadContext.prepareToFinalizeEvent(asyncEvent);
        asyncEvent.transitionToFinalizing();
        return prepareToFinalizeEvent;
    }

    private void doFinalize(AsyncEvent asyncEvent) {
        DelayedWrite nextWrite = asyncEvent.nextWrite();
        DelayedForward<KOut, VOut> nextForward = asyncEvent.nextForward();
        while (true) {
            DelayedForward<KOut, VOut> delayedForward = nextForward;
            if (nextWrite == null && delayedForward == null) {
                return;
            }
            if (nextWrite != null) {
                this.streamThreadContext.executeDelayedWrite(nextWrite);
            }
            if (delayedForward != null) {
                this.streamThreadContext.executeDelayedForward(delayedForward);
            }
            nextWrite = asyncEvent.nextWrite();
            nextForward = asyncEvent.nextForward();
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void postFinalize(AsyncEvent asyncEvent) {
        completeAsyncEvent(asyncEvent);
        this.schedulingQueue.unblockKey(asyncEvent.inputRecordKey());
    }

    private void completeAsyncEvent(AsyncEvent asyncEvent) {
        this.pendingEvents.remove(asyncEvent);
        asyncEvent.transitionToDone();
    }

    private boolean isCleared() {
        return this.pendingEvents.isEmpty();
    }

    private void verifyConnectedStateStores(Map<String, AsyncKeyValueStore<?, ?>> map, Map<String, AbstractAsyncStoreBuilder<?, ?, ?>> map2) {
        if (map.keySet().equals(map2.keySet())) {
            return;
        }
        this.log.error("Connected stores names not equal to the stores retrieved via ProcessorContext#getStateStore during initialization. Make sure to pass all state stores used by this processor to the AsyncProcessorSupplier, and they are (all) initialized during the Processor#init call before actual processing begins. Found {} connected store names and {} actual stores used", String.join(",", map2.keySet()), String.join(", ", map.keySet()));
        throw new IllegalStateException("Names of actual stores initialized by this processor does not match the connected store names that were provided to the AsyncProcessorSupplier");
    }

    private static AsyncThreadPool getAsyncThreadPool(ProcessingContext processingContext, String str) {
        try {
            return InternalSessionConfigs.loadAsyncThreadPoolRegistry(processingContext.appConfigsWithPrefix("main.consumer.")).asyncThreadPoolForStreamThread(str);
        } catch (Exception e) {
            throw new ConfigException("Unable to locate async thread pool registry. Make sure to configure responsive.async.thread.pool.size", e);
        }
    }
}
