/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.sink;

import java.nio.charset.StandardCharsets;
import java.security.Provider;
import java.security.Security;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.CryptoConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.FunctionResultRouter;
import org.apache.pulsar.functions.instance.SinkRecord;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.functions.runtime.shaded.org.apache.commons.lang.StringUtils;
import org.apache.pulsar.functions.sink.PulsarSinkConfig;
import org.apache.pulsar.functions.source.PulsarRecord;
import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.functions.utils.CryptoUtils;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarSink<T>
implements Sink<T> {
    // Class-wide SLF4J logger.
    private static final Logger log = LoggerFactory.getLogger(PulsarSink.class);
    // Client used to build the TopicSchema helper and to create producers.
    private final PulsarClient client;
    // Sink settings read throughout this class: topic, type/schema/serde names,
    // processing guarantees and the optional producer configuration.
    private final PulsarSinkConfig pulsarSinkConfig;
    // Properties attached to every producer this sink creates.
    private final Map<String, String> properties;
    // Class loader handed to Reflections.loadClass, TopicSchema.getSchema and
    // CryptoUtils when resolving user-supplied classes.
    private final ClassLoader functionClassLoader;
    // Receives a sink-exception increment whenever an asynchronous publish fails.
    private ComponentStatsManager stats;
    // Delivery strategy selected in open(); stays null when initializeSchema() returns null.
    @VisibleForTesting
    PulsarSinkProcessor<T> pulsarSinkProcessor;
    // Resolves the Schema used for the configured output topic.
    private final TopicSchema topicSchema;

    /**
     * Creates a sink that publishes function output through the given client.
     * No producer is created here; that happens in {@link #open}.
     *
     * @param client              Pulsar client used for schema lookup and producer creation
     * @param pulsarSinkConfig    sink settings (topic, schema/serde, guarantees, producer config)
     * @param properties          properties attached to every producer created by this sink
     * @param stats               stats manager notified about publish failures
     * @param functionClassLoader class loader used to resolve user-provided classes
     */
    public PulsarSink(PulsarClient client, PulsarSinkConfig pulsarSinkConfig, Map<String, String> properties, ComponentStatsManager stats, ClassLoader functionClassLoader) {
        // Collaborators handed in by the caller.
        this.client = client;
        this.stats = stats;
        this.functionClassLoader = functionClassLoader;

        // Static configuration.
        this.pulsarSinkConfig = pulsarSinkConfig;
        this.properties = properties;

        // Derived helper bound to the same client.
        this.topicSchema = new TopicSchema(client);
    }

    /**
     * Prepares the sink for writing: resolves the output schema and the optional
     * encryption settings, then instantiates the processor matching the configured
     * processing guarantee. When the resolved schema is {@code null} no processor
     * is created and the method returns early.
     *
     * @param config      sink configuration map (not read by this implementation)
     * @param sinkContext sink context (not read by this implementation)
     * @throws Exception if schema or crypto initialization fails
     */
    @Override
    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
        log.info("Opening pulsar sink with config: {}", this.pulsarSinkConfig);

        final Schema<T> outputSchema = this.initializeSchema();
        if (outputSchema == null) {
            log.info("Since output type is null, not creating any real sink");
            return;
        }

        final Crypto encryption = this.initializeCrypto();
        if (encryption == null) {
            log.info("crypto key reader is not provided, not enabling end to end encryption");
        }

        // One processor implementation per delivery guarantee; any other value
        // leaves the processor unset, exactly as before.
        switch (this.pulsarSinkConfig.getProcessingGuarantees()) {
            case ATMOST_ONCE:
                this.pulsarSinkProcessor = new PulsarSinkAtMostOnceProcessor(outputSchema, encryption);
                break;
            case ATLEAST_ONCE:
                this.pulsarSinkProcessor = new PulsarSinkAtLeastOnceProcessor(outputSchema, encryption);
                break;
            case EFFECTIVELY_ONCE:
                this.pulsarSinkProcessor = new PulsarSinkEffectivelyOnceProcessor(outputSchema, encryption);
                break;
        }
    }

    /**
     * Publishes one record through the active processor.
     *
     * <p>The message key is copied from the record unless the record schema is a
     * {@link KeyValueSchema} using {@code SEPARATED} encoding. Record properties are
     * forwarded only when the sink is configured to do so. Records that originate
     * from a {@link PulsarRecord} are tagged with the input topic and the
     * Base64-encoded input message id; any other source contributes its event time
     * when present.
     *
     * @param record the record to publish; must be a {@link SinkRecord}
     */
    @Override
    public void write(Record<T> record) {
        final SinkRecord<T> sinkRecord = (SinkRecord<T>) record;
        final TypedMessageBuilder<T> msg = this.pulsarSinkProcessor.newMessage(sinkRecord);

        if (record.getKey().isPresent()) {
            final Schema<T> recordSchema = record.getSchema();
            final boolean keyStoredSeparately = recordSchema instanceof KeyValueSchema
                    && ((KeyValueSchema) recordSchema).getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED;
            if (!keyStoredSeparately) {
                msg.key(record.getKey().get());
            }
        }

        msg.value(record.getValue());

        if (!record.getProperties().isEmpty() && this.pulsarSinkConfig.isForwardSourceMessageProperty()) {
            msg.properties(record.getProperties());
        }

        final Record<T> source = sinkRecord.getSourceRecord();
        if (source instanceof PulsarRecord) {
            final PulsarRecord<T> pulsarRecord = (PulsarRecord<T>) source;
            // Base64 output is pure ASCII, so encodeToString yields the same text as
            // decoding the encoded bytes with UTF-8.
            msg.property("__pfn_input_topic__", pulsarRecord.getTopicName().get())
                    .property("__pfn_input_msg_id__", Base64.getEncoder().encodeToString(pulsarRecord.getMessageId().toByteArray()));
        } else {
            source.getEventTime().ifPresent(timestamp -> msg.eventTime(timestamp));
        }

        this.pulsarSinkProcessor.sendOutputMessage(msg, sinkRecord);
    }

    /**
     * Closes the active processor, if one was created by {@link #open}.
     *
     * @throws Exception if the processor fails to close
     */
    @Override
    public void close() throws Exception {
        final PulsarSinkProcessor<T> processor = this.pulsarSinkProcessor;
        if (processor == null) {
            // open() never created a processor, so there is nothing to release.
            return;
        }
        processor.close();
    }

    /**
     * Resolves the schema used to publish to the configured output topic.
     *
     * <ul>
     *   <li>No type class name configured: {@link Schema#BYTES}.</li>
     *   <li>Type class is {@link Void}: {@code null} (no output is produced).</li>
     *   <li>Schema type configured: that schema type is used, except that
     *       {@link GenericRecord} outputs are always forced to {@code AUTO_CONSUME}.</li>
     *   <li>Otherwise: the configured serde class name is passed to the schema lookup.</li>
     * </ul>
     *
     * @return the schema to publish with, or {@code null} when the output type is {@code Void}
     * @throws ClassNotFoundException if the configured type class cannot be loaded
     */
    @VisibleForTesting
    @SuppressWarnings("unchecked")
    Schema<T> initializeSchema() throws ClassNotFoundException {
        if (org.apache.pulsar.functions.runtime.shaded.org.apache.commons.lang3.StringUtils.isEmpty(this.pulsarSinkConfig.getTypeClassName())) {
            return (Schema<T>) Schema.BYTES;
        }
        Class<?> typeArg = Reflections.loadClass(this.pulsarSinkConfig.getTypeClassName(), this.functionClassLoader);
        if (Void.class.equals(typeArg)) {
            return null;
        }
        ConsumerConfig consumerConfig = new ConsumerConfig();
        consumerConfig.setSchemaProperties(this.pulsarSinkConfig.getSchemaProperties());
        if (!org.apache.pulsar.functions.runtime.shaded.org.apache.commons.lang3.StringUtils.isEmpty(this.pulsarSinkConfig.getSchemaType())) {
            if (GenericRecord.class.isAssignableFrom(typeArg)) {
                consumerConfig.setSchemaType(SchemaType.AUTO_CONSUME.toString());
                SchemaType configuredSchemaType = SchemaType.valueOf(this.pulsarSinkConfig.getSchemaType());
                if (SchemaType.AUTO_CONSUME != configuredSchemaType) {
                    log.info("The configured schema type {} is not able to write GenericRecords. So overwrite the schema type to be {}", configuredSchemaType, SchemaType.AUTO_CONSUME);
                }
            } else {
                consumerConfig.setSchemaType(this.pulsarSinkConfig.getSchemaType());
            }
            return (Schema<T>) this.topicSchema.getSchema(this.pulsarSinkConfig.getTopic(), typeArg, consumerConfig, false);
        }
        // No schema type configured: hand over the serde class name in the serde slot.
        // It was previously written into the schema-type slot, so the serde setting
        // was never populated.
        consumerConfig.setSerdeClassName(this.pulsarSinkConfig.getSerdeClassName());
        return (Schema<T>) this.topicSchema.getSchema(this.pulsarSinkConfig.getTopic(), typeArg, consumerConfig, false, this.functionClassLoader);
    }

    /**
     * Builds the encryption settings for producers created by this sink.
     *
     * @return the crypto settings, or {@code null} when no producer config, no crypto
     *         config, or no crypto key reader class name is configured
     * @throws ClassNotFoundException declared for failures while resolving the key reader class
     */
    @VisibleForTesting
    Crypto initializeCrypto() throws ClassNotFoundException {
        final ProducerConfig producerConfig = this.pulsarSinkConfig.getProducerConfig();
        if (producerConfig == null) {
            return null;
        }
        final CryptoConfig cryptoConfig = producerConfig.getCryptoConfig();
        if (cryptoConfig == null || StringUtils.isEmpty(cryptoConfig.getCryptoKeyReaderClassName())) {
            return null;
        }

        // Register the BouncyCastle provider only if the JVM does not have it yet.
        if (Security.getProvider("BC") == null) {
            Security.addProvider(new BouncyCastleProvider());
        }

        return Crypto.builder()
                .failureAction(cryptoConfig.getProducerCryptoFailureAction())
                .encryptionKeys(cryptoConfig.getEncryptionKeys())
                .keyReader(CryptoUtils.getCryptoKeyReaderInstance(cryptoConfig.getCryptoKeyReaderClassName(), cryptoConfig.getCryptoKeyReaderConfig(), this.functionClassLoader))
                .build();
    }

    /**
     * Mutable value holder for the producer-side encryption settings: the key
     * reader, the action to take on encryption failure, and the encryption key names.
     * Instances are normally assembled through {@link #builder()}.
     */
    private static class Crypto {
        private CryptoKeyReader keyReader;
        private ProducerCryptoFailureAction failureAction;
        private String[] encryptionKeys;

        Crypto(CryptoKeyReader keyReader, ProducerCryptoFailureAction failureAction, String[] encryptionKeys) {
            this.keyReader = keyReader;
            this.failureAction = failureAction;
            this.encryptionKeys = encryptionKeys;
        }

        /** @return a fresh, empty builder */
        public static CryptoBuilder builder() {
            return new CryptoBuilder();
        }

        // ---- accessors -------------------------------------------------

        public CryptoKeyReader getKeyReader() {
            return this.keyReader;
        }

        public void setKeyReader(CryptoKeyReader keyReader) {
            this.keyReader = keyReader;
        }

        public ProducerCryptoFailureAction getFailureAction() {
            return this.failureAction;
        }

        public void setFailureAction(ProducerCryptoFailureAction failureAction) {
            this.failureAction = failureAction;
        }

        public String[] getEncryptionKeys() {
            return this.encryptionKeys;
        }

        public void setEncryptionKeys(String[] encryptionKeys) {
            this.encryptionKeys = encryptionKeys;
        }

        // ---- value semantics -------------------------------------------

        /** Null-tolerant equality: two nulls match, otherwise delegates to {@code equals}. */
        private static boolean matches(Object left, Object right) {
            return left == null ? right == null : left.equals(right);
        }

        /**
         * Two instances are equal when key reader, failure action and the
         * encryption key arrays (compared element-wise) all match.
         */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof Crypto)) {
                return false;
            }
            final Crypto that = (Crypto) o;
            if (!that.canEqual(this)) {
                return false;
            }
            return matches(this.getKeyReader(), that.getKeyReader())
                    && matches(this.getFailureAction(), that.getFailureAction())
                    && Arrays.deepEquals(this.getEncryptionKeys(), that.getEncryptionKeys());
        }

        protected boolean canEqual(Object other) {
            return other instanceof Crypto;
        }

        /** Hash over the same three components used by {@link #equals}. */
        @Override
        public int hashCode() {
            final CryptoKeyReader reader = this.getKeyReader();
            int hash = 59 + (reader == null ? 43 : reader.hashCode());
            final ProducerCryptoFailureAction action = this.getFailureAction();
            hash = 59 * hash + (action == null ? 43 : action.hashCode());
            return 59 * hash + Arrays.deepHashCode(this.getEncryptionKeys());
        }

        @Override
        public String toString() {
            return new StringBuilder("PulsarSink.Crypto(keyReader=")
                    .append(this.getKeyReader())
                    .append(", failureAction=")
                    .append(this.getFailureAction())
                    .append(", encryptionKeys=")
                    .append(Arrays.deepToString(this.getEncryptionKeys()))
                    .append(")")
                    .toString();
        }

        /** Fluent builder for {@link Crypto}; unset components stay {@code null}. */
        public static class CryptoBuilder {
            private CryptoKeyReader keyReader;
            private ProducerCryptoFailureAction failureAction;
            private String[] encryptionKeys;

            CryptoBuilder() {
            }

            public CryptoBuilder keyReader(CryptoKeyReader keyReader) {
                this.keyReader = keyReader;
                return this;
            }

            public CryptoBuilder failureAction(ProducerCryptoFailureAction failureAction) {
                this.failureAction = failureAction;
                return this;
            }

            public CryptoBuilder encryptionKeys(String[] encryptionKeys) {
                this.encryptionKeys = encryptionKeys;
                return this;
            }

            /** @return a new {@link Crypto} carrying the values set so far */
            public Crypto build() {
                return new Crypto(this.keyReader, this.failureAction, this.encryptionKeys);
            }

            @Override
            public String toString() {
                return new StringBuilder("PulsarSink.Crypto.CryptoBuilder(keyReader=")
                        .append(this.keyReader)
                        .append(", failureAction=")
                        .append(this.failureAction)
                        .append(", encryptionKeys=")
                        .append(Arrays.deepToString(this.encryptionKeys))
                        .append(")")
                        .toString();
            }
        }
    }

    @VisibleForTesting
    class PulsarSinkAtMostOnceProcessor
    extends PulsarSinkProcessorBase {
        public PulsarSinkAtMostOnceProcessor(Schema schema, Crypto crypto) {
            super(schema, crypto);
            if (!(schema instanceof AutoConsumeSchema)) {
                try {
                    this.publishProducers.put(PulsarSink.this.pulsarSinkConfig.getTopic(), this.createProducer(PulsarSink.this.client, PulsarSink.this.pulsarSinkConfig.getTopic(), null, schema));
                }
                catch (PulsarClientException e) {
                    log.error("Failed to create Producer while doing user publish", (Throwable)e);
                    throw new RuntimeException(e);
                }
            } else if (log.isDebugEnabled()) {
                log.debug("The Pulsar producer is not initialized until the first record is published for `AUTO_CONSUME` schema.");
            }
        }

        @Override
        public TypedMessageBuilder<T> newMessage(SinkRecord<T> record) {
            Schema schemaToWrite = record.getSchema();
            if (record.getSourceRecord() instanceof PulsarRecord) {
                schemaToWrite = this.schema;
            }
            if (schemaToWrite != null) {
                return this.getProducer(record.getDestinationTopic().orElse(PulsarSink.this.pulsarSinkConfig.getTopic()), schemaToWrite).newMessage(schemaToWrite);
            }
            return this.getProducer(record.getDestinationTopic().orElse(PulsarSink.this.pulsarSinkConfig.getTopic()), null).newMessage();
        }

        @Override
        public void sendOutputMessage(TypedMessageBuilder<T> msg, SinkRecord<T> record) {
            ((CompletableFuture)msg.sendAsync().thenAccept(messageId -> {})).exceptionally(this.getPublishErrorHandler(record, false));
        }
    }

    private static interface PulsarSinkProcessor<T> {
        public TypedMessageBuilder<T> newMessage(SinkRecord<T> var1);

        public void sendOutputMessage(TypedMessageBuilder<T> var1, SinkRecord<T> var2);

        public void close() throws Exception;
    }

    /**
     * Processor for at-least-once delivery: behaves like the at-most-once
     * processor, but acknowledges the record once the send completes and fails
     * the source record when the send fails.
     */
    @VisibleForTesting
    class PulsarSinkAtLeastOnceProcessor
    extends PulsarSinkAtMostOnceProcessor {
        public PulsarSinkAtLeastOnceProcessor(Schema schema, Crypto crypto) {
            super(schema, crypto);
        }

        /** Sends asynchronously; acks on success, routes failures to the error handler. */
        @Override
        public void sendOutputMessage(TypedMessageBuilder<T> msg, SinkRecord<T> record) {
            final CompletableFuture<Void> acked = msg.sendAsync().thenAccept(messageId -> record.ack());
            acked.exceptionally(this.getPublishErrorHandler(record, true));
        }
    }

    /**
     * Processor for effectively-once delivery: uses one named producer per
     * (topic, partition id) pair and stamps every message with the record's
     * sequence id before sending.
     */
    @VisibleForTesting
    class PulsarSinkEffectivelyOnceProcessor
    extends PulsarSinkProcessorBase {
        public PulsarSinkEffectivelyOnceProcessor(Schema schema, Crypto crypto) {
            super(schema, crypto);
        }

        /**
         * Starts a message on the producer dedicated to the record's topic and
         * partition id. The producer is cached under {@code "<topic>-<partitionId>"}
         * and is named after the partition id.
         *
         * @throws RuntimeException if the record carries no partition id
         */
        @Override
        public TypedMessageBuilder<T> newMessage(SinkRecord<T> record) {
            final Optional<String> partition = record.getPartitionId();
            if (!partition.isPresent()) {
                throw new RuntimeException("PartitionId needs to be specified for every record while in Effectively-once mode");
            }

            final Schema recordSchema = record.getSchema();
            final Schema effectiveSchema = record.getSourceRecord() instanceof PulsarRecord ? this.schema : recordSchema;

            final String topic = record.getDestinationTopic().orElse(PulsarSink.this.pulsarSinkConfig.getTopic());
            final String partitionId = partition.get();
            final String producerKey = String.format("%s-%s", topic, partitionId);

            final Producer<T> producer = this.getProducer(producerKey, partitionId, topic, effectiveSchema);
            if (effectiveSchema == null) {
                return producer.newMessage();
            }
            return producer.newMessage(effectiveSchema);
        }

        /**
         * Assigns the record's sequence id to the message and sends it; acks on
         * success and routes failures to the error handler.
         *
         * @throws RuntimeException if the record carries no record sequence
         */
        @Override
        public void sendOutputMessage(TypedMessageBuilder<T> msg, SinkRecord<T> record) {
            final Optional<Long> sequence = record.getRecordSequence();
            if (!sequence.isPresent()) {
                throw new RuntimeException("RecordSequence needs to be specified for every record while in Effectively-once mode");
            }
            msg.sequenceId(sequence.get());

            final CompletableFuture<MessageId> pendingSend = msg.sendAsync();
            final CompletableFuture<Void> acked = pendingSend.thenAccept(messageId -> record.ack());
            acked.exceptionally(this.getPublishErrorHandler(record, true));
        }
    }

    abstract class PulsarSinkProcessorBase
    implements PulsarSinkProcessor<T> {
        protected Map<String, Producer<T>> publishProducers = new ConcurrentHashMap();
        protected Schema schema;
        protected Crypto crypto;

        protected PulsarSinkProcessorBase(Schema schema, Crypto crypto) {
            this.schema = schema;
            this.crypto = crypto;
        }

        public Producer<T> createProducer(PulsarClient client, String topic, String producerName, Schema<T> schema) throws PulsarClientException {
            ProducerBuilder builder = client.newProducer(schema).blockIfQueueFull(true).enableBatching(true).batchingMaxPublishDelay(10L, TimeUnit.MILLISECONDS).compressionType(CompressionType.LZ4).hashingScheme(HashingScheme.Murmur3_32Hash).messageRoutingMode(MessageRoutingMode.CustomPartition).messageRouter(FunctionResultRouter.of()).sendTimeout(0, TimeUnit.SECONDS).topic(topic);
            if (producerName != null) {
                builder.producerName(producerName);
            }
            if (PulsarSink.this.pulsarSinkConfig.getProducerConfig() != null) {
                ProducerConfig producerConfig = PulsarSink.this.pulsarSinkConfig.getProducerConfig();
                if (producerConfig.getMaxPendingMessages() != 0) {
                    builder.maxPendingMessages(producerConfig.getMaxPendingMessages());
                }
                if (producerConfig.getMaxPendingMessagesAcrossPartitions() != 0) {
                    builder.maxPendingMessagesAcrossPartitions(producerConfig.getMaxPendingMessagesAcrossPartitions());
                }
                if (producerConfig.getCryptoConfig() != null) {
                    builder.cryptoKeyReader(this.crypto.keyReader);
                    builder.cryptoFailureAction(this.crypto.failureAction);
                    for (String encryptionKeyName : this.crypto.getEncryptionKeys()) {
                        builder.addEncryptionKey(encryptionKeyName);
                    }
                }
                if (producerConfig.getBatchBuilder() != null) {
                    if (producerConfig.getBatchBuilder().equals("KEY_BASED")) {
                        builder.batcherBuilder(BatcherBuilder.KEY_BASED);
                    } else {
                        builder.batcherBuilder(BatcherBuilder.DEFAULT);
                    }
                }
            }
            return builder.properties(PulsarSink.this.properties).create();
        }

        protected Producer<T> getProducer(String destinationTopic, Schema schema) {
            return this.getProducer(destinationTopic, null, destinationTopic, schema);
        }

        protected Producer<T> getProducer(String producerId, String producerName, String topicName, Schema schema) {
            return this.publishProducers.computeIfAbsent(producerId, s -> {
                try {
                    log.info("Initializing producer {} on topic {} with schema {}", new Object[]{producerName, topicName, schema});
                    Producer producer = this.createProducer(PulsarSink.this.client, topicName, producerName, schema != null ? schema : this.schema);
                    log.info("Initialized producer {} on topic {} with schema {}: {} -> {}", new Object[]{producerName, topicName, schema, producerId, producer});
                    return producer;
                }
                catch (PulsarClientException e) {
                    log.error("Failed to create Producer while doing user publish", (Throwable)e);
                    throw new RuntimeException(e);
                }
            });
        }

        @Override
        public void close() throws Exception {
            ArrayList<CompletableFuture<Void>> closeFutures = new ArrayList<CompletableFuture<Void>>(this.publishProducers.size());
            for (Map.Entry entry : this.publishProducers.entrySet()) {
                Producer producer = entry.getValue();
                closeFutures.add(producer.closeAsync());
            }
            try {
                FutureUtil.waitForAll(closeFutures);
            }
            catch (Exception e) {
                log.warn("Failed to close all the producers", (Throwable)e);
            }
        }

        public Function<Throwable, Void> getPublishErrorHandler(SinkRecord<T> record, boolean failSource) {
            return throwable -> {
                Record srcRecord = record.getSourceRecord();
                if (failSource) {
                    srcRecord.fail();
                }
                String topic = record.getDestinationTopic().orElse(PulsarSink.this.pulsarSinkConfig.getTopic());
                String errorMsg = null;
                if (srcRecord instanceof PulsarRecord) {
                    errorMsg = String.format("Failed to publish to topic [%s] with error [%s] with src message id [%s]", topic, throwable.getMessage(), ((PulsarRecord)srcRecord).getMessageId());
                } else {
                    errorMsg = String.format("Failed to publish to topic [%s] with error [%s]", topic, throwable.getMessage());
                    if (record.getRecordSequence().isPresent()) {
                        errorMsg = String.format(errorMsg + " with src sequence id [%s]", record.getRecordSequence().get());
                    }
                }
                log.error(errorMsg);
                PulsarSink.this.stats.incrSinkExceptions(new Exception(errorMsg));
                return null;
            };
        }
    }
}

