/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.sink;

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.FunctionResultRouter;
import org.apache.pulsar.functions.instance.SinkRecord;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
import org.apache.pulsar.functions.sink.PulsarSinkConfig;
import org.apache.pulsar.functions.source.PulsarRecord;
import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link Sink} that publishes records to Pulsar topics.
 *
 * <p>The actual publishing is delegated to a {@code PulsarSinkProcessor} that {@code open}
 * selects from the processing guarantee found in the sink configuration.
 *
 * @param <T> type of the record values written by this sink
 */
public class PulsarSink<T>
implements Sink<T> {
    private static final Logger log = LoggerFactory.getLogger(PulsarSink.class);
    // Client used to build the topic schema helper and to create every producer.
    private final PulsarClient client;
    // Sink configuration: default topic, processing guarantee, type/schema/serde names.
    private final PulsarSinkConfig pulsarSinkConfig;
    // Properties attached to every producer created by this sink.
    private final Map<String, String> properties;
    // Receives a sink exception for every failed publish.
    private ComponentStatsManager stats;
    // Set by open(); stays null when the resolved output schema is null.
    @VisibleForTesting
    PulsarSinkProcessor<T> pulsarSinkProcessor;
    // Resolves the output schema for the configured topic.
    private final TopicSchema topicSchema;

    /**
     * Creates a sink bound to the given client and configuration. No producer is created
     * here; that happens when the sink is opened.
     *
     * @param client           Pulsar client used for schema lookup and producer creation
     * @param pulsarSinkConfig sink configuration (topic, guarantee, type and schema names)
     * @param properties       properties attached to every producer this sink creates
     * @param stats            stats manager notified of publish failures
     */
    public PulsarSink(PulsarClient client, PulsarSinkConfig pulsarSinkConfig, Map<String, String> properties, ComponentStatsManager stats) {
        this.stats = stats;
        this.properties = properties;
        this.pulsarSinkConfig = pulsarSinkConfig;
        this.client = client;
        this.topicSchema = new TopicSchema(client);
    }

    /**
     * Resolves the output schema and instantiates the processor matching the configured
     * processing guarantee. When the schema resolves to {@code null} no processor is created.
     *
     * @param config      not read by this implementation
     * @param sinkContext not read by this implementation
     * @throws Exception if the schema cannot be initialized
     */
    @Override
    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
        log.info("Opening pulsar sink with config: {}", this.pulsarSinkConfig);
        final Schema<T> outputSchema = this.initializeSchema();
        if (outputSchema != null) {
            final FunctionConfig.ProcessingGuarantees guarantee = this.pulsarSinkConfig.getProcessingGuarantees();
            switch (guarantee) {
                case EFFECTIVELY_ONCE:
                    this.pulsarSinkProcessor = new PulsarSinkEffectivelyOnceProcessor(outputSchema);
                    break;
                case ATLEAST_ONCE:
                    this.pulsarSinkProcessor = new PulsarSinkAtLeastOnceProcessor(outputSchema);
                    break;
                case ATMOST_ONCE:
                    this.pulsarSinkProcessor = new PulsarSinkAtMostOnceProcessor(outputSchema);
                    break;
                default:
                    // Any other guarantee leaves the processor unset.
                    break;
            }
        } else {
            log.info("Since output type is null, not creating any real sink");
        }
    }

    /**
     * Builds an output message from {@code record} and hands it to the active processor.
     *
     * <p>The key, value and properties of the record are copied to the message. If the
     * record originated from a {@link PulsarRecord}, the input topic and the Base64-encoded
     * input message id are attached as message properties; otherwise the source record's
     * event time, when present, is propagated.
     *
     * <p>Note: {@code open} leaves the processor unset when the output schema is
     * {@code null}; calling this method in that state fails with a
     * {@link NullPointerException}.
     *
     * @param record the record to publish; must be a {@link SinkRecord}
     */
    @Override
    public void write(Record<T> record) {
        TypedMessageBuilder<T> msg = this.pulsarSinkProcessor.newMessage(record);
        if (record.getKey().isPresent()) {
            msg.key(record.getKey().get());
        }
        msg.value(record.getValue());
        if (!record.getProperties().isEmpty()) {
            msg.properties(record.getProperties());
        }
        SinkRecord<T> sinkRecord = (SinkRecord<T>) record;
        if (sinkRecord.getSourceRecord() instanceof PulsarRecord) {
            PulsarRecord<?> pulsarRecord = (PulsarRecord<?>) sinkRecord.getSourceRecord();
            // encodeToString avoids decoding the Base64 bytes with the platform default
            // charset, which new String(byte[]) would do.
            String encodedMessageId = Base64.getEncoder().encodeToString(pulsarRecord.getMessageId().toByteArray());
            msg.property("__pfn_input_topic__", pulsarRecord.getTopicName().get())
                    .property("__pfn_input_msg_id__", encodedMessageId);
        } else {
            Optional<Long> eventTime = sinkRecord.getSourceRecord().getEventTime();
            if (eventTime.isPresent()) {
                msg.eventTime(eventTime.get());
            }
        }
        this.pulsarSinkProcessor.sendOutputMessage(msg, record);
    }

    /**
     * Closes the active processor, if one was created by {@code open}.
     *
     * @throws Exception if the processor fails to close
     */
    @Override
    public void close() throws Exception {
        final PulsarSinkProcessor<T> processor = this.pulsarSinkProcessor;
        if (processor == null) {
            return;
        }
        processor.close();
    }

    /**
     * Resolves the schema used for outgoing messages.
     *
     * <ul>
     *   <li>No type class name configured: {@code Schema.BYTES}.</li>
     *   <li>Type class is {@link Void}: {@code null}.</li>
     *   <li>Otherwise: the schema looked up for the configured topic, using the configured
     *       schema type when non-empty and the serde class name otherwise.</li>
     * </ul>
     *
     * @return the output schema, or {@code null} when the output type is {@link Void}
     * @throws ClassNotFoundException if the configured type class cannot be loaded
     */
    @VisibleForTesting
    @SuppressWarnings("unchecked")
    Schema<T> initializeSchema() throws ClassNotFoundException {
        final String typeClassName = this.pulsarSinkConfig.getTypeClassName();
        if (StringUtils.isEmpty(typeClassName)) {
            return (Schema<T>) Schema.BYTES;
        }

        final ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
        final Class<?> typeArg = Reflections.loadClass(typeClassName, contextLoader);
        if (Void.class.equals(typeArg)) {
            return null;
        }

        final String schemaType = this.pulsarSinkConfig.getSchemaType();
        final String schemaTypeOrSerde = StringUtils.isEmpty(schemaType)
                ? this.pulsarSinkConfig.getSerdeClassName()
                : schemaType;
        return (Schema<T>) this.topicSchema.getSchema(this.pulsarSinkConfig.getTopic(), typeArg, schemaTypeOrSerde, false);
    }

    /**
     * Processor for the effectively-once guarantee: uses one producer per destination topic
     * and partition id, stamps each message with the record sequence as its sequence id, and
     * waits for every send to complete.
     */
    @VisibleForTesting
    class PulsarSinkEffectivelyOnceProcessor
    extends PulsarSinkProcessorBase {
        public PulsarSinkEffectivelyOnceProcessor(Schema schema) {
            super(schema);
        }

        /**
         * Starts a message on the producer dedicated to the record's topic and partition.
         *
         * @throws RuntimeException if the record carries no partition id
         */
        @Override
        public TypedMessageBuilder<T> newMessage(Record<T> record) {
            final Optional<String> partition = record.getPartitionId();
            if (!partition.isPresent()) {
                throw new RuntimeException("PartitionId needs to be specified for every record while in Effectively-once mode");
            }
            final String topic = record.getDestinationTopic().orElse(PulsarSink.this.pulsarSinkConfig.getTopic());
            final String partitionId = partition.get();
            // Producers are cached per "<topic>-<partitionId>" and named after the partition id.
            final String producerId = String.format("%s-%s", topic, partitionId);
            return this.getProducer(producerId, partitionId, topic).newMessage();
        }

        /**
         * Sends the message with the record sequence as sequence id, acks the record on
         * success, routes failures to the publish error handler (failing the source
         * record), and blocks until the send finishes.
         *
         * @throws RuntimeException if the record carries no record sequence
         */
        @Override
        public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record) {
            final Optional<Long> sequence = record.getRecordSequence();
            if (!sequence.isPresent()) {
                throw new RuntimeException("RecordSequence needs to be specified for every record while in Effectively-once mode");
            }
            msg.sequenceId(sequence.get());

            final CompletableFuture<?> pending = msg.sendAsync();
            pending.thenAccept(ignoredId -> record.ack())
                    .exceptionally(this.getPublishErrorHandler(record, true));
            pending.join();
        }
    }

    /**
     * Processor for the at-least-once guarantee: same producer handling as the at-most-once
     * processor, but the record is acked only after a successful send and the source record
     * is failed when the send fails.
     */
    @VisibleForTesting
    class PulsarSinkAtLeastOnceProcessor
    extends PulsarSinkAtMostOnceProcessor {
        public PulsarSinkAtLeastOnceProcessor(Schema schema) {
            super(schema);
        }

        /** Sends asynchronously; acks on success, fails the source record on error. */
        @Override
        public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record) {
            final CompletableFuture<Void> acked = msg.sendAsync().thenAccept(ignoredId -> record.ack());
            acked.exceptionally(this.getPublishErrorHandler(record, true));
        }
    }

    /**
     * Processor for the at-most-once guarantee: creates the producer for the configured
     * topic up front and sends messages without acking or failing the record.
     */
    @VisibleForTesting
    class PulsarSinkAtMostOnceProcessor
    extends PulsarSinkProcessorBase {
        /**
         * Creates the processor and eagerly registers a producer for the configured topic.
         *
         * @throws RuntimeException wrapping the {@link PulsarClientException} if the producer
         *                          cannot be created
         */
        public PulsarSinkAtMostOnceProcessor(Schema schema) {
            super(schema);
            final String defaultTopic = PulsarSink.this.pulsarSinkConfig.getTopic();
            try {
                final Producer<T> defaultProducer = this.createProducer(PulsarSink.this.client, defaultTopic, null, schema);
                this.publishProducers.put(defaultTopic, defaultProducer);
            }
            catch (PulsarClientException e) {
                log.error("Failed to create Producer while doing user publish", (Throwable)e);
                throw new RuntimeException(e);
            }
        }

        /** Starts a message on the producer of the record's destination topic (or the configured topic). */
        @Override
        public TypedMessageBuilder<T> newMessage(Record<T> record) {
            final String topic = record.getDestinationTopic().orElse(PulsarSink.this.pulsarSinkConfig.getTopic());
            final Producer<T> producer = this.getProducer(topic);
            return producer.newMessage();
        }

        /** Sends asynchronously; failures are reported but the source record is not failed. */
        @Override
        public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record) {
            final CompletableFuture<Void> sent = msg.sendAsync().thenAccept(ignoredId -> { });
            sent.exceptionally(this.getPublishErrorHandler(record, false));
        }
    }

    private abstract class PulsarSinkProcessorBase
    implements PulsarSinkProcessor<T> {
        protected Map<String, Producer<T>> publishProducers = new ConcurrentHashMap();
        protected Schema schema;

        protected PulsarSinkProcessorBase(Schema schema) {
            this.schema = schema;
        }

        public <T> Producer<T> createProducer(PulsarClient client, String topic, String producerName, Schema<T> schema) throws PulsarClientException {
            ProducerBuilder builder = client.newProducer(schema).blockIfQueueFull(true).enableBatching(true).batchingMaxPublishDelay(10L, TimeUnit.MILLISECONDS).compressionType(CompressionType.LZ4).hashingScheme(HashingScheme.Murmur3_32Hash).messageRoutingMode(MessageRoutingMode.CustomPartition).messageRouter((MessageRouter)FunctionResultRouter.of()).sendTimeout(0, TimeUnit.SECONDS).topic(topic);
            if (producerName != null) {
                builder.producerName(producerName);
            }
            return builder.properties(PulsarSink.this.properties).create();
        }

        protected Producer<T> getProducer(String destinationTopic) {
            return this.getProducer(destinationTopic, null, destinationTopic);
        }

        protected Producer<T> getProducer(String producerId, String producerName, String topicName) {
            return this.publishProducers.computeIfAbsent(producerId, s -> {
                try {
                    return this.createProducer(PulsarSink.this.client, topicName, producerName, this.schema);
                }
                catch (PulsarClientException e) {
                    log.error("Failed to create Producer while doing user publish", (Throwable)e);
                    throw new RuntimeException(e);
                }
            });
        }

        @Override
        public void close() throws Exception {
            ArrayList<CompletableFuture> closeFutures = new ArrayList<CompletableFuture>(this.publishProducers.size());
            for (Map.Entry entry : this.publishProducers.entrySet()) {
                Producer producer = entry.getValue();
                closeFutures.add(producer.closeAsync());
            }
            try {
                FutureUtil.waitForAll(closeFutures);
            }
            catch (Exception e) {
                log.warn("Failed to close all the producers", (Throwable)e);
            }
        }

        public Function<Throwable, Void> getPublishErrorHandler(Record<T> record, boolean failSource) {
            return throwable -> {
                SinkRecord sinkRecord = (SinkRecord)record;
                Record srcRecord = sinkRecord.getSourceRecord();
                if (failSource) {
                    srcRecord.fail();
                }
                String topic = record.getDestinationTopic().orElse(PulsarSink.this.pulsarSinkConfig.getTopic());
                String errorMsg = null;
                if (srcRecord instanceof PulsarRecord) {
                    errorMsg = String.format("Failed to publish to topic [%s] with error [%s] with src message id [%s]", topic, throwable.getMessage(), ((PulsarRecord)srcRecord).getMessageId());
                    log.error(errorMsg);
                } else {
                    errorMsg = String.format("Failed to publish to topic [%s] with error [%s] with src sequence id [%s]", topic, throwable.getMessage(), record.getRecordSequence().get());
                    log.error(errorMsg);
                }
                PulsarSink.this.stats.incrSinkExceptions(new Exception(errorMsg));
                return null;
            };
        }
    }

    private static interface PulsarSinkProcessor<T> {
        public TypedMessageBuilder<T> newMessage(Record<T> var1);

        public void sendOutputMessage(TypedMessageBuilder<T> var1, Record<T> var2);

        public void close() throws Exception;
    }
}

