package io.confluent.kafkarest.integration;

import io.confluent.kafkarest.KafkaRestConfig;
import io.confluent.kafkarest.TestUtils;
import io.confluent.kafkarest.extension.RestResourceExtension;
import io.confluent.kafkarest.testing.KafkaClusterFixture;
import io.confluent.kafkarest.testing.KafkaRestFixture;
import io.confluent.kafkarest.testing.QuorumControllerFixture;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Provider;
import javax.ws.rs.client.Entity;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerRequestFilter;
import javax.ws.rs.core.Configurable;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.glassfish.hk2.api.Factory;
import org.glassfish.hk2.api.TypeLiteral;
import org.glassfish.hk2.utilities.binding.AbstractBinder;
import org.glassfish.jersey.process.internal.RequestScoped;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@Tag("IntegrationTest")
/* Integration test: issues one produce request against the REST proxy and checks that no thread whose name contains "proxy" is left alive afterwards. */
public class ProducerLeakTest {
    /** Name of the topic created in {@link #setUp()} and targeted by the produce request. */
    private static final String TOPIC_NAME = "topic-1";

    // The @Order values place the quorum controller first, the brokers second and the REST
    // proxy last; each later fixture is built from the one declared before it.
    @Order(1)
    @RegisterExtension
    private final QuorumControllerFixture quorumController = QuorumControllerFixture.create();

    @Order(2)
    @RegisterExtension
    private final KafkaClusterFixture kafkaCluster =
            KafkaClusterFixture.builder()
                    .setNumBrokers(3)
                    .setQuorumController(quorumController)
                    .build();

    // The REST proxy is configured to load LeakyContextExtension as its resource extension.
    @Order(3)
    @RegisterExtension
    private final KafkaRestFixture kafkaRest =
            KafkaRestFixture.builder()
                    .setConfig("producer.max.block.ms", "5000")
                    .setConfig(
                            "kafka.rest.resource.extension.class",
                            LeakyContextExtension.class.getName())
                    .setKafkaCluster(kafkaCluster)
                    .build();

    /**
     * Resource extension that registers {@link LeakyFilter} and {@link LeakyModule} with the
     * REST application. It holds no state, so {@link #clean()} does nothing.
     */
    public static final class LeakyContextExtension implements RestResourceExtension {

        /**
         * Registers the request filter and the producer binding on the given configurable.
         *
         * @param configurable the JAX-RS configurable to register components on
         * @param kafkaRestConfig the REST proxy configuration; not read by this extension
         */
        public void register(Configurable<?> configurable, KafkaRestConfig kafkaRestConfig) {
            configurable.register(LeakyFilter.class);
            configurable.register(LeakyModule.class);
        }

        /** No-op: this extension allocates nothing that would need to be released. */
        public void clean() {
            // Nothing to clean up.
        }
    }

    /**
     * Request filter that resolves the injected {@link Producer} provider on every request.
     * The producer it obtains is never used by the filter itself.
     */
    private static final class LeakyFilter implements ContainerRequestFilter {
        private final Provider<Producer<byte[], byte[]>> producerProvider;

        /**
         * @param provider provider of the producer binding; must not be {@code null}
         * @throws NullPointerException if {@code provider} is {@code null}
         */
        @Inject
        public LeakyFilter(Provider<Producer<byte[], byte[]>> provider) {
            producerProvider = Objects.requireNonNull(provider);
        }

        /**
         * Looks up a producer through the provider and discards the result.
         *
         * @param containerRequestContext the current request context; not read by this filter
         */
        public void filter(ContainerRequestContext containerRequestContext) {
            // Only the lookup itself matters here; the returned producer is dropped.
            producerProvider.get();
        }
    }

    /**
     * Binder that exposes {@link LeakyProducerFactory} as the source of
     * {@code Producer<byte[], byte[]>} instances, in request scope and with rank 1.
     */
    private static final class LeakyModule extends AbstractBinder {
        private LeakyModule() {
        }

        /** Declares the factory binding for the byte-array producer type. */
        protected void configure() {
            // An anonymous TypeLiteral subclass carries the full generic type of the binding.
            TypeLiteral<Producer<byte[], byte[]>> producerType =
                    new TypeLiteral<Producer<byte[], byte[]>>() {
                    };

            bindFactory(LeakyProducerFactory.class)
                    .to(producerType)
                    .in(RequestScoped.class)
                    .ranked(1);
        }
    }

    /**
     * Factory that creates a new {@link KafkaProducer} with client id {@code "proxy"} on every
     * {@link #provide()} call and closes it in {@link #dispose(Producer)}.
     */
    private static final class LeakyProducerFactory implements Factory<Producer<byte[], byte[]>> {
        private final KafkaRestConfig config;

        /**
         * @param kafkaRestConfig REST proxy configuration the producer settings are read from;
         *     must not be {@code null}
         * @throws NullPointerException if {@code kafkaRestConfig} is {@code null}
         */
        @Inject
        private LeakyProducerFactory(KafkaRestConfig kafkaRestConfig) {
            this.config = Objects.requireNonNull(kafkaRestConfig);
        }

        /**
         * Creates a byte-array producer from the REST proxy's producer configs, with
         * {@code client.id} forced to {@code "proxy"}.
         *
         * <p>This is the {@code provide()} method of the {@link Factory} interface. It must keep
         * exactly this name, otherwise the class does not implement the interface.
         *
         * @return a newly constructed producer; the caller is expected to hand it back to
         *     {@link #dispose(Producer)}
         */
        public Producer<byte[], byte[]> provide() {
            Map<String, Object> producerConfigs = config.getProducerConfigs();
            // NOTE(review): presumably the client id ends up in the producer's thread names,
            // which is what the test scans for - confirm against the Kafka client.
            producerConfigs.put("client.id", "proxy");
            return new KafkaProducer<>(
                    producerConfigs, new ByteArraySerializer(), new ByteArraySerializer());
        }

        /**
         * Closes the given producer.
         *
         * @param producer the producer previously returned by {@link #provide()}
         */
        public void dispose(Producer<byte[], byte[]> producer) {
            producer.close();
        }
    }

    /**
     * Creates {@link #TOPIC_NAME} on the test cluster before each test.
     *
     * @throws Exception if topic creation fails
     */
    @BeforeEach
    public void setUp() throws Exception {
        // NOTE(review): the arguments 3 and (short) 1 are presumably the partition count and
        // the replication factor - confirm against KafkaClusterFixture#createTopic.
        kafkaCluster.createTopic(TOPIC_NAME, 3, (short) 1);
    }

    /**
     * Posts a single request to the v3 records endpoint of {@link #TOPIC_NAME}, then asserts
     * that no live thread has a name containing {@code "proxy"}.
     *
     * @param str the value supplied by {@code @ValueSource}; not read by the method body
     * @throws Exception if the request or the fixtures fail
     */
    @ValueSource(strings = {"kraft", "zk"})
    @ParameterizedTest(name = TestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
    public void producerDoesNotLeak(String str) throws Exception {
        // The response body is read but not inspected; only the side effects of the request
        // on the server's threads are checked below.
        kafkaRest
                .target()
                .path("/v3/clusters/" + kafkaCluster.getClusterId() + "/topics/" + TOPIC_NAME + "/records")
                .request()
                .accept("application/json")
                .post(Entity.json(new byte[1]))
                .readEntity(String.class);

        // Collect the names of all live threads that mention the "proxy" client id.
        List<String> leakedThreadNames =
                Thread.getAllStackTraces().keySet().stream()
                        .map(Thread::getName)
                        .filter(threadName -> threadName.contains("proxy"))
                        .collect(Collectors.toList());

        Assertions.assertEquals(
                0,
                leakedThreadNames.size(),
                "Expected no live Kafka client threads, but some got left behind instead: " + leakedThreadNames);
    }
}
