package dev.responsive.kafka.internal.clients;

import dev.responsive.kafka.api.async.internals.AsyncThreadPoolRegistration;
import dev.responsive.kafka.api.async.internals.AsyncThreadPoolRegistry;
import dev.responsive.kafka.internal.utils.Utils;
import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer;

/* loaded from: input_file:dev/responsive/kafka/internal/clients/AsyncStreamsConsumer.class */
public class AsyncStreamsConsumer<K, V> extends DelegatingConsumer<K, V> {
    private final String streamThreadName;
    private final AsyncThreadPoolRegistry asyncThreadPoolRegistry;
    private final AsyncThreadPoolRegistration registration;

    public AsyncStreamsConsumer(Consumer<K, V> consumer, String str, AsyncThreadPoolRegistry asyncThreadPoolRegistry) {
        super(consumer);
        this.streamThreadName = Utils.extractThreadNameFromConsumerClientId(str);
        this.asyncThreadPoolRegistry = asyncThreadPoolRegistry;
        this.registration = asyncThreadPoolRegistry.startNewAsyncThreadPool(this.streamThreadName);
    }

    @Override // dev.responsive.kafka.internal.clients.DelegatingConsumer
    public void close() {
        shutdownAsyncThreadPool();
        super.close();
    }

    @Override // dev.responsive.kafka.internal.clients.DelegatingConsumer
    public void close(Duration duration) {
        shutdownAsyncThreadPool();
        super.close(duration);
    }

    private void shutdownAsyncThreadPool() {
        if (!this.streamThreadName.equals(Thread.currentThread().getName())) {
            throw new IllegalStateException(String.format("Attempted to close consumer for StreamThread %s from thread %s", this.streamThreadName, Thread.currentThread().getName()));
        }
        this.asyncThreadPoolRegistry.shutdownAsyncThreadPool(this.streamThreadName);
    }
}
