package io.vertx.kafka.client.producer.impl;

import ch.qos.logback.core.util.FileSize;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.streams.StreamBase;
import io.vertx.core.streams.WriteStream;
import io.vertx.kafka.client.producer.KafkaWriteStream;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.Serializer;
import org.springframework.util.backoff.ExponentialBackOff;

/* loaded from: input_file:BOOT-INF/lib/vertx-kafka-client-3.9.8.jar:io/vertx/kafka/client/producer/impl/KafkaWriteStreamImpl.class */
public class KafkaWriteStreamImpl<K, V> implements KafkaWriteStream<K, V> {
    private long maxSize = FileSize.MB_COEFFICIENT;
    private long pending;
    private final Producer<K, V> producer;
    private Handler<Void> drainHandler;
    private Handler<Throwable> exceptionHandler;
    private final Context context;

    public static <K, V> KafkaWriteStreamImpl<K, V> create(Vertx vertx, Properties properties) {
        return new KafkaWriteStreamImpl<>(vertx.getOrCreateContext(), new KafkaProducer(properties));
    }

    public static <K, V> KafkaWriteStreamImpl<K, V> create(Vertx vertx, Properties properties, Serializer<K> serializer, Serializer<V> serializer2) {
        return new KafkaWriteStreamImpl<>(vertx.getOrCreateContext(), new KafkaProducer(properties, (Serializer) serializer, (Serializer) serializer2));
    }

    public static <K, V> KafkaWriteStreamImpl<K, V> create(Vertx vertx, Map<String, Object> map) {
        return new KafkaWriteStreamImpl<>(vertx.getOrCreateContext(), new KafkaProducer(map));
    }

    public static <K, V> KafkaWriteStreamImpl<K, V> create(Vertx vertx, Map<String, Object> map, Serializer<K> serializer, Serializer<V> serializer2) {
        return new KafkaWriteStreamImpl<>(vertx.getOrCreateContext(), new KafkaProducer(map, serializer, serializer2));
    }

    public KafkaWriteStreamImpl(Context context, Producer<K, V> producer) {
        this.producer = producer;
        this.context = context;
    }

    private int len(Object obj) {
        if (obj instanceof byte[]) {
            return ((byte[]) obj).length;
        }
        if (obj instanceof String) {
            return ((String) obj).length();
        }
        return 1;
    }

    @Override // io.vertx.kafka.client.producer.KafkaWriteStream
    public KafkaWriteStream<K, V> send(ProducerRecord<K, V> producerRecord) {
        return send((ProducerRecord) producerRecord, (Handler<AsyncResult<RecordMetadata>>) null);
    }

    @Override // io.vertx.kafka.client.producer.KafkaWriteStream
    public synchronized KafkaWriteStreamImpl<K, V> send(ProducerRecord<K, V> producerRecord, Handler<AsyncResult<RecordMetadata>> handler) {
        int len = len(producerRecord.value());
        this.pending += len;
        this.context.executeBlocking(promise -> {
            try {
                this.producer.send(producerRecord, (recordMetadata, exc) -> {
                    this.context.runOnContext(r11 -> {
                        synchronized (this) {
                            if (exc != null) {
                                if (this.exceptionHandler != null) {
                                    Handler<Throwable> handler2 = this.exceptionHandler;
                                    this.context.runOnContext(r5 -> {
                                        handler2.handle(exc);
                                    });
                                }
                            }
                            long j = this.maxSize / 2;
                            this.pending -= len;
                            if (this.pending < j && this.drainHandler != null) {
                                Handler<Void> handler3 = this.drainHandler;
                                this.drainHandler = null;
                                this.context.runOnContext(handler3);
                            }
                        }
                        if (handler != null) {
                            handler.handle(exc != null ? Future.failedFuture(exc) : Future.succeededFuture(recordMetadata));
                        }
                    });
                });
            } catch (Throwable th) {
                synchronized (this) {
                    if (this.exceptionHandler != null) {
                        Handler<Throwable> handler2 = this.exceptionHandler;
                        this.context.runOnContext(r5 -> {
                            handler2.handle(th);
                        });
                    }
                    if (handler != null) {
                        handler.handle(Future.failedFuture(th));
                    }
                }
            }
        }, null);
        return this;
    }

    public KafkaWriteStreamImpl<K, V> write(ProducerRecord<K, V> producerRecord, Handler<AsyncResult<Void>> handler) {
        Handler<AsyncResult<RecordMetadata>> handler2 = null;
        if (handler != null) {
            handler2 = asyncResult -> {
                handler.handle(asyncResult.mapEmpty());
            };
        }
        return send((ProducerRecord) producerRecord, handler2);
    }

    @Override // io.vertx.core.streams.WriteStream
    public KafkaWriteStreamImpl<K, V> write(ProducerRecord<K, V> producerRecord) {
        return write((ProducerRecord) producerRecord, (Handler<AsyncResult<Void>>) null);
    }

    @Override // io.vertx.core.streams.WriteStream
    /* renamed from: setWriteQueueMaxSize */
    public KafkaWriteStreamImpl<K, V> setWriteQueueMaxSize2(int i) {
        this.maxSize = i;
        return this;
    }

    @Override // io.vertx.core.streams.WriteStream
    public synchronized boolean writeQueueFull() {
        return this.pending >= this.maxSize;
    }

    @Override // io.vertx.core.streams.WriteStream
    public synchronized KafkaWriteStreamImpl<K, V> drainHandler(Handler<Void> handler) {
        this.drainHandler = handler;
        return this;
    }

    @Override // io.vertx.core.streams.WriteStream
    public void end() {
    }

    @Override // io.vertx.core.streams.WriteStream
    public void end(Handler<AsyncResult<Void>> handler) {
        if (handler != null) {
            this.context.runOnContext(r4 -> {
                handler.handle(Future.succeededFuture());
            });
        }
    }

    @Override // io.vertx.core.streams.WriteStream, io.vertx.core.streams.StreamBase
    public KafkaWriteStreamImpl<K, V> exceptionHandler(Handler<Throwable> handler) {
        this.exceptionHandler = handler;
        return this;
    }

    @Override // io.vertx.kafka.client.producer.KafkaWriteStream
    public KafkaWriteStreamImpl<K, V> partitionsFor(String str, Handler<AsyncResult<List<PartitionInfo>>> handler) {
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        this.context.owner().setTimer(ExponentialBackOff.DEFAULT_INITIAL_INTERVAL, l -> {
            if (atomicBoolean.compareAndSet(false, true)) {
                handler.handle(Future.failedFuture("Kafka connect timeout"));
            }
        });
        this.context.executeBlocking(promise -> {
            List<PartitionInfo> partitionsFor = this.producer.partitionsFor(str);
            if (atomicBoolean.compareAndSet(false, true)) {
                promise.complete(partitionsFor);
            }
        }, handler);
        return this;
    }

    @Override // io.vertx.kafka.client.producer.KafkaWriteStream
    public KafkaWriteStreamImpl<K, V> flush(Handler<Void> handler) {
        this.context.executeBlocking(promise -> {
            this.producer.flush();
            promise.complete();
        }, asyncResult -> {
            handler.handle(null);
        });
        return this;
    }

    @Override // io.vertx.kafka.client.producer.KafkaWriteStream
    public void close() {
        close(asyncResult -> {
        });
    }

    @Override // io.vertx.kafka.client.producer.KafkaWriteStream
    public void close(Handler<AsyncResult<Void>> handler) {
        close(0L, handler);
    }

    @Override // io.vertx.kafka.client.producer.KafkaWriteStream
    public void close(long j, Handler<AsyncResult<Void>> handler) {
        this.context.executeBlocking(promise -> {
            if (j > 0) {
                this.producer.close(Duration.ofMillis(j));
            } else {
                this.producer.close();
            }
            promise.complete();
        }, handler);
    }

    @Override // io.vertx.kafka.client.producer.KafkaWriteStream
    public Producer<K, V> unwrap() {
        return this.producer;
    }

    @Override // io.vertx.kafka.client.producer.KafkaWriteStream
    public /* bridge */ /* synthetic */ KafkaWriteStream flush(Handler handler) {
        return flush((Handler<Void>) handler);
    }

    @Override // io.vertx.kafka.client.producer.KafkaWriteStream
    public /* bridge */ /* synthetic */ KafkaWriteStream partitionsFor(String str, Handler handler) {
        return partitionsFor(str, (Handler<AsyncResult<List<PartitionInfo>>>) handler);
    }

    @Override // io.vertx.kafka.client.producer.KafkaWriteStream
    public /* bridge */ /* synthetic */ KafkaWriteStream send(ProducerRecord producerRecord, Handler handler) {
        return send(producerRecord, (Handler<AsyncResult<RecordMetadata>>) handler);
    }

    @Override // io.vertx.core.streams.WriteStream
    public /* bridge */ /* synthetic */ WriteStream drainHandler(Handler handler) {
        return drainHandler((Handler<Void>) handler);
    }

    @Override // io.vertx.core.streams.WriteStream
    public /* bridge */ /* synthetic */ WriteStream write(Object obj, Handler handler) {
        return write((ProducerRecord) obj, (Handler<AsyncResult<Void>>) handler);
    }

    @Override // io.vertx.core.streams.WriteStream, io.vertx.core.streams.StreamBase
    public /* bridge */ /* synthetic */ WriteStream exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }

    @Override // io.vertx.core.streams.WriteStream, io.vertx.core.streams.StreamBase
    public /* bridge */ /* synthetic */ StreamBase exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }
}
