package org.apache.flink.connector.pulsar.sink.writer.topic;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.pulsar.common.config.PulsarClientFactory;
import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto;
import org.apache.flink.connector.pulsar.common.metrics.MetricNames;
import org.apache.flink.connector.pulsar.common.metrics.ProducerMetricsInterceptor;
import org.apache.flink.connector.pulsar.common.schema.PulsarSchemaUtils;
import org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils;
import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
import org.apache.flink.connector.pulsar.sink.config.PulsarSinkConfigUtils;
import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.MessageCrypto;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.ProducerStats;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.ProducerBase;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.schema.SchemaHash;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.shade.com.google.common.base.Strings;
import org.apache.pulsar.shade.com.google.common.io.Closer;

@Internal
/* loaded from: input_file:org/apache/flink/connector/pulsar/sink/writer/topic/ProducerRegister.class */
public class ProducerRegister implements Closeable {
    private static final String FAIL_TO_CREATE_TOPIC = "Fail to create the non-exist topic, make sure you have enable the topic auto creation in Pulsar.";
    private final PulsarClient pulsarClient;

    @Nullable
    private final TransactionCoordinatorClient coordinatorClient;
    private final SinkConfiguration sinkConfiguration;
    private final PulsarCrypto pulsarCrypto;
    private final SinkWriterMetricGroup metricGroup;
    private final Map<String, Schema<byte[]>> schemas = new HashMap();
    private final Map<String, Map<SchemaHash, Producer<?>>> producers = new HashMap();
    private final Map<String, Transaction> transactions = new HashMap();

    public ProducerRegister(SinkConfiguration sinkConfiguration, PulsarCrypto pulsarCrypto, SinkWriterMetricGroup sinkWriterMetricGroup) throws PulsarClientException {
        this.pulsarClient = PulsarClientFactory.createClient(sinkConfiguration);
        this.sinkConfiguration = sinkConfiguration;
        this.pulsarCrypto = pulsarCrypto;
        this.metricGroup = sinkWriterMetricGroup;
        if (sinkConfiguration.isEnableMetrics()) {
            sinkWriterMetricGroup.setCurrentSendTimeGauge(this::currentSendTimeGauge);
        }
        if (sinkConfiguration.getDeliveryGuarantee() == DeliveryGuarantee.EXACTLY_ONCE) {
            this.coordinatorClient = PulsarTransactionUtils.getTcClient(this.pulsarClient);
        } else {
            this.coordinatorClient = null;
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:4:0x0012, code lost:
    
        if (r0 == org.apache.pulsar.common.schema.SchemaType.BYTES) goto L6;
     */
    /* JADX WARN: Multi-variable type inference failed */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public <T> org.apache.pulsar.client.api.TypedMessageBuilder<T> createMessageBuilder(java.lang.String r7, @javax.annotation.Nullable org.apache.pulsar.client.api.Schema<?> r8) throws org.apache.pulsar.client.api.PulsarClientException {
        /*
            r6 = this;
            r0 = r8
            if (r0 == 0) goto L15
            r0 = r8
            org.apache.pulsar.common.schema.SchemaInfo r0 = r0.getSchemaInfo()
            org.apache.pulsar.common.schema.SchemaType r0 = r0.getType()
            org.apache.pulsar.common.schema.SchemaType r1 = org.apache.pulsar.common.schema.SchemaType.BYTES
            if (r0 != r1) goto L1b
        L15:
            r0 = r6
            r1 = r7
            org.apache.pulsar.client.api.Schema r0 = r0.getBytesSchema(r1)
            r8 = r0
        L1b:
            r0 = r6
            r1 = r7
            r2 = r8
            org.apache.pulsar.client.api.Producer r0 = r0.getOrCreateProducer(r1, r2)
            org.apache.pulsar.client.impl.ProducerBase r0 = (org.apache.pulsar.client.impl.ProducerBase) r0
            r9 = r0
            r0 = 0
            r10 = r0
            r0 = r6
            org.apache.flink.connector.pulsar.sink.config.SinkConfiguration r0 = r0.sinkConfiguration
            org.apache.flink.connector.base.DeliveryGuarantee r0 = r0.getDeliveryGuarantee()
            org.apache.flink.connector.base.DeliveryGuarantee r1 = org.apache.flink.connector.base.DeliveryGuarantee.EXACTLY_ONCE
            if (r0 != r1) goto L3f
            r0 = r6
            r1 = r7
            org.apache.pulsar.client.api.transaction.Transaction r0 = r0.getOrCreateTransaction(r1)
            org.apache.pulsar.client.impl.transaction.TransactionImpl r0 = (org.apache.pulsar.client.impl.transaction.TransactionImpl) r0
            r10 = r0
        L3f:
            org.apache.pulsar.client.impl.TypedMessageBuilderImpl r0 = new org.apache.pulsar.client.impl.TypedMessageBuilderImpl
            r1 = r0
            r2 = r9
            r3 = r8
            r4 = r10
            r1.<init>(r2, r3, r4)
            return r0
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.flink.connector.pulsar.sink.writer.topic.ProducerRegister.createMessageBuilder(java.lang.String, org.apache.pulsar.client.api.Schema):org.apache.pulsar.client.api.TypedMessageBuilder");
    }

    public List<PulsarCommittable> prepareCommit() {
        ArrayList arrayList = new ArrayList(this.transactions.size());
        for (Map.Entry<String, Transaction> entry : this.transactions.entrySet()) {
            arrayList.add(new PulsarCommittable(entry.getValue().getTxnID(), entry.getKey()));
        }
        this.transactions.clear();
        return arrayList;
    }

    public void flush() throws IOException {
        Iterator<Map<SchemaHash, Producer<?>>> it = this.producers.values().iterator();
        while (it.hasNext()) {
            Iterator<Producer<?>> it2 = it.next().values().iterator();
            while (it2.hasNext()) {
                it2.next().flush();
            }
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        Closer create = Closer.create();
        Throwable th = null;
        try {
            create.register(this::flush);
            create.register(this::abortTransactions);
            Map<String, Map<SchemaHash, Producer<?>>> map = this.producers;
            map.getClass();
            create.register(map::clear);
            create.register(this.pulsarClient);
            if (create != null) {
                if (0 == 0) {
                    create.close();
                    return;
                }
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (create != null) {
                if (0 != 0) {
                    try {
                        create.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    create.close();
                }
            }
            throw th3;
        }
    }

    private <T> Producer<T> getOrCreateProducer(String str, Schema<T> schema) throws PulsarClientException {
        Map computeIfAbsent = this.producers.computeIfAbsent(str, str2 -> {
            return new HashMap();
        });
        SchemaHash hash = PulsarSchemaUtils.hash(schema);
        if (computeIfAbsent.containsKey(hash)) {
            return (Producer) computeIfAbsent.get(hash);
        }
        try {
            ((PulsarClientImpl) this.pulsarClient).getLookup().getPartitionedTopicMetadata(TopicName.get(str)).get();
            ProducerBuilder<?> createProducerBuilder = PulsarSinkConfigUtils.createProducerBuilder(this.pulsarClient, schema, this.sinkConfiguration);
            configPulsarCrypto(createProducerBuilder);
            createProducerBuilder.topic(str);
            createProducerBuilder.intercept(new ProducerMetricsInterceptor(this.metricGroup));
            Producer<T> create = createProducerBuilder.create();
            exposeProducerMetrics(create);
            computeIfAbsent.put(hash, create);
            return create;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new FlinkRuntimeException(FAIL_TO_CREATE_TOPIC, e);
        } catch (ExecutionException e2) {
            throw new FlinkRuntimeException(FAIL_TO_CREATE_TOPIC, e2);
        }
    }

    private void configPulsarCrypto(ProducerBuilder<?> producerBuilder) {
        CryptoKeyReader cryptoKeyReader = this.pulsarCrypto.cryptoKeyReader();
        if (cryptoKeyReader == null) {
            return;
        }
        producerBuilder.cryptoKeyReader(cryptoKeyReader);
        Set<String> encryptKeys = this.pulsarCrypto.encryptKeys();
        if (encryptKeys == null || encryptKeys.isEmpty()) {
            throw new IllegalArgumentException("You should provide encryptKeys in PulsarCrypto");
        }
        producerBuilder.getClass();
        encryptKeys.forEach(producerBuilder::addEncryptionKey);
        MessageCrypto<MessageMetadata, MessageMetadata> messageCrypto = this.pulsarCrypto.messageCrypto();
        if (messageCrypto != null) {
            ((ProducerBuilderImpl) producerBuilder).getConf().setMessageCrypto(messageCrypto);
        }
    }

    private Transaction getOrCreateTransaction(String str) throws PulsarClientException {
        if (this.transactions.containsKey(str)) {
            return this.transactions.get(str);
        }
        Transaction createTransaction = PulsarTransactionUtils.createTransaction(this.pulsarClient, this.sinkConfiguration.getTransactionTimeoutMillis());
        this.transactions.put(str, createTransaction);
        return createTransaction;
    }

    private Schema<byte[]> getBytesSchema(String str) {
        return this.sinkConfiguration.isValidateSinkMessageBytes() ? this.schemas.computeIfAbsent(str, str2 -> {
            return Schema.AUTO_PRODUCE_BYTES();
        }) : Schema.BYTES;
    }

    private void abortTransactions() {
        if (this.coordinatorClient == null || this.transactions.isEmpty()) {
            return;
        }
        try {
            Closer create = Closer.create();
            Throwable th = null;
            try {
                Iterator<Transaction> it = this.transactions.values().iterator();
                while (it.hasNext()) {
                    TxnID txnID = it.next().getTxnID();
                    create.register(() -> {
                        this.coordinatorClient.abort(txnID);
                    });
                }
                this.transactions.clear();
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        create.close();
                    }
                }
            } finally {
            }
        } catch (IOException e) {
            throw new FlinkRuntimeException(e);
        }
    }

    private Long currentSendTimeGauge() {
        return Long.valueOf(Math.round(this.producers.values().stream().flatMap(map -> {
            return map.values().stream();
        }).map((v0) -> {
            return v0.getStats();
        }).mapToDouble((v0) -> {
            return v0.getSendLatencyMillis50pct();
        }).average().orElse(9.223372036854776E18d)));
    }

    private void exposeProducerMetrics(Producer<?> producer) {
        if (this.sinkConfiguration.isEnableMetrics()) {
            String producerName = producer.getProducerName();
            if (Strings.isNullOrEmpty(producerName)) {
                producerName = UUID.randomUUID().toString();
            }
            MetricGroup addGroup = this.metricGroup.addGroup(MetricNames.PULSAR_PRODUCER_METRIC_NAME).addGroup(producer.getTopic()).addGroup(producerName);
            ProducerStats stats = producer.getStats();
            stats.getClass();
            addGroup.gauge(MetricNames.NUM_MSGS_SENT, stats::getNumMsgsSent);
            stats.getClass();
            addGroup.gauge(MetricNames.NUM_BYTES_SENT, stats::getNumBytesSent);
            stats.getClass();
            addGroup.gauge(MetricNames.NUM_SEND_FAILED, stats::getNumSendFailed);
            stats.getClass();
            addGroup.gauge(MetricNames.NUM_ACKS_RECEIVED, stats::getNumAcksReceived);
            stats.getClass();
            addGroup.gauge(MetricNames.SEND_MSGS_RATE, stats::getSendMsgsRate);
            stats.getClass();
            addGroup.gauge(MetricNames.SEND_BYTES_RATE, stats::getSendBytesRate);
            stats.getClass();
            addGroup.gauge(MetricNames.SEND_LATENCY_MILLIS_50_PCT, stats::getSendLatencyMillis50pct);
            stats.getClass();
            addGroup.gauge(MetricNames.SEND_LATENCY_MILLIS_75_PCT, stats::getSendLatencyMillis75pct);
            stats.getClass();
            addGroup.gauge(MetricNames.SEND_LATENCY_MILLIS_95_PCT, stats::getSendLatencyMillis95pct);
            stats.getClass();
            addGroup.gauge(MetricNames.SEND_LATENCY_MILLIS_99_PCT, stats::getSendLatencyMillis99pct);
            stats.getClass();
            addGroup.gauge(MetricNames.SEND_LATENCY_MILLIS_999_PCT, stats::getSendLatencyMillis999pct);
            stats.getClass();
            addGroup.gauge(MetricNames.SEND_LATENCY_MILLIS_MAX, stats::getSendLatencyMillisMax);
            stats.getClass();
            addGroup.gauge(MetricNames.TOTAL_MSGS_SENT, stats::getTotalMsgsSent);
            stats.getClass();
            addGroup.gauge(MetricNames.TOTAL_BYTES_SENT, stats::getTotalBytesSent);
            stats.getClass();
            addGroup.gauge(MetricNames.TOTAL_SEND_FAILED, stats::getTotalSendFailed);
            stats.getClass();
            addGroup.gauge(MetricNames.TOTAL_ACKS_RECEIVED, stats::getTotalAcksReceived);
            stats.getClass();
            addGroup.gauge(MetricNames.PENDING_QUEUE_SIZE, stats::getPendingQueueSize);
        }
    }
}
