/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.pulsar;

import java.util.ArrayList;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSinkBase;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarClientUtils;
import org.apache.flink.streaming.connectors.pulsar.serialization.PulsarSerializationSchema;
import org.apache.flink.streaming.connectors.pulsar.table.PulsarSinkSemantic;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializableObject;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Flink sink that publishes each incoming record to a Pulsar topic.
 *
 * <p>Records are written into a Pulsar message by the configured
 * {@link PulsarSerializationSchema}. The schema may also name a per-record
 * target topic; when it does not, the sink's default topic is used.
 *
 * @param <T> type of the records consumed by this sink
 */
public class FlinkPulsarSink<T> extends FlinkPulsarSinkBase<T> {
    private static final Logger log = LoggerFactory.getLogger(FlinkPulsarSink.class);

    /** Schema used to pick the target topic and to populate outgoing messages. */
    private final PulsarSerializationSchema<T> serializationSchema;

    /**
     * Creates a sink with every option given explicitly. All arguments are
     * forwarded to the base class; the schema is additionally kept locally
     * for use in {@code invoke}.
     *
     * @param adminUrl            Pulsar admin URL, forwarded to the base class
     * @param defaultTopicName    optional default topic, forwarded to the base class
     * @param clientConf          Pulsar client configuration
     * @param properties          additional sink properties
     * @param serializationSchema schema that serializes records into Pulsar messages
     * @param messageRouter       message router, forwarded to the base class
     * @param semantic            delivery semantic of the sink
     */
    public FlinkPulsarSink(
            String adminUrl,
            Optional<String> defaultTopicName,
            ClientConfigurationData clientConf,
            Properties properties,
            PulsarSerializationSchema serializationSchema,
            MessageRouter messageRouter,
            PulsarSinkSemantic semantic) {
        super(adminUrl, defaultTopicName, clientConf, properties, serializationSchema, messageRouter, semantic);
        this.serializationSchema = serializationSchema;
    }

    /**
     * Creates a sink without a message router ({@code null} is passed on).
     *
     * @param adminUrl            Pulsar admin URL
     * @param defaultTopicName    optional default topic
     * @param clientConf          Pulsar client configuration
     * @param properties          additional sink properties
     * @param serializationSchema schema that serializes records into Pulsar messages
     * @param semantic            delivery semantic of the sink
     */
    public FlinkPulsarSink(
            String adminUrl,
            Optional<String> defaultTopicName,
            ClientConfigurationData clientConf,
            Properties properties,
            PulsarSerializationSchema serializationSchema,
            PulsarSinkSemantic semantic) {
        this(adminUrl, defaultTopicName, clientConf, properties, serializationSchema, (MessageRouter) null, semantic);
    }

    /**
     * Creates a sink without a message router, using
     * {@link PulsarSinkSemantic#AT_LEAST_ONCE}.
     *
     * @param adminUrl            Pulsar admin URL
     * @param defaultTopicName    optional default topic
     * @param clientConf          Pulsar client configuration
     * @param properties          additional sink properties
     * @param serializationSchema schema that serializes records into Pulsar messages
     */
    public FlinkPulsarSink(
            String adminUrl,
            Optional<String> defaultTopicName,
            ClientConfigurationData clientConf,
            Properties properties,
            PulsarSerializationSchema serializationSchema) {
        this(adminUrl, defaultTopicName, clientConf, properties, serializationSchema, PulsarSinkSemantic.AT_LEAST_ONCE);
    }

    /**
     * Creates a sink from a service URL instead of a ready-made client
     * configuration. The configuration is built by
     * {@code PulsarClientUtils.newClientConf} from {@code serviceUrl} and
     * {@code properties}; the sink uses {@link PulsarSinkSemantic#AT_LEAST_ONCE}.
     *
     * @param serviceUrl          Pulsar service URL; checked with {@code Preconditions.checkNotNull}
     * @param adminUrl            Pulsar admin URL
     * @param defaultTopicName    optional default topic
     * @param properties          properties used both for the client configuration and the sink
     * @param serializationSchema schema that serializes records into Pulsar messages
     */
    public FlinkPulsarSink(
            String serviceUrl,
            String adminUrl,
            Optional<String> defaultTopicName,
            Properties properties,
            PulsarSerializationSchema<T> serializationSchema) {
        this(
                adminUrl,
                defaultTopicName,
                PulsarClientUtils.newClientConf(Preconditions.checkNotNull(serviceUrl), properties),
                properties,
                serializationSchema,
                PulsarSinkSemantic.AT_LEAST_ONCE);
    }

    /**
     * Serializes {@code value} into a Pulsar message and sends it asynchronously.
     *
     * <p>The message goes to the topic returned by the schema for this record,
     * or to the default topic when the schema returns none. When the
     * transaction state is transactional, the message is created inside that
     * transaction and its send future is recorded under the transaction id.
     * In every case the shared send callback is attached to the future.
     *
     * @param transactionState state of the transaction this record belongs to
     * @param value            record to publish
     * @param context          sink context (not used by this method)
     * @throws Exception if a step fails, or if the thread is interrupted during
     *                   the pause taken in transactional mode
     */
    public void invoke(FlinkPulsarSinkBase.PulsarTransactionState<T> transactionState, T value, SinkFunction.Context context) throws Exception {
        // NOTE(review): presumably rethrows a failure captured by an earlier async send - confirm in the base class.
        this.checkErroneous();
        this.initializeSendCallback();

        // A per-record topic from the schema takes precedence over the default topic.
        String destination = this.serializationSchema.getTargetTopic(value).orElse(this.defaultTopic);

        TypedMessageBuilder messageBuilder;
        if (transactionState.isTransactional()) {
            messageBuilder = this.getProducer(destination).newMessage(transactionState.getTransaction());
        } else {
            messageBuilder = this.getProducer(destination).newMessage();
        }
        this.serializationSchema.serialize(value, messageBuilder);

        // The pending counter is bumped before the send is issued.
        if (this.flushOnCheckpoint) {
            synchronized (this.pendingRecordsLock) {
                this.pendingRecords++;
            }
        }

        CompletableFuture sendFuture = messageBuilder.sendAsync();
        if (transactionState.isTransactional()) {
            // NOTE(review): the reason for this 10 ms pause is not visible in this file;
            // confirm its purpose before changing or removing it.
            Thread.sleep(10L);
            TxnID txnId = transactionState.transactionalId;
            this.tid2FuturesMap.computeIfAbsent(txnId, key -> new ArrayList<>()).add(sendFuture);
            log.debug("message {} is invoke in txn {}", value, txnId);
        }
        sendFuture.whenComplete(this.sendCallback);
    }
}

