package org.apache.pulsar.functions.sink;

import com.google.common.annotations.VisibleForTesting;
import java.util.Base64;
import java.util.Map;
import java.util.function.Consumer;
import net.jodah.typetools.TypeResolver;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.instance.SinkRecord;
import org.apache.pulsar.functions.instance.producers.AbstractOneOuputTopicProducers;
import org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers;
import org.apache.pulsar.functions.instance.producers.Producers;
import org.apache.pulsar.functions.source.PulsarRecord;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/sink/PulsarSink.class */
public class PulsarSink<T> implements Sink<T> {
    private static final Logger log = LoggerFactory.getLogger(PulsarSink.class);
    private PulsarClient client;
    private PulsarSinkConfig pulsarSinkConfig;
    private SerDe<T> outputSerDe;
    private PulsarSinkProcessor pulsarSinkProcessor;

    /* loaded from: input_file:org/apache/pulsar/functions/sink/PulsarSink$PulsarSinkAtLeastOnceProcessor.class */
    private class PulsarSinkAtLeastOnceProcessor implements PulsarSinkProcessor<T> {
        private Producer<byte[]> producer;

        private PulsarSinkAtLeastOnceProcessor() {
        }

        @Override // org.apache.pulsar.functions.sink.PulsarSink.PulsarSinkProcessor
        public void initializeOutputProducer(String str) throws Exception {
            this.producer = AbstractOneOuputTopicProducers.createProducer(PulsarSink.this.client, PulsarSink.this.pulsarSinkConfig.getTopic());
        }

        @Override // org.apache.pulsar.functions.sink.PulsarSink.PulsarSinkProcessor
        public void sendOutputMessage(MessageBuilder messageBuilder, Record<T> record) throws Exception {
            this.producer.sendAsync(messageBuilder.build()).thenAccept((Consumer) messageId -> {
                record.ack();
            });
        }

        @Override // org.apache.pulsar.functions.sink.PulsarSink.PulsarSinkProcessor
        public void close() throws Exception {
            if (null != this.producer) {
                try {
                    this.producer.close();
                } catch (PulsarClientException e) {
                    PulsarSink.log.warn("Fail to close producer for processor {}", PulsarSink.this.pulsarSinkConfig.getTopic(), e);
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/pulsar/functions/sink/PulsarSink$PulsarSinkAtMostOnceProcessor.class */
    private class PulsarSinkAtMostOnceProcessor implements PulsarSinkProcessor<T> {
        private Producer<byte[]> producer;

        private PulsarSinkAtMostOnceProcessor() {
        }

        @Override // org.apache.pulsar.functions.sink.PulsarSink.PulsarSinkProcessor
        public void initializeOutputProducer(String str) throws Exception {
            this.producer = AbstractOneOuputTopicProducers.createProducer(PulsarSink.this.client, PulsarSink.this.pulsarSinkConfig.getTopic());
        }

        @Override // org.apache.pulsar.functions.sink.PulsarSink.PulsarSinkProcessor
        public void sendOutputMessage(MessageBuilder messageBuilder, Record<T> record) throws Exception {
            this.producer.sendAsync(messageBuilder.build());
        }

        @Override // org.apache.pulsar.functions.sink.PulsarSink.PulsarSinkProcessor
        public void close() throws Exception {
            if (null != this.producer) {
                try {
                    this.producer.close();
                } catch (PulsarClientException e) {
                    PulsarSink.log.warn("Fail to close producer for processor {}", PulsarSink.this.pulsarSinkConfig.getTopic(), e);
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/pulsar/functions/sink/PulsarSink$PulsarSinkEffectivelyOnceProcessor.class */
    private class PulsarSinkEffectivelyOnceProcessor implements PulsarSinkProcessor<T>, ConsumerEventListener {
        protected Producers outputProducer;

        private PulsarSinkEffectivelyOnceProcessor() {
        }

        @Override // org.apache.pulsar.functions.sink.PulsarSink.PulsarSinkProcessor
        public void initializeOutputProducer(String str) throws Exception {
            this.outputProducer = new MultiConsumersOneOuputTopicProducers(PulsarSink.this.client, str);
            this.outputProducer.initialize();
        }

        @Override // org.apache.pulsar.functions.sink.PulsarSink.PulsarSinkProcessor
        public void sendOutputMessage(MessageBuilder messageBuilder, Record<T> record) throws Exception {
            if (record.getRecordSequence().isPresent()) {
                messageBuilder.setSequenceId(record.getRecordSequence().get().longValue());
            }
            this.outputProducer.getProducer(record.getPartitionId().get()).sendAsync(messageBuilder.build()).thenAccept((Consumer) obj -> {
                record.ack();
            }).join();
        }

        @Override // org.apache.pulsar.functions.sink.PulsarSink.PulsarSinkProcessor
        public void close() throws Exception {
            if (null != this.outputProducer) {
                this.outputProducer.close();
                this.outputProducer = null;
            }
        }

        public void becameActive(org.apache.pulsar.client.api.Consumer<?> consumer, int i) {
            if (null != this.outputProducer) {
                try {
                    this.outputProducer.getProducer(String.format("%s-%d", consumer.getTopic(), Integer.valueOf(i)));
                } catch (PulsarClientException e) {
                    PulsarSink.log.warn("Fail to create a producer for results computed from messages of topic: {}, partition: {}", consumer.getTopic(), Integer.valueOf(i));
                }
            }
        }

        public void becameInactive(org.apache.pulsar.client.api.Consumer<?> consumer, int i) {
            if (null != this.outputProducer) {
                this.outputProducer.closeProducer(String.format("%s-%d", consumer.getTopic(), Integer.valueOf(i)));
            }
        }

        Producers getOutputProducer() {
            return this.outputProducer;
        }
    }

    /* loaded from: input_file:org/apache/pulsar/functions/sink/PulsarSink$PulsarSinkProcessor.class */
    private interface PulsarSinkProcessor<T> {
        void initializeOutputProducer(String str) throws Exception;

        void sendOutputMessage(MessageBuilder messageBuilder, Record<T> record) throws Exception;

        void close() throws Exception;
    }

    public PulsarSink(PulsarClient pulsarClient, PulsarSinkConfig pulsarSinkConfig) {
        this.client = pulsarClient;
        this.pulsarSinkConfig = pulsarSinkConfig;
    }

    public void open(Map<String, Object> map, SinkContext sinkContext) throws Exception {
        setupSerDe();
        switch (this.pulsarSinkConfig.getProcessingGuarantees()) {
            case ATMOST_ONCE:
                this.pulsarSinkProcessor = new PulsarSinkAtMostOnceProcessor();
                break;
            case ATLEAST_ONCE:
                this.pulsarSinkProcessor = new PulsarSinkAtLeastOnceProcessor();
                break;
            case EFFECTIVELY_ONCE:
                this.pulsarSinkProcessor = new PulsarSinkEffectivelyOnceProcessor();
                break;
        }
        this.pulsarSinkProcessor.initializeOutputProducer(this.pulsarSinkConfig.getTopic());
    }

    public void write(Record<T> record) throws Exception {
        try {
            byte[] serialize = this.outputSerDe.serialize(record.getValue());
            MessageBuilder create = MessageBuilder.create();
            if (record.getKey().isPresent()) {
                create.setKey(record.getKey().get());
            }
            create.setContent(serialize);
            if (!record.getProperties().isEmpty()) {
                create.setProperties(record.getProperties());
            }
            SinkRecord sinkRecord = (SinkRecord) record;
            if (sinkRecord.getSourceRecord() instanceof PulsarRecord) {
                PulsarRecord pulsarRecord = (PulsarRecord) sinkRecord.getSourceRecord();
                create.setProperty("__pfn_input_topic__", pulsarRecord.getTopicName()).setProperty("__pfn_input_msg_id__", new String(Base64.getEncoder().encode(pulsarRecord.getMessageId().toByteArray())));
            }
            this.pulsarSinkProcessor.sendOutputMessage(create, record);
        } catch (Exception e) {
            throw new RuntimeException("Error occured when attempting to serialize output:", e);
        }
    }

    public void close() throws Exception {
        if (this.pulsarSinkProcessor != null) {
            this.pulsarSinkProcessor.close();
        }
    }

    @VisibleForTesting
    void setupSerDe() throws ClassNotFoundException {
        if (StringUtils.isEmpty(this.pulsarSinkConfig.getTypeClassName())) {
            this.outputSerDe = InstanceUtils.initializeDefaultSerDe(byte[].class);
            return;
        }
        Class<?> loadClass = Reflections.loadClass(this.pulsarSinkConfig.getTypeClassName(), Thread.currentThread().getContextClassLoader());
        if (Void.class.equals(loadClass)) {
            return;
        }
        if (this.pulsarSinkConfig.getSerDeClassName() == null || this.pulsarSinkConfig.getSerDeClassName().isEmpty() || this.pulsarSinkConfig.getSerDeClassName().equals(DefaultSerDe.class.getName())) {
            this.outputSerDe = InstanceUtils.initializeDefaultSerDe(loadClass);
        } else {
            this.outputSerDe = InstanceUtils.initializeSerDe(this.pulsarSinkConfig.getSerDeClassName(), Thread.currentThread().getContextClassLoader(), loadClass);
        }
        Class<?>[] resolveRawArguments = TypeResolver.resolveRawArguments(SerDe.class, (Class) this.outputSerDe.getClass());
        if (this.outputSerDe.getClass().getName().equals(DefaultSerDe.class.getName())) {
            if (!DefaultSerDe.IsSupportedType(loadClass)) {
                throw new RuntimeException("Default Serde does not support type " + loadClass);
            }
        } else if (!resolveRawArguments[0].isAssignableFrom(loadClass)) {
            throw new RuntimeException("Inconsistent types found between function output type and output serde type:  function type = " + loadClass + "should be assignable from " + resolveRawArguments[0]);
        }
    }
}
