/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.pulsar;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.pulsar.internal.CachedPulsarClient;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarClientUtils;
import org.apache.flink.streaming.connectors.pulsar.internal.SchemaUtils;
import org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils;
import org.apache.flink.streaming.connectors.pulsar.table.PulsarSinkSemantic;
import org.apache.flink.streaming.util.serialization.PulsarSerializationSchema;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializableObject;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract class FlinkPulsarSinkBase<T>
extends TwoPhaseCommitSinkFunction<T, PulsarTransactionState<T>, Void>
implements CheckpointedFunction {
    private static final Logger log = LoggerFactory.getLogger(FlinkPulsarSinkBase.class);
    // Pulsar admin endpoint; open() uses it to build the PulsarAdmin client.
    protected String adminUrl;
    // Client configuration handed to CachedPulsarClient and to the admin factory.
    // The constructor enables transactions on it when the semantic is EXACTLY_ONCE.
    protected ClientConfigurationData clientConfigurationData;
    // User properties run through SourceSinkUtils.toCaceInsensitiveParams; source of all option lookups below.
    protected final Map<String, String> caseInsensitiveParams;
    // Producer options extracted by SourceSinkUtils.getProducerParams; applied via ProducerBuilder.loadConf.
    protected final Map<String, Object> producerConf;
    // The raw user properties as passed to the constructor.
    protected final Properties properties;
    // When true, acknowledgeMessage() decrements pendingRecords; open() switches it off if checkpointing is disabled.
    protected boolean flushOnCheckpoint;
    // Selects which send callback initializeSendCallback() installs (record the failure vs. only log it).
    protected boolean failOnWrite;
    // Timeout passed to withTransactionTimeout(...) in createTransaction(), in milliseconds.
    protected long transactionTimeout;
    // Upper bound, in milliseconds, on waiting for a transaction commit/abort future.
    protected long maxBlockTimeMs;
    // Producer send timeout in milliseconds; forced to 0 by the constructor for EXACTLY_ONCE.
    protected int sendTimeOutMs;
    // Monitor guarding pendingRecords; producerFlush() waits on it, acknowledgeMessage() notifies it.
    protected final SerializableObject pendingRecordsLock = new SerializableObject();
    // Count of sends not yet acknowledged. Decremented in acknowledgeMessage();
    // the increment is not in this class - presumably done by the subclass when sending (verify).
    protected long pendingRecords = 0L;
    // Message ids collected per transaction; only instantiated for EXACTLY_ONCE.
    protected ConcurrentHashMap<TxnID, List<MessageId>> tid2MessagesMap;
    // Outstanding send futures per transaction; only instantiated for EXACTLY_ONCE.
    protected ConcurrentHashMap<TxnID, List<CompletableFuture<MessageId>>> tid2FuturesMap;
    // True when a default topic was supplied; getProducer() then always returns singleProducer.
    protected final boolean forcedTopic;
    // The default topic, or null when none was supplied.
    protected final String defaultTopic;
    protected final PulsarSerializationSchema<T> serializationSchema;
    // Optional custom router; when null, createProducer() keeps the builder's routing settings.
    protected final MessageRouter messageRouter;
    // First send failure recorded by the callback; thrown (and cleared) by checkErroneous().
    protected volatile transient Throwable failedWrite;
    protected transient PulsarAdmin admin;
    // Callback built lazily by initializeSendCallback().
    protected transient BiConsumer<MessageId, Throwable> sendCallback;
    // Delivery guarantee; initializeState() downgrades it to NONE when checkpointing is disabled.
    protected PulsarSinkSemantic semantic;
    // Producer for the default topic (set in open() when forcedTopic is true).
    protected transient Producer<T> singleProducer;
    // Per-topic producer cache (created in open() when forcedTopic is false).
    protected transient Map<String, Producer<T>> topic2Producer;

    /**
     * Creates a sink with the {@link PulsarSinkSemantic#AT_LEAST_ONCE} guarantee by delegating
     * to the constructor that takes an explicit semantic.
     *
     * @param adminUrl Pulsar admin endpoint
     * @param defaultTopicName topic every record is written to, if present
     * @param clientConf Pulsar client configuration
     * @param properties sink options
     * @param serializationSchema schema used to serialize records
     * @param messageRouter optional custom message router
     */
    public FlinkPulsarSinkBase(
            String adminUrl,
            Optional<String> defaultTopicName,
            ClientConfigurationData clientConf,
            Properties properties,
            PulsarSerializationSchema<T> serializationSchema,
            MessageRouter messageRouter) {
        this(
                adminUrl,
                defaultTopicName,
                clientConf,
                properties,
                serializationSchema,
                messageRouter,
                PulsarSinkSemantic.AT_LEAST_ONCE);
    }

    public FlinkPulsarSinkBase(String adminUrl, Optional<String> defaultTopicName, ClientConfigurationData clientConf, Properties properties, PulsarSerializationSchema<T> serializationSchema, MessageRouter messageRouter, PulsarSinkSemantic semantic) {
        super(new TransactionStateSerializer(), (TypeSerializer)VoidSerializer.INSTANCE);
        this.adminUrl = (String)Preconditions.checkNotNull((Object)adminUrl);
        this.semantic = semantic;
        if (defaultTopicName.isPresent()) {
            this.forcedTopic = true;
            this.defaultTopic = defaultTopicName.get();
        } else {
            this.forcedTopic = false;
            this.defaultTopic = null;
        }
        this.serializationSchema = serializationSchema;
        this.messageRouter = messageRouter;
        this.clientConfigurationData = clientConf;
        this.properties = (Properties)Preconditions.checkNotNull((Object)properties);
        this.caseInsensitiveParams = SourceSinkUtils.toCaceInsensitiveParams((Map<String, String>)Maps.fromProperties((Properties)properties));
        this.producerConf = SourceSinkUtils.getProducerParams((Map<String, String>)Maps.fromProperties((Properties)properties));
        this.flushOnCheckpoint = SourceSinkUtils.flushOnCheckpoint(this.caseInsensitiveParams);
        this.failOnWrite = SourceSinkUtils.failOnWrite(this.caseInsensitiveParams);
        this.transactionTimeout = SourceSinkUtils.getTransactionTimeout(this.caseInsensitiveParams);
        this.maxBlockTimeMs = SourceSinkUtils.getMaxBlockTimeMs(this.caseInsensitiveParams);
        this.sendTimeOutMs = SourceSinkUtils.getSendTimeoutMs(this.caseInsensitiveParams);
        CachedPulsarClient.setCacheSize(SourceSinkUtils.getClientCacheSize(this.caseInsensitiveParams));
        if (semantic == PulsarSinkSemantic.EXACTLY_ONCE) {
            this.sendTimeOutMs = 0;
            this.tid2MessagesMap = new ConcurrentHashMap();
            this.tid2FuturesMap = new ConcurrentHashMap();
            this.clientConfigurationData.setEnableTransaction(true);
        }
        if (this.clientConfigurationData.getServiceUrl() == null) {
            throw new IllegalArgumentException("ServiceUrl must be supplied in the client configuration");
        }
    }

    /**
     * Creates a sink from a service URL: builds the client configuration with
     * {@code PulsarClientUtils.newClientConf} and delegates to the constructor that
     * takes a {@link ClientConfigurationData}.
     *
     * @param serviceUrl Pulsar service URL; must not be null
     * @param adminUrl Pulsar admin endpoint
     * @param defaultTopicName topic every record is written to, if present
     * @param properties sink options, also passed to the client configuration factory
     * @param serializationSchema schema used to serialize records
     * @param messageRouter optional custom message router
     */
    public FlinkPulsarSinkBase(
            String serviceUrl,
            String adminUrl,
            Optional<String> defaultTopicName,
            Properties properties,
            PulsarSerializationSchema serializationSchema,
            MessageRouter messageRouter) {
        this(
                adminUrl,
                defaultTopicName,
                PulsarClientUtils.newClientConf(Preconditions.checkNotNull(serviceUrl), properties),
                properties,
                serializationSchema,
                messageRouter);
    }

    /**
     * Surfaces any recorded asynchronous send failure, then delegates the snapshot to the
     * superclass.
     *
     * @param context snapshot context forwarded to the superclass
     * @throws Exception if a send failure was recorded or the superclass snapshot fails
     */
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Fail before snapshotting if an earlier send already failed.
        checkErroneous();
        super.snapshotState(context);
    }

    /**
     * Downgrades the semantic to {@link PulsarSinkSemantic#NONE} when checkpointing is
     * disabled, then lets the superclass initialize its state.
     *
     * @param context initialization context forwarded to the superclass
     * @throws Exception if the superclass initialization fails
     */
    public void initializeState(FunctionInitializationContext context) throws Exception {
        if (this.semantic != PulsarSinkSemantic.NONE) {
            StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext) getRuntimeContext();
            if (!runtimeContext.isCheckpointingEnabled()) {
                log.warn(
                        "Using {} semantic, but checkpointing is not enabled. Switching to {} semantic.",
                        this.semantic,
                        PulsarSinkSemantic.NONE);
                this.semantic = PulsarSinkSemantic.NONE;
            }
        }
        super.initializeState(context);
    }

    /**
     * Prepares the sink for writing: creates the admin client, opens the serialization schema
     * and either creates the single producer for the default topic (uploading its schema
     * first) or initializes the per-topic producer cache.
     *
     * @param parameters unused by this implementation
     * @throws Exception if the admin client, schema or producer cannot be set up
     */
    public void open(Configuration parameters) throws Exception {
        if (flushOnCheckpoint) {
            StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext) getRuntimeContext();
            if (!runtimeContext.isCheckpointingEnabled()) {
                log.warn("Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.");
                flushOnCheckpoint = false;
            }
        }

        admin = PulsarClientUtils.newAdminFromConf(adminUrl, clientConfigurationData);
        serializationSchema.open(
                RuntimeContextInitializationContextAdapters.serializationAdapter(
                        getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));

        if (!forcedTopic) {
            // Producers are created lazily, one per topic, by getProducer().
            topic2Producer = new HashMap<>();
            return;
        }
        uploadSchema(defaultTopic);
        singleProducer = createProducer(
                clientConfigurationData, producerConf, defaultTopic, serializationSchema.getSchema());
    }

    /**
     * Lazily builds the callback attached to asynchronous sends.
     *
     * <p>With {@code failOnWrite} the first send failure is stored in {@code failedWrite} so
     * that the next {@link #checkErroneous()} throws it; later failures are only logged.
     * Without it, failures are logged and otherwise ignored.
     *
     * <p>Both variants acknowledge the record in every case. A failed send must still be
     * acknowledged: {@code producerFlush} waits until {@code pendingRecords} reaches zero,
     * so skipping the acknowledgement on failure would block the flush forever instead of
     * letting it report the stored error.
     */
    protected void initializeSendCallback() {
        if (this.sendCallback != null) {
            return;
        }
        if (this.failOnWrite) {
            this.sendCallback = (messageId, error) -> {
                if (error != null) {
                    if (this.failedWrite == null) {
                        // Keep the first failure; checkErroneous() rethrows it.
                        this.failedWrite = error;
                    } else {
                        // A failure is already pending; log this one with its stack trace.
                        log.warn("callback error", error);
                    }
                }
                this.acknowledgeMessage();
            };
        } else {
            this.sendCallback = (messageId, error) -> {
                if (this.failedWrite == null && error != null) {
                    log.error("Error while sending message to Pulsar: {}", ExceptionUtils.stringifyException(error));
                }
                this.acknowledgeMessage();
            };
        }
    }

    /**
     * Uploads the schema info of the sink's serialization schema for the given topic
     * through the admin client.
     *
     * @param topic topic the schema is registered for
     */
    private void uploadSchema(String topic) {
        Schema<T> schema = serializationSchema.getSchema();
        SchemaUtils.uploadPulsarSchema(admin, topic, schema.getSchemaInfo());
    }

    /**
     * Closes the sink: reports a pending send failure, flushes and closes the producers,
     * then reports any failure recorded while doing so.
     *
     * @throws Exception if a send failure was recorded before or during the shutdown
     */
    public void close() throws Exception {
        // Order matters: if a failure is already recorded this throws before producerClose()
        // runs, so the flush inside producerClose() is not attempted.
        this.checkErroneous();
        this.producerClose();
        // Catch failures reported by callbacks that completed during the final flush.
        this.checkErroneous();
    }

    /**
     * Returns the producer for {@code topic}.
     *
     * <p>With a forced default topic the single producer is returned regardless of the
     * argument. Otherwise the producer is looked up in the per-topic cache and, on a miss,
     * created (after uploading the schema for the topic) and cached.
     *
     * @param topic target topic
     * @return producer to use for the topic
     */
    protected Producer<T> getProducer(String topic) {
        if (forcedTopic) {
            return singleProducer;
        }
        Producer<T> cached = topic2Producer.get(topic);
        if (cached != null) {
            return cached;
        }
        uploadSchema(topic);
        Producer<T> created =
                createProducer(clientConfigurationData, producerConf, topic, serializationSchema.getSchema());
        topic2Producer.put(topic, created);
        return created;
    }

    /**
     * Builds a producer for {@code topic} on the cached Pulsar client.
     *
     * <p>The builder is primed with the sink's send timeout, a 100 ms batching delay and a
     * 5 MiB batch size limit before {@code producerConf} is loaded on top. A custom message
     * router, when configured, switches the routing mode to {@code CustomPartition}.
     *
     * @param clientConf client configuration used to look up the cached client
     * @param producerConf producer options loaded into the builder
     * @param topic topic the producer writes to
     * @param schema schema of the produced messages
     * @return the created producer
     * @throws RuntimeException wrapping the {@link PulsarClientException} if creation fails
     */
    protected Producer<T> createProducer(ClientConfigurationData clientConf, Map<String, Object> producerConf, String topic, Schema<T> schema) {
        try {
            PulsarClientImpl client = CachedPulsarClient.getOrCreate(clientConf);
            ProducerBuilder<T> builder = client.newProducer(schema)
                    .topic(topic)
                    .sendTimeout(this.sendTimeOutMs, TimeUnit.MILLISECONDS)
                    .batchingMaxPublishDelay(100L, TimeUnit.MILLISECONDS)
                    .batchingMaxBytes(5 * 1024 * 1024)
                    .loadConf(producerConf);
            if (this.messageRouter != null) {
                builder = builder
                        .messageRoutingMode(MessageRoutingMode.CustomPartition)
                        .messageRouter(this.messageRouter);
            }
            return builder.create();
        } catch (PulsarClientException e) {
            log.error("Failed to create producer for topic {}", topic);
            throw new RuntimeException(e);
        }
    }

    /**
     * Flushes all producers and blocks until every outstanding send has completed.
     *
     * <p>For a transactional state, the send futures registered under its transaction id are
     * awaited and their message ids appended to {@code tid2MessagesMap}. Afterwards the method
     * waits until {@code pendingRecords} drops to zero and finally rethrows any recorded
     * send failure.
     *
     * @param transaction state whose sends are awaited; may be {@code null} (e.g. when the sink
     *     is closed before a transaction was started), in which case only the producers are flushed
     * @throws RuntimeException if a send future fails or the thread is interrupted while waiting
     *     (the interrupt flag is restored before throwing)
     * @throws Exception if a send failure was recorded, see {@link #checkErroneous()}
     */
    public void producerFlush(PulsarTransactionState<T> transaction) throws Exception {
        if (this.singleProducer != null) {
            this.singleProducer.flush();
        } else if (this.topic2Producer != null) {
            for (Producer<T> producer : this.topic2Producer.values()) {
                producer.flush();
            }
        }

        if (transaction != null && transaction.isTransactional() && this.tid2FuturesMap != null) {
            TxnID transactionalId = transaction.transactionalId;
            // No entry means nothing was sent under this transaction id by this instance.
            List<CompletableFuture<MessageId>> futureList = this.tid2FuturesMap.get(transactionalId);
            if (futureList != null) {
                for (CompletableFuture<MessageId> completableFuture : futureList) {
                    try {
                        MessageId messageId = completableFuture.get();
                        this.tid2MessagesMap.computeIfAbsent(transactionalId, key -> new ArrayList<>()).add(messageId);
                        log.debug("transaction {} add the message {} to messageIdLIst", transactionalId, messageId);
                    } catch (InterruptedException e) {
                        // Preserve the interrupt for callers further up the stack.
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    } catch (ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                }
            }
        }

        synchronized (this.pendingRecordsLock) {
            // acknowledgeMessage() notifies this monitor once the counter reaches zero.
            while (this.pendingRecords > 0L) {
                try {
                    this.pendingRecordsLock.wait();
                } catch (InterruptedException interruptedException) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Flushing got interrupted while checkpointing", interruptedException);
                }
            }
        }
        this.checkErroneous();
    }

    /**
     * Opens a new Pulsar transaction on the cached client, using the configured
     * transaction timeout, and waits for it to be built.
     *
     * @return the newly created transaction
     * @throws Exception if the client cannot be obtained, the thread is interrupted or the
     *     transaction cannot be built
     */
    private Transaction createTransaction() throws Exception {
        PulsarClientImpl pulsarClient = CachedPulsarClient.getOrCreate(clientConfigurationData);
        // NOTE(review): fixed 100 ms pause before every transaction; the reason is not visible
        // in this file - presumably gives the transaction coordinator time to become ready. Confirm.
        Thread.sleep(100L);
        return (Transaction) pulsarClient
                .newTransaction()
                .withTransactionTimeout(transactionTimeout, TimeUnit.MILLISECONDS)
                .build()
                .get();
    }

    /**
     * Starts the state object for the next transaction.
     *
     * <p>EXACTLY_ONCE opens a real Pulsar transaction and registers empty message/future lists
     * under its id. AT_LEAST_ONCE and NONE carry over the current state's transaction id,
     * transaction and pending messages when the current state is transactional, and otherwise
     * return a non-transactional state.
     *
     * @return state for the new transaction
     * @throws Exception if the Pulsar transaction cannot be created
     * @throws UnsupportedOperationException for a semantic not handled here
     */
    protected PulsarTransactionState<T> beginTransaction() throws Exception {
        switch (this.semantic) {
            case EXACTLY_ONCE: {
                log.debug("transaction is begining in EXACTLY_ONCE mode");
                Transaction pulsarTxn = createTransaction();
                TransactionImpl txnImpl = (TransactionImpl) pulsarTxn;
                long leastBits = txnImpl.getTxnIdLeastBits();
                long mostBits = txnImpl.getTxnIdMostBits();
                TxnID mapKey = new TxnID(mostBits, leastBits);
                this.tid2MessagesMap.computeIfAbsent(mapKey, key -> new ArrayList<>());
                this.tid2FuturesMap.computeIfAbsent(mapKey, key -> new ArrayList<>());
                // The state shares the message list registered in the map.
                List<MessageId> pending = this.tid2MessagesMap.get(mapKey);
                return new PulsarTransactionState<>(new TxnID(mostBits, leastBits), pulsarTxn, pending);
            }
            case AT_LEAST_ONCE:
            case NONE: {
                PulsarTransactionState<T> previous = this.currentTransaction();
                boolean carryOver = previous != null && previous.transactionalId != null;
                if (!carryOver) {
                    return new PulsarTransactionState<>(null, null, new ArrayList<MessageId>());
                }
                return new PulsarTransactionState<>(
                        previous.transactionalId, previous.getTransaction(), previous.getPendingMessages());
            }
        }
        throw new UnsupportedOperationException("Not implemented semantic");
    }

    /**
     * Pre-commit phase: flushes the producers for EXACTLY_ONCE and AT_LEAST_ONCE, does nothing
     * for NONE, then rethrows any recorded send failure.
     *
     * @param transaction the transaction state being pre-committed
     * @throws Exception if flushing fails or a send failure was recorded
     * @throws UnsupportedOperationException for a semantic not handled here
     */
    protected void preCommit(PulsarTransactionState<T> transaction) throws Exception {
        switch (this.semantic) {
            case EXACTLY_ONCE:
            case AT_LEAST_ONCE: {
                this.producerFlush(transaction);
                break;
            }
            case NONE: {
                break;
            }
            default: {
                throw new UnsupportedOperationException("Not implemented semantic");
            }
        }
        if (transaction.isTransactional()) {
            if (log.isDebugEnabled()) {
                // Look up the list of the transaction being pre-committed (the argument) and
                // tolerate a missing map/entry, so a debug statement can never fail the checkpoint.
                List<MessageId> pending =
                        this.tid2MessagesMap == null ? null : this.tid2MessagesMap.get(transaction.transactionalId);
                log.debug("{} preCommit with pending message size {}", transaction.transactionalId, pending == null ? 0 : pending.size());
            }
        } else {
            log.debug("in AT_LEAST_ONCE mode, producer was flushed by preCommit");
        }
        this.checkErroneous();
    }

    /**
     * Commits the Pulsar transaction of a transactional state and waits up to
     * {@code maxBlockTimeMs} for the commit to finish. Non-transactional states are ignored.
     *
     * <p>After a successful commit the bookkeeping for the transaction is dropped from both
     * {@code tid2MessagesMap} and {@code tid2FuturesMap}; leaving the futures entry behind
     * would retain one list of completed futures per checkpoint for the lifetime of the sink.
     *
     * @param transactionState state whose transaction is committed
     * @throws RuntimeException if the commit fails, times out or the wait is interrupted
     *     (the interrupt flag is restored before throwing)
     */
    protected void commit(PulsarTransactionState<T> transactionState) {
        if (!transactionState.isTransactional()) {
            return;
        }
        TxnID transactionalId = transactionState.transactionalId;
        log.debug("transaction {} is committing", transactionalId);
        CompletableFuture<Void> future = transactionState.getTransaction().commit();
        try {
            future.get(this.maxBlockTimeMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        // Null-safe: the maps only exist for sinks constructed with EXACTLY_ONCE.
        List<MessageId> committed = this.tid2MessagesMap == null ? null : this.tid2MessagesMap.remove(transactionalId);
        if (this.tid2FuturesMap != null) {
            this.tid2FuturesMap.remove(transactionalId);
        }
        log.debug("transaction {} is committed with messageID size {}", transactionalId, committed == null ? 0 : committed.size());
    }

    /**
     * Aborts the Pulsar transaction of a transactional state and waits up to
     * {@code maxBlockTimeMs} for the abort to finish. Non-transactional states are ignored.
     *
     * <p>The bookkeeping kept for the transaction in {@code tid2MessagesMap} and
     * {@code tid2FuturesMap} is dropped whether or not the abort succeeds, since an aborted
     * transaction is never flushed or committed again and its entries would otherwise leak.
     *
     * @param transactionState state whose transaction is aborted
     * @throws RuntimeException if the abort fails, times out or the wait is interrupted
     *     (the interrupt flag is restored before throwing)
     */
    protected void abort(PulsarTransactionState<T> transactionState) {
        if (!transactionState.isTransactional()) {
            return;
        }
        TxnID transactionalId = transactionState.transactionalId;
        CompletableFuture<Void> future = transactionState.getTransaction().abort();
        log.debug("transaction {} is aborting", transactionalId);
        try {
            future.get(this.maxBlockTimeMs, TimeUnit.MILLISECONDS);
            log.debug("transaction {} is aborted", transactionalId);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            // Null-safe: the maps only exist for sinks constructed with EXACTLY_ONCE.
            if (this.tid2MessagesMap != null) {
                this.tid2MessagesMap.remove(transactionalId);
            }
            if (this.tid2FuturesMap != null) {
                this.tid2FuturesMap.remove(transactionalId);
            }
        }
    }

    /**
     * Commits a transaction recovered from state, going through the transaction coordinator
     * client because the restored state carries only the transaction id. A transaction that
     * is reported with an invalid status is treated as already committed.
     *
     * @param transaction recovered state; ignored when not transactional
     * @throws RuntimeException if the client cannot be obtained or the coordinator rejects
     *     the commit for any other reason
     */
    protected void recoverAndCommit(PulsarTransactionState<T> transaction) {
        if (!transaction.isTransactional()) {
            return;
        }
        try {
            log.debug("transaction {} is recoverAndCommit...", transaction.transactionalId);
            TransactionCoordinatorClientImpl coordinator =
                    CachedPulsarClient.getOrCreate(clientConfigurationData).getTcClient();
            coordinator.commit(transaction.transactionalId);
        } catch (PulsarClientException clientFailure) {
            log.error("Failed to getOrCreate a PulsarClient");
            throw new RuntimeException(clientFailure);
        } catch (TransactionCoordinatorClientException.InvalidTxnStatusException alreadyDone) {
            log.debug("transaction {} is already committed...", transaction.transactionalId);
        } catch (TransactionCoordinatorClientException coordinatorFailure) {
            throw new RuntimeException(coordinatorFailure);
        }
    }

    /**
     * Aborts a transaction recovered from state, going through the transaction coordinator
     * client because the restored state carries only the transaction id. A transaction that
     * is reported with an invalid status is treated as already aborted.
     *
     * @param transaction recovered state; ignored when not transactional
     * @throws RuntimeException if the client cannot be obtained or the coordinator rejects
     *     the abort for any other reason
     */
    protected void recoverAndAbort(PulsarTransactionState<T> transaction) {
        if (!transaction.isTransactional()) {
            return;
        }
        try {
            log.debug("transaction {} is recoverAndAbort...", transaction.transactionalId);
            TransactionCoordinatorClientImpl coordinator =
                    CachedPulsarClient.getOrCreate(clientConfigurationData).getTcClient();
            coordinator.abort(transaction.transactionalId);
        } catch (PulsarClientException clientFailure) {
            log.error("Failed to getOrCreate a PulsarClient");
            throw new RuntimeException(clientFailure);
        } catch (TransactionCoordinatorClientException.InvalidTxnStatusException alreadyDone) {
            log.debug("transaction {} is already aborted...", transaction.transactionalId);
        } catch (TransactionCoordinatorClientException coordinatorFailure) {
            throw new RuntimeException(coordinatorFailure);
        }
    }

    /**
     * Flushes outstanding sends for the current transaction and releases the admin client and
     * all producers.
     *
     * <p>Every resource is closed even if the flush or an earlier close fails. The first
     * failure is rethrown after cleanup with later ones attached as suppressed exceptions,
     * so a failing flush no longer leaks the admin client and the producers.
     *
     * @throws Exception the first failure raised while flushing or closing
     */
    protected void producerClose() throws Exception {
        Exception failure = null;
        try {
            this.producerFlush(this.currentTransaction());
        } catch (Exception e) {
            failure = e;
        }
        try {
            if (this.admin != null) {
                this.admin.close();
            }
        } catch (Exception e) {
            failure = ExceptionUtils.firstOrSuppressed(e, failure);
        }
        if (this.singleProducer != null) {
            try {
                this.singleProducer.close();
            } catch (Exception e) {
                failure = ExceptionUtils.firstOrSuppressed(e, failure);
            }
        } else if (this.topic2Producer != null) {
            for (Producer<T> p : this.topic2Producer.values()) {
                try {
                    p.close();
                } catch (Exception e) {
                    // Keep going so one broken producer does not leak the others.
                    failure = ExceptionUtils.firstOrSuppressed(e, failure);
                }
            }
            this.topic2Producer.clear();
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Rethrows a send failure recorded by the send callback, clearing it first so that it is
     * reported only once.
     *
     * @throws Exception wrapping the recorded failure, if there is one
     */
    protected void checkErroneous() throws Exception {
        Throwable recorded = this.failedWrite;
        if (recorded == null) {
            return;
        }
        // Reset before throwing so the same failure is not raised again.
        this.failedWrite = null;
        throw new Exception("Failed to send data to Pulsar: " + recorded.getMessage(), recorded);
    }

    /**
     * Marks one pending record as completed. Only active when {@code flushOnCheckpoint} is
     * set; wakes up threads waiting on {@code pendingRecordsLock} once no records remain.
     */
    private void acknowledgeMessage() {
        if (!this.flushOnCheckpoint) {
            return;
        }
        synchronized (this.pendingRecordsLock) {
            this.pendingRecords -= 1L;
            if (this.pendingRecords == 0L) {
                this.pendingRecordsLock.notifyAll();
            }
        }
    }

    /**
     * Serializer for {@link PulsarTransactionState}.
     *
     * <p>Wire format: a boolean flag telling whether a transaction id is present. When it is,
     * the flag is followed by the id's most and least significant bits (two longs), the number
     * of pending message ids (int) and, per message id, its length (int) and its bytes.
     * The {@code Transaction} handle itself is never serialized.
     */
    @VisibleForTesting
    @Internal
    public static class TransactionStateSerializer<T>
    extends TypeSerializerSingleton<PulsarTransactionState<T>> {
        private static final long serialVersionUID = 1L;

        public boolean isImmutableType() {
            return true;
        }

        public PulsarTransactionState<T> createInstance() {
            return null;
        }

        public PulsarTransactionState<T> copy(PulsarTransactionState<T> from) {
            return from;
        }

        public PulsarTransactionState<T> copy(PulsarTransactionState<T> from, PulsarTransactionState<T> reuse) {
            return from;
        }

        /** @return -1, the record length is variable */
        public int getLength() {
            return -1;
        }

        /**
         * Writes {@code record} in the format described on the class.
         *
         * @param record state to write
         * @param target output to write to
         * @throws IOException if writing fails
         */
        public void serialize(PulsarTransactionState<T> record, DataOutputView target) throws IOException {
            if (record.transactionalId == null) {
                target.writeBoolean(false);
                return;
            }
            target.writeBoolean(true);
            target.writeLong(record.transactionalId.getMostSigBits());
            target.writeLong(record.transactionalId.getLeastSigBits());
            List<MessageId> pendingMessages = record.getPendingMessages();
            target.writeInt(pendingMessages.size());
            for (MessageId messageId : pendingMessages) {
                byte[] messageData = messageId.toByteArray();
                target.writeInt(messageData.length);
                target.write(messageData);
            }
        }

        /**
         * Reads a state written by {@link #serialize}. The returned state has no
         * {@code Transaction} handle.
         *
         * @param source input to read from
         * @return the restored state
         * @throws IOException if reading fails or the input ends prematurely
         */
        public PulsarTransactionState<T> deserialize(DataInputView source) throws IOException {
            TxnID transactionalId = null;
            List<MessageId> pendingMessages = new ArrayList<MessageId>();
            if (source.readBoolean()) {
                long mostSigBits = source.readLong();
                long leastSigBits = source.readLong();
                transactionalId = new TxnID(mostSigBits, leastSigBits);
                int size = source.readInt();
                for (int i = 0; i < size; ++i) {
                    int length = source.readInt();
                    byte[] messageData = new byte[length];
                    // readFully, not read: read may return fewer bytes than requested,
                    // which would yield a truncated message id and misalign the stream.
                    source.readFully(messageData);
                    pendingMessages.add(MessageId.fromByteArray(messageData));
                }
            }
            return new PulsarTransactionState<>(transactionalId, null, pendingMessages);
        }

        public PulsarTransactionState<T> deserialize(PulsarTransactionState<T> reuse, DataInputView source) throws IOException {
            return this.deserialize(source);
        }

        /**
         * Copies one serialized record from {@code source} to {@code target}, including the
         * pending-message section, so that both streams stay aligned with the format produced
         * by {@link #serialize}.
         *
         * @param source input positioned at the start of a record
         * @param target output receiving the identical bytes
         * @throws IOException if reading or writing fails
         */
        public void copy(DataInputView source, DataOutputView target) throws IOException {
            boolean hasTransactionalId = source.readBoolean();
            target.writeBoolean(hasTransactionalId);
            if (!hasTransactionalId) {
                return;
            }
            target.writeLong(source.readLong());
            target.writeLong(source.readLong());
            int size = source.readInt();
            target.writeInt(size);
            for (int i = 0; i < size; ++i) {
                int length = source.readInt();
                target.writeInt(length);
                byte[] messageData = new byte[length];
                source.readFully(messageData);
                target.write(messageData);
            }
        }

        public TypeSerializerSnapshot<PulsarTransactionState<T>> snapshotConfiguration() {
            return new TransactionStateSerializerSnapshot<>();
        }

        /** Snapshot of the serializer configuration; recreates the serializer via its constructor. */
        public static final class TransactionStateSerializerSnapshot<T>
        extends SimpleTypeSerializerSnapshot<PulsarTransactionState<T>> {
            public TransactionStateSerializerSnapshot() {
                super(TransactionStateSerializer::new);
            }
        }
    }

    /**
     * State tracked per sink transaction: an optional Pulsar transaction id, the (transient)
     * transaction handle and the ids of the messages collected for it. A state without a
     * transaction id is considered non-transactional.
     */
    @VisibleForTesting
    @Internal
    public static class PulsarTransactionState<T> {
        /** Live transaction handle; transient, so it may be null. */
        private final transient Transaction transaction;
        /** Message ids collected for this transaction. */
        private final List<MessageId> pendingMessages;
        /** Transaction id, or null for a non-transactional state. */
        @Nullable
        final TxnID transactionalId;

        /** Creates a non-transactional state with an empty message list. */
        @VisibleForTesting
        public PulsarTransactionState() {
            this(null, null, new ArrayList<MessageId>());
        }

        /**
         * @param transactionalId transaction id, or null for a non-transactional state
         * @param transaction transaction handle, may be null
         * @param pendingMessages list holding the message ids of this transaction
         */
        @VisibleForTesting
        public PulsarTransactionState(@Nullable TxnID transactionalId, @Nullable Transaction transaction, List<MessageId> pendingMessages) {
            this.transactionalId = transactionalId;
            this.transaction = transaction;
            this.pendingMessages = pendingMessages;
        }

        public Transaction getTransaction() {
            return transaction;
        }

        /** @return true when this state carries a transaction id */
        boolean isTransactional() {
            return transactionalId != null;
        }

        public List<MessageId> getPendingMessages() {
            return pendingMessages;
        }

        @Override
        public String toString() {
            String simpleName = getClass().getSimpleName();
            if (!isTransactional()) {
                return String.format("%s this state is not in transactional mode", simpleName);
            }
            return String.format(
                    "%s [transactionalId=%s] [pendingMessages=%s]",
                    simpleName, transactionalId.toString(), pendingMessages.size());
        }

        /** Two states are equal when their pending messages and transaction ids are equal. */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o == null || o.getClass() != getClass()) {
                return false;
            }
            PulsarTransactionState<?> other = (PulsarTransactionState<?>) o;
            if (!pendingMessages.equals(other.pendingMessages)) {
                return false;
            }
            if (transactionalId == null) {
                return other.transactionalId == null;
            }
            return transactionalId.equals(other.transactionalId);
        }

        @Override
        public int hashCode() {
            int messagesHash = pendingMessages.hashCode();
            int idHash = transactionalId == null ? 0 : transactionalId.hashCode();
            return 31 * messagesHash + idHash;
        }
    }
}

