package dev.responsive.kafka.api.async.internals;

import com.google.common.annotations.VisibleForTesting;
import dev.responsive.kafka.api.async.internals.contexts.AsyncThreadProcessorContext;
import dev.responsive.kafka.api.async.internals.contexts.AsyncUserProcessorContext;
import dev.responsive.kafka.api.async.internals.events.AsyncEvent;
import dev.responsive.kafka.api.async.internals.metrics.AsyncProcessorMetricsRecorder;
import dev.responsive.kafka.api.async.internals.metrics.AsyncThreadPoolMetricsRecorder;
import dev.responsive.kafka.api.async.internals.queues.FinalizingQueue;
import dev.responsive.kafka.internal.metrics.ResponsiveMetrics;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.ProcessingContext;
import org.slf4j.Logger;

/* loaded from: input_file:dev/responsive/kafka/api/async/internals/AsyncThreadPool.class */
public class AsyncThreadPool {
    public static final String ASYNC_THREAD_NAME = "AsyncThread";
    private final Logger log;
    private final Supplier<AsyncThreadPoolMetricsRecorder> metricsRecorderSupplier;
    private AsyncThreadPoolMetricsRecorder metricsRecorder;
    private final ThreadPoolExecutor executor;
    private final Semaphore queueSemaphore;
    private final Map<AsyncProcessorId, ConcurrentMap<AsyncEvent, InFlightEvent>> inFlight = new HashMap();
    private final Map<AsyncProcessorId, FatalAsyncException> fatalExceptions = new ConcurrentHashMap();
    private final AtomicInteger threadNameIndex = new AtomicInteger(0);
    private final BlockingQueue<Runnable> processingQueue = new LinkedBlockingQueue();

    /* loaded from: input_file:dev/responsive/kafka/api/async/internals/AsyncThreadPool$AsyncEventTask.class */
    private static class AsyncEventTask<KOut, VOut> implements Supplier<StreamsException> {
        private final AsyncEvent event;
        private final AsyncThreadProcessorContext<KOut, VOut> asyncThreadContext;
        private final AsyncUserProcessorContext<KOut, VOut> wrappingContext;
        private final Semaphore queueSemaphore;
        private final AsyncProcessorMetricsRecorder metricsRecorder;

        private AsyncEventTask(AsyncEvent asyncEvent, ProcessingContext processingContext, AsyncUserProcessorContext<KOut, VOut> asyncUserProcessorContext, Semaphore semaphore, AsyncProcessorMetricsRecorder asyncProcessorMetricsRecorder) {
            this.event = asyncEvent;
            this.wrappingContext = asyncUserProcessorContext;
            this.asyncThreadContext = new AsyncThreadProcessorContext<>(processingContext, asyncEvent);
            this.queueSemaphore = semaphore;
            this.metricsRecorder = asyncProcessorMetricsRecorder;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.function.Supplier
        public StreamsException get() {
            long nanoTime = System.nanoTime();
            this.queueSemaphore.release();
            this.wrappingContext.setDelegateForAsyncThread(this.asyncThreadContext);
            this.event.transitionToProcessing();
            try {
                this.event.inputRecordProcessor().run();
                this.event.transitionToToFinalize();
                this.metricsRecorder.recordEventProcess(System.nanoTime() - nanoTime);
                return null;
            } catch (RuntimeException e) {
                return new StreamsException("Exception caught during async processing", e, this.event.taskId());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:dev/responsive/kafka/api/async/internals/AsyncThreadPool$InFlightEvent.class */
    public static class InFlightEvent {
        private final CompletableFuture<StreamsException> future;

        private InFlightEvent(CompletableFuture<StreamsException> completableFuture) {
            this.future = completableFuture;
        }

        CompletableFuture<StreamsException> future() {
            return this.future;
        }
    }

    public AsyncThreadPool(String str, int i, int i2, ResponsiveMetrics responsiveMetrics) {
        this.log = new LogContext(String.format("stream-thread [%s] ", str)).logger(AsyncThreadPool.class);
        this.queueSemaphore = new Semaphore(i2);
        this.executor = new ThreadPoolExecutor(i, i2, Long.MAX_VALUE, TimeUnit.DAYS, this.processingQueue, runnable -> {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            thread.setName(generateAsyncThreadName(str, this.threadNameIndex.getAndIncrement()));
            return thread;
        });
        this.metricsRecorderSupplier = () -> {
            BlockingQueue<Runnable> blockingQueue = this.processingQueue;
            Objects.requireNonNull(blockingQueue);
            return new AsyncThreadPoolMetricsRecorder(responsiveMetrics, str, blockingQueue::size);
        };
    }

    public boolean isEmpty(String str, TaskId taskId) {
        ConcurrentMap<AsyncEvent, InFlightEvent> concurrentMap = this.inFlight.get(AsyncProcessorId.of(str, taskId));
        if (concurrentMap == null) {
            this.log.debug("No in-flight map found for {}[{}]", str, taskId);
            return true;
        }
        if (this.log.isTraceEnabled()) {
            this.log.trace("Found in-flight map for {}[{}]: {}", new Object[]{str, taskId, concurrentMap.keySet().stream().map((v0) -> {
                return v0.toString();
            }).collect(Collectors.joining(", "))});
        }
        return concurrentMap.isEmpty();
    }

    public void removeProcessor(AsyncProcessorId asyncProcessorId) {
        this.log.debug("Removing {}[{}] from async thread pool", asyncProcessorId.processorName, asyncProcessorId.taskId);
        ConcurrentMap<AsyncEvent, InFlightEvent> remove = this.inFlight.remove(asyncProcessorId);
        if (remove != null) {
            this.log.info("Cancelling {} pending records for {}[{}]", new Object[]{Integer.valueOf(remove.size()), asyncProcessorId.processorName, asyncProcessorId.taskId});
            remove.values().forEach(inFlightEvent -> {
                inFlightEvent.future().cancel(true);
            });
        }
    }

    public Optional<Throwable> checkUncaughtExceptions(String str, TaskId taskId) {
        return Optional.ofNullable(this.fatalExceptions.get(AsyncProcessorId.of(str, taskId)));
    }

    @VisibleForTesting
    Map<AsyncEvent, InFlightEvent> getInFlight(String str, TaskId taskId) {
        return this.inFlight.get(AsyncProcessorId.of(str, taskId));
    }

    private static String generateAsyncThreadName(String str, int i) {
        return String.format("%s-%s-%d", str, ASYNC_THREAD_NAME, Integer.valueOf(i));
    }

    public <KOut, VOut> void scheduleForProcessing(String str, TaskId taskId, List<AsyncEvent> list, FinalizingQueue finalizingQueue, ProcessingContext processingContext, AsyncUserProcessorContext<KOut, VOut> asyncUserProcessorContext, AsyncProcessorMetricsRecorder asyncProcessorMetricsRecorder) {
        if (this.metricsRecorder == null) {
            this.log.error("must call maybeInitThreadPoolMetrics before using pool");
            throw new IllegalStateException("must call maybeInitThreadPoolMetrics before using pool");
        }
        AsyncProcessorId of = AsyncProcessorId.of(str, taskId);
        ConcurrentMap<AsyncEvent, InFlightEvent> computeIfAbsent = this.inFlight.computeIfAbsent(of, asyncProcessorId -> {
            return new ConcurrentHashMap();
        });
        for (AsyncEvent asyncEvent : list) {
            try {
                this.queueSemaphore.acquire();
                CompletableFuture supplyAsync = CompletableFuture.supplyAsync(new AsyncEventTask(asyncEvent, processingContext, asyncUserProcessorContext, this.queueSemaphore, asyncProcessorMetricsRecorder), this.executor);
                computeIfAbsent.put(asyncEvent, new InFlightEvent(supplyAsync));
                supplyAsync.whenComplete((streamsException, th) -> {
                    computeIfAbsent.remove(asyncEvent);
                }).whenComplete((streamsException2, th2) -> {
                    if (th2 == null) {
                        if (streamsException2 == null) {
                            finalizingQueue.addFinalizableEvent(asyncEvent);
                        } else {
                            asyncEvent.transitionToFailed(streamsException2);
                            finalizingQueue.addFailedEvent(asyncEvent, streamsException2);
                        }
                    }
                }).exceptionally(th3 -> {
                    if ((th3 instanceof CompletionException) && (th3.getCause() instanceof CancellationException)) {
                        throw ((CompletionException) th3);
                    }
                    this.fatalExceptions.computeIfAbsent(of, asyncProcessorId2 -> {
                        return new FatalAsyncException("Uncaught exception while handling", th3);
                    });
                    if (th3 instanceof RuntimeException) {
                        throw ((RuntimeException) th3);
                    }
                    throw new RuntimeException(th3);
                });
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public void maybeInitThreadPoolMetrics() {
        if (this.metricsRecorder == null) {
            this.metricsRecorder = this.metricsRecorderSupplier.get();
        }
    }

    public void shutdown() {
        if (this.metricsRecorder != null) {
            this.metricsRecorder.close();
        }
        this.executor.shutdownNow();
    }
}
