/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.sink.writer.topic;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.pulsar.common.config.PulsarClientFactory;
import org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils;
import org.apache.flink.connector.pulsar.common.utils.PulsarTransactionUtils;
import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable;
import org.apache.flink.connector.pulsar.sink.config.PulsarSinkConfigUtils;
import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration;
import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
import org.apache.pulsar.common.schema.SchemaInfo;

/**
 * Caches Pulsar producers per topic and schema, and the open transaction per topic, on top of a
 * single {@link PulsarClient} created from the sink configuration.
 *
 * <p>Producers and transactions are created lazily on first use. The internal maps are plain
 * {@link HashMap}s and this class performs no synchronization of its own.
 */
@Internal
public class ProducerRegister implements Closeable {

    private final PulsarClient pulsarClient;
    private final SinkConfiguration sinkConfiguration;
    @Nullable
    private final CryptoKeyReader cryptoKeyReader;
    /** Producers keyed by topic name, then by the schema info they were created with. */
    private final Map<String, Map<SchemaInfo, Producer<?>>> register;
    /** Currently open transaction for each topic; emptied on every {@link #prepareCommit()}. */
    private final Map<String, Transaction> transactionRegister;

    /**
     * Creates the register and the underlying Pulsar client.
     *
     * @param sinkConfiguration configuration used for the client, the producers and transactions
     * @param cryptoKeyReader optional key reader applied to every created producer, may be null
     */
    public ProducerRegister(SinkConfiguration sinkConfiguration, @Nullable CryptoKeyReader cryptoKeyReader) {
        this.pulsarClient = PulsarClientFactory.createClient(sinkConfiguration);
        this.sinkConfiguration = sinkConfiguration;
        this.cryptoKeyReader = cryptoKeyReader;
        this.register = new HashMap<>();
        this.transactionRegister = new HashMap<>();
    }

    /**
     * Creates a message builder for the given topic.
     *
     * <p>When the configured delivery guarantee is {@link DeliveryGuarantee#EXACTLY_ONCE}, the
     * builder is bound to the topic's transaction, which is created on demand.
     *
     * @param topic the topic to send to
     * @param schema the schema of the message
     * @return a message builder from the cached (or newly created) producer
     */
    public <T> TypedMessageBuilder<T> createMessageBuilder(String topic, Schema<T> schema) {
        Producer<T> producer = getOrCreateProducer(topic, schema);
        if (sinkConfiguration.getDeliveryGuarantee() == DeliveryGuarantee.EXACTLY_ONCE) {
            return producer.newMessage(getOrCreateTransaction(topic));
        }
        return producer.newMessage();
    }

    /**
     * Converts every tracked transaction into a committable and then forgets the transactions,
     * so that subsequent messages start new ones.
     *
     * @return one committable per topic that currently has a transaction; empty if there is none
     */
    public List<PulsarCommittable> prepareCommit() {
        List<PulsarCommittable> committables = new ArrayList<>(transactionRegister.size());
        for (Map.Entry<String, Transaction> entry : transactionRegister.entrySet()) {
            TxnID txnID = entry.getValue().getTxnID();
            committables.add(new PulsarCommittable(txnID, entry.getKey()));
        }
        clearTransactions();
        return committables;
    }

    /**
     * Calls {@code flush()} on every cached producer.
     *
     * @throws IOException if flushing a producer fails
     */
    public void flush() throws IOException {
        for (Map<SchemaInfo, Producer<?>> producers : register.values()) {
            for (Producer<?> producer : producers.values()) {
                producer.flush();
            }
        }
    }

    /**
     * Flushes the producers, aborts the open transactions, drops the producer cache and finally
     * closes the Pulsar client.
     *
     * @throws IOException if any of the shutdown steps fails
     */
    @Override
    public void close() throws IOException {
        try (Closer closer = Closer.create()) {
            // Guava's Closer closes registered resources in reverse registration order (LIFO).
            // The steps are therefore registered back to front, so that the flush and the
            // transaction abort still run against an open client.

            // Step 4 (runs last): close the client itself.
            closer.register(pulsarClient);
            // Step 3: drop the cached producers.
            closer.register(register::clear);
            // Step 2: abort all transactions that are still open.
            closer.register(this::abortTransactions);
            // Step 1 (runs first): flush the pending messages.
            closer.register(this::flush);
        }
    }

    /**
     * Returns the cached producer for the topic and schema, creating and caching it if needed.
     */
    @SuppressWarnings("unchecked")
    private <T> Producer<T> getOrCreateProducer(String topic, Schema<T> schema) {
        Map<SchemaInfo, Producer<?>> producers = register.computeIfAbsent(topic, key -> new HashMap<>());
        SchemaInfo schemaInfo = schema.getSchemaInfo();

        Producer<?> cached = producers.get(schemaInfo);
        if (cached != null) {
            // Safe by construction: the producer stored under this schema info was created
            // from a schema reporting the same schema info.
            return (Producer<T>) cached;
        }

        ProducerBuilder<T> builder =
                PulsarSinkConfigUtils.createProducerBuilder(pulsarClient, schema, sinkConfiguration);
        if (cryptoKeyReader != null) {
            builder.cryptoKeyReader(cryptoKeyReader);
        }
        builder.topic(topic);

        Producer<T> producer = PulsarExceptionUtils.sneakyClient(builder::create);
        producers.put(schemaInfo, producer);
        return producer;
    }

    /**
     * Returns the transaction tracked for the topic, creating one with the configured timeout
     * if none is tracked yet.
     */
    private Transaction getOrCreateTransaction(String topic) {
        return transactionRegister.computeIfAbsent(
                topic,
                t -> {
                    long timeoutMillis = sinkConfiguration.getTransactionTimeoutMillis();
                    return PulsarTransactionUtils.createTransaction(pulsarClient, timeoutMillis);
                });
    }

    /**
     * Aborts every tracked transaction through the client's transaction coordinator and clears
     * the transaction register. Does nothing when no transaction is tracked.
     *
     * @throws FlinkRuntimeException wrapping the {@link IOException} raised while aborting
     */
    private void abortTransactions() {
        if (transactionRegister.isEmpty()) {
            return;
        }

        TransactionCoordinatorClientImpl coordinatorClient =
                ((PulsarClientImpl) pulsarClient).getTcClient();
        Preconditions.checkNotNull(coordinatorClient);

        try (Closer closer = Closer.create()) {
            // The aborts are executed when the closer is closed at the end of this try block;
            // the Closer keeps going after a failure so every transaction gets an abort attempt.
            for (Transaction transaction : transactionRegister.values()) {
                TxnID txnID = transaction.getTxnID();
                closer.register(() -> coordinatorClient.abort(txnID));
            }
            // Transaction IDs are already captured above, so the register can be emptied now.
            clearTransactions();
        } catch (IOException e) {
            throw new FlinkRuntimeException(e);
        }
    }

    /** Forgets all tracked transactions without committing or aborting them. */
    private void clearTransactions() {
        transactionRegister.clear();
    }
}

