package org.apache.pulsar.functions.sink;

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.FunctionResultRouter;
import org.apache.pulsar.functions.instance.SinkRecord;
import org.apache.pulsar.functions.source.PulsarRecord;
import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-instance-2.3.0.jar:org/apache/pulsar/functions/sink/PulsarSink.class */
public class PulsarSink<T> implements Sink<T> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) PulsarSink.class);
    private final PulsarClient client;
    private final PulsarSinkConfig pulsarSinkConfig;
    private final Map<String, String> properties;

    @VisibleForTesting
    PulsarSinkProcessor<T> pulsarSinkProcessor;
    private final TopicSchema topicSchema;

    @VisibleForTesting
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-instance-2.3.0.jar:org/apache/pulsar/functions/sink/PulsarSink$PulsarSinkAtLeastOnceProcessor.class */
    class PulsarSinkAtLeastOnceProcessor extends PulsarSink<T>.PulsarSinkAtMostOnceProcessor {
        public PulsarSinkAtLeastOnceProcessor(Schema schema) {
            super(schema);
        }

        @Override // org.apache.pulsar.functions.sink.PulsarSink.PulsarSinkAtMostOnceProcessor, org.apache.pulsar.functions.sink.PulsarSink.PulsarSinkProcessor
        public void sendOutputMessage(TypedMessageBuilder<T> typedMessageBuilder, Record<T> record) throws Exception {
            typedMessageBuilder.sendAsync().thenAccept(messageId -> {
                record.ack();
            });
        }
    }

    @VisibleForTesting
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-instance-2.3.0.jar:org/apache/pulsar/functions/sink/PulsarSink$PulsarSinkAtMostOnceProcessor.class */
    class PulsarSinkAtMostOnceProcessor extends PulsarSink<T>.PulsarSinkProcessorBase {
        public PulsarSinkAtMostOnceProcessor(Schema schema) {
            super(schema);
            try {
                this.publishProducers.put(PulsarSink.this.pulsarSinkConfig.getTopic(), createProducer(PulsarSink.this.client, PulsarSink.this.pulsarSinkConfig.getTopic(), null, schema));
            } catch (PulsarClientException e) {
                PulsarSink.log.error("Failed to create Producer while doing user publish", (Throwable) e);
                throw new RuntimeException(e);
            }
        }

        @Override // org.apache.pulsar.functions.sink.PulsarSink.PulsarSinkProcessor
        public TypedMessageBuilder<T> newMessage(Record<T> record) {
            return getProducer(record.getDestinationTopic().orElse(PulsarSink.this.pulsarSinkConfig.getTopic())).newMessage();
        }

        @Override // org.apache.pulsar.functions.sink.PulsarSink.PulsarSinkProcessor
        public void sendOutputMessage(TypedMessageBuilder<T> typedMessageBuilder, Record<T> record) throws Exception {
            typedMessageBuilder.sendAsync();
        }
    }

    @VisibleForTesting
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-instance-2.3.0.jar:org/apache/pulsar/functions/sink/PulsarSink$PulsarSinkEffectivelyOnceProcessor.class */
    class PulsarSinkEffectivelyOnceProcessor extends PulsarSink<T>.PulsarSinkProcessorBase {
        public PulsarSinkEffectivelyOnceProcessor(Schema schema) {
            super(schema);
        }

        @Override // org.apache.pulsar.functions.sink.PulsarSink.PulsarSinkProcessor
        public TypedMessageBuilder<T> newMessage(Record<T> record) throws Exception {
            if (record.getPartitionId().isPresent()) {
                return getProducer(String.format("%s-%s", record.getDestinationTopic().orElse(PulsarSink.this.pulsarSinkConfig.getTopic()), record.getPartitionId().get()), record.getPartitionId().get(), record.getDestinationTopic().orElse(PulsarSink.this.pulsarSinkConfig.getTopic())).newMessage();
            }
            throw new RuntimeException("PartitionId needs to be specified for every record while in Effectively-once mode");
        }

        @Override // org.apache.pulsar.functions.sink.PulsarSink.PulsarSinkProcessor
        public void sendOutputMessage(TypedMessageBuilder<T> typedMessageBuilder, Record<T> record) throws Exception {
            if (!record.getRecordSequence().isPresent()) {
                throw new RuntimeException("RecordSequence needs to be specified for every record while in Effectively-once mode");
            }
            typedMessageBuilder.sequenceId(record.getRecordSequence().get().longValue());
            typedMessageBuilder.sendAsync().thenAccept(messageId -> {
                record.ack();
            }).join();
        }
    }

    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-instance-2.3.0.jar:org/apache/pulsar/functions/sink/PulsarSink$PulsarSinkProcessor.class */
    private interface PulsarSinkProcessor<T> {
        TypedMessageBuilder<T> newMessage(Record<T> record) throws Exception;

        void sendOutputMessage(TypedMessageBuilder<T> typedMessageBuilder, Record<T> record) throws Exception;

        void close() throws Exception;
    }

    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-instance-2.3.0.jar:org/apache/pulsar/functions/sink/PulsarSink$PulsarSinkProcessorBase.class */
    private abstract class PulsarSinkProcessorBase implements PulsarSinkProcessor<T> {
        protected Map<String, Producer<T>> publishProducers = new ConcurrentHashMap();
        protected Schema schema;

        protected PulsarSinkProcessorBase(Schema schema) {
            this.schema = schema;
        }

        public <T> Producer<T> createProducer(PulsarClient pulsarClient, String str, String str2, Schema<T> schema) throws PulsarClientException {
            ProducerBuilder<T> producerBuilder = pulsarClient.newProducer(schema).blockIfQueueFull(true).enableBatching(true).batchingMaxPublishDelay(1L, TimeUnit.MILLISECONDS).compressionType(CompressionType.LZ4).hashingScheme(HashingScheme.Murmur3_32Hash).messageRoutingMode(MessageRoutingMode.CustomPartition).messageRouter(FunctionResultRouter.of()).sendTimeout(0, TimeUnit.SECONDS).topic(str);
            if (str2 != null) {
                producerBuilder.producerName(str2);
            }
            return producerBuilder.properties(PulsarSink.this.properties).create();
        }

        protected Producer<T> getProducer(String str) {
            return getProducer(str, null, str);
        }

        protected Producer<T> getProducer(String str, String str2, String str3) {
            return this.publishProducers.computeIfAbsent(str, str4 -> {
                try {
                    return createProducer(PulsarSink.this.client, str3, str2, this.schema);
                } catch (PulsarClientException e) {
                    PulsarSink.log.error("Failed to create Producer while doing user publish", (Throwable) e);
                    throw new RuntimeException(e);
                }
            });
        }

        @Override // org.apache.pulsar.functions.sink.PulsarSink.PulsarSinkProcessor
        public void close() throws Exception {
            ArrayList arrayList = new ArrayList(this.publishProducers.size());
            Iterator<Map.Entry<String, Producer<T>>> it = this.publishProducers.entrySet().iterator();
            while (it.hasNext()) {
                arrayList.add(it.next().getValue().closeAsync());
            }
            try {
                FutureUtil.waitForAll(arrayList);
            } catch (Exception e) {
                PulsarSink.log.warn("Failed to close all the producers", (Throwable) e);
            }
        }
    }

    public PulsarSink(PulsarClient pulsarClient, PulsarSinkConfig pulsarSinkConfig, Map<String, String> map) {
        this.client = pulsarClient;
        this.pulsarSinkConfig = pulsarSinkConfig;
        this.topicSchema = new TopicSchema(pulsarClient);
        this.properties = map;
    }

    @Override // org.apache.pulsar.io.core.Sink
    public void open(Map<String, Object> map, SinkContext sinkContext) throws Exception {
        log.info("Opening pulsar sink with config: {}", this.pulsarSinkConfig);
        Schema<T> initializeSchema = initializeSchema();
        if (initializeSchema == null) {
            log.info("Since output type is null, not creating any real sink");
            return;
        }
        switch (this.pulsarSinkConfig.getProcessingGuarantees()) {
            case ATMOST_ONCE:
                this.pulsarSinkProcessor = new PulsarSinkAtMostOnceProcessor(initializeSchema);
                return;
            case ATLEAST_ONCE:
                this.pulsarSinkProcessor = new PulsarSinkAtLeastOnceProcessor(initializeSchema);
                return;
            case EFFECTIVELY_ONCE:
                this.pulsarSinkProcessor = new PulsarSinkEffectivelyOnceProcessor(initializeSchema);
                return;
            default:
                return;
        }
    }

    @Override // org.apache.pulsar.io.core.Sink
    public void write(Record<T> record) throws Exception {
        TypedMessageBuilder<T> newMessage = this.pulsarSinkProcessor.newMessage(record);
        if (record.getKey().isPresent()) {
            newMessage.key(record.getKey().get());
        }
        newMessage.value(record.getValue());
        if (!record.getProperties().isEmpty()) {
            newMessage.properties(record.getProperties());
        }
        SinkRecord sinkRecord = (SinkRecord) record;
        if (sinkRecord.getSourceRecord() instanceof PulsarRecord) {
            PulsarRecord pulsarRecord = (PulsarRecord) sinkRecord.getSourceRecord();
            newMessage.property("__pfn_input_topic__", pulsarRecord.getTopicName().get()).property("__pfn_input_msg_id__", new String(Base64.getEncoder().encode(pulsarRecord.getMessageId().toByteArray())));
        } else {
            Optional<Long> eventTime = sinkRecord.getSourceRecord().getEventTime();
            if (eventTime.isPresent()) {
                newMessage.eventTime(eventTime.get().longValue());
            }
        }
        this.pulsarSinkProcessor.sendOutputMessage(newMessage, record);
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        if (this.pulsarSinkProcessor != null) {
            this.pulsarSinkProcessor.close();
        }
    }

    @VisibleForTesting
    Schema<T> initializeSchema() throws ClassNotFoundException {
        if (StringUtils.isEmpty(this.pulsarSinkConfig.getTypeClassName())) {
            return (Schema<T>) Schema.BYTES;
        }
        Class<?> loadClass = Reflections.loadClass(this.pulsarSinkConfig.getTypeClassName(), Thread.currentThread().getContextClassLoader());
        if (Void.class.equals(loadClass)) {
            return null;
        }
        return !StringUtils.isEmpty(this.pulsarSinkConfig.getSchemaType()) ? (Schema<T>) this.topicSchema.getSchema(this.pulsarSinkConfig.getTopic(), loadClass, this.pulsarSinkConfig.getSchemaType(), false) : (Schema<T>) this.topicSchema.getSchema(this.pulsarSinkConfig.getTopic(), loadClass, this.pulsarSinkConfig.getSerdeClassName(), false);
    }
}
