package io.streamthoughts.jikkou.kafka.internals.producer;

import io.streamthoughts.jikkou.common.memory.OpaqueMemoryResource;
import io.streamthoughts.jikkou.common.memory.ResourceDisposer;
import io.streamthoughts.jikkou.common.memory.SharedResources;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.Serializer;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamthoughts/jikkou/kafka/internals/producer/DefaultProducerFactory.class */
public final class DefaultProducerFactory<K, V> implements ProducerFactory<K, V>, AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) DefaultProducerFactory.class);
    private final Supplier<Map<String, Object>> configSupplier;
    private Serializer<K> keySerializer;
    private Serializer<V> valueSerializer;
    private Duration closeTimeout = Duration.ofSeconds(30);
    private final SharedResources resources = new SharedResources();
    private final List<Object> leaseHolders = new ArrayList();

    /* loaded from: input_file:io/streamthoughts/jikkou/kafka/internals/producer/DefaultProducerFactory$OpaqueProducer.class */
    private static class OpaqueProducer<K, V> implements Producer<K, V> {
        private final OpaqueMemoryResource<Producer<K, V>> delegate;

        public OpaqueProducer(OpaqueMemoryResource<Producer<K, V>> opaqueMemoryResource) {
            this.delegate = opaqueMemoryResource;
        }

        @Override // org.apache.kafka.clients.producer.Producer
        public void initTransactions() {
            this.delegate.getResourceHandle().initTransactions();
        }

        @Override // org.apache.kafka.clients.producer.Producer
        public void beginTransaction() throws ProducerFencedException {
            this.delegate.getResourceHandle().beginTransaction();
        }

        @Override // org.apache.kafka.clients.producer.Producer
        @Deprecated
        public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> map, String str) throws ProducerFencedException {
            this.delegate.getResourceHandle().sendOffsetsToTransaction(map, str);
        }

        @Override // org.apache.kafka.clients.producer.Producer
        public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> map, ConsumerGroupMetadata consumerGroupMetadata) throws ProducerFencedException {
            this.delegate.getResourceHandle().sendOffsetsToTransaction(map, consumerGroupMetadata);
        }

        @Override // org.apache.kafka.clients.producer.Producer
        public void commitTransaction() throws ProducerFencedException {
            this.delegate.getResourceHandle().commitTransaction();
        }

        @Override // org.apache.kafka.clients.producer.Producer
        public void abortTransaction() throws ProducerFencedException {
            this.delegate.getResourceHandle().abortTransaction();
        }

        @Override // org.apache.kafka.clients.producer.Producer
        public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord) {
            return this.delegate.getResourceHandle().send(producerRecord);
        }

        @Override // org.apache.kafka.clients.producer.Producer
        public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord, Callback callback) {
            return this.delegate.getResourceHandle().send(producerRecord, callback);
        }

        @Override // org.apache.kafka.clients.producer.Producer
        public void flush() {
            this.delegate.getResourceHandle().flush();
        }

        @Override // org.apache.kafka.clients.producer.Producer
        public List<PartitionInfo> partitionsFor(String str) {
            return this.delegate.getResourceHandle().partitionsFor(str);
        }

        @Override // org.apache.kafka.clients.producer.Producer
        public Map<MetricName, ? extends Metric> metrics() {
            return this.delegate.getResourceHandle().metrics();
        }

        @Override // org.apache.kafka.clients.producer.Producer, java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            close(null);
        }

        @Override // org.apache.kafka.clients.producer.Producer
        public void close(Duration duration) {
            try {
                this.delegate.close();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (RuntimeException e2) {
                throw e2;
            } catch (Exception e3) {
                throw new RuntimeException(e3);
            }
        }
    }

    public DefaultProducerFactory(@NotNull Map<String, Object> map) {
        Map unmodifiableMap = Collections.unmodifiableMap(map);
        this.configSupplier = () -> {
            return unmodifiableMap;
        };
    }

    public DefaultProducerFactory(@NotNull Map<String, Object> map, @Nullable Serializer<K> serializer, @Nullable Serializer<V> serializer2) {
        Map unmodifiableMap = Collections.unmodifiableMap(map);
        this.configSupplier = () -> {
            return unmodifiableMap;
        };
        this.keySerializer = serializer;
        this.valueSerializer = serializer2;
    }

    public DefaultProducerFactory(@NotNull Supplier<Map<String, Object>> supplier, @Nullable Serializer<K> serializer, @Nullable Serializer<V> serializer2) {
        this.configSupplier = supplier;
        this.keySerializer = serializer;
        this.valueSerializer = serializer2;
    }

    public DefaultProducerFactory<K, V> setKeySerializer(Serializer<K> serializer) {
        this.keySerializer = serializer;
        return this;
    }

    public DefaultProducerFactory<K, V> setValueSerializer(Serializer<V> serializer) {
        this.valueSerializer = serializer;
        return this;
    }

    public DefaultProducerFactory<K, V> setCloseTimeout(Duration duration) {
        this.closeTimeout = duration;
        return this;
    }

    @Override // io.streamthoughts.jikkou.kafka.internals.producer.ProducerFactory
    public Producer<K, V> createProducer() {
        Object obj = new Object();
        Producer producer = (Producer) this.resources.getOrCreateSharedResource("kafka-producer", this::createKafkaProducer, obj);
        ResourceDisposer<Exception> createResourceDisposer = createResourceDisposer(obj);
        this.leaseHolders.add(obj);
        return new OpaqueProducer(new OpaqueMemoryResource(producer, createResourceDisposer));
    }

    @NotNull
    private ResourceDisposer<Exception> createResourceDisposer(@NotNull Object obj) {
        return () -> {
            this.resources.release("kafka-producer", obj, this::closeKafkaProducer);
        };
    }

    @NotNull
    private Producer<K, V> createKafkaProducer() {
        LOG.info("Creating new kafka producer instance.");
        return new KafkaProducer(loadConfigs(), this.keySerializer, this.valueSerializer);
    }

    private void closeKafkaProducer(Producer<K, V> producer) {
        producer.close(this.closeTimeout);
    }

    private Map<String, Object> loadConfigs() {
        return this.configSupplier.get();
    }

    @Override // io.streamthoughts.jikkou.kafka.internals.producer.ProducerFactory, java.lang.AutoCloseable
    public void close() {
        ListIterator<Object> listIterator = this.leaseHolders.listIterator();
        while (listIterator.hasNext()) {
            try {
                createResourceDisposer(listIterator.next()).dispose();
            } catch (Throwable th) {
                LOG.error("Error while closing resource", th);
            }
            listIterator.remove();
        }
    }
}
