package org.apache.pulsar.functions.instance.processors;

import java.lang.reflect.Type;
import java.util.Map;
import net.jodah.typetools.TypeResolver;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.connect.core.Record;
import org.apache.pulsar.connect.core.Source;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.source.PulsarConfig;
import org.apache.pulsar.functions.source.PulsarSource;
import org.apache.pulsar.functions.utils.FunctionConfig;
import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.shade.com.google.gson.Gson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/instance/processors/MessageProcessorBase.class */
abstract class MessageProcessorBase implements MessageProcessor {
    private static final Logger log;
    protected final PulsarClient client;
    protected final Function.FunctionDetails functionDetails;
    protected Source source;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: protected */
    public MessageProcessorBase(PulsarClient pulsarClient, Function.FunctionDetails functionDetails) {
        this.client = pulsarClient;
        this.functionDetails = functionDetails;
    }

    @Override // org.apache.pulsar.functions.instance.processors.MessageProcessor
    public void setupInput(Class<?> cls) throws Exception {
        Object createInstance;
        Function.SourceSpec source = this.functionDetails.getSource();
        if (source.getClassName().isEmpty()) {
            PulsarConfig pulsarConfig = new PulsarConfig();
            pulsarConfig.setTopicSerdeClassNameMap(this.functionDetails.getSource().getTopicsToSerDeClassNameMap());
            pulsarConfig.setSubscriptionName(FunctionDetailsUtils.getFullyQualifiedName(this.functionDetails));
            pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.valueOf(this.functionDetails.getProcessingGuarantees().name()));
            pulsarConfig.setSubscriptionType(FunctionConfig.SubscriptionType.valueOf(this.functionDetails.getSource().getSubscriptionType().name()));
            pulsarConfig.setTypeClassName(cls.getName());
            createInstance = Reflections.createInstance(PulsarSource.class.getName(), PulsarSource.class.getClassLoader(), new Object[]{this.client, pulsarConfig}, new Class[]{PulsarClient.class, PulsarConfig.class});
        } else {
            createInstance = Reflections.createInstance(source.getClassName(), Thread.currentThread().getContextClassLoader());
        }
        if (!(createInstance instanceof Source)) {
            throw new RuntimeException("Source does not implement correct interface");
        }
        Class<?>[] resolveRawArguments = TypeResolver.resolveRawArguments((Type) Source.class, createInstance.getClass());
        if (!$assertionsDisabled && resolveRawArguments.length <= 0) {
            throw new AssertionError();
        }
        this.source = (Source) createInstance;
        try {
            this.source.open((Map) new Gson().fromJson(source.getConfigs(), Map.class));
        } catch (Exception e) {
            log.info("Error occurred executing open for source: {}", this.functionDetails.getSource().getClassName(), e);
        }
    }

    @Override // org.apache.pulsar.functions.instance.processors.MessageProcessor
    public Record recieveMessage() throws Exception {
        return this.source.read();
    }

    @Override // org.apache.pulsar.functions.instance.processors.MessageProcessor
    public void postReceiveMessage(Record record) {
    }

    @Override // org.apache.pulsar.functions.instance.processors.MessageProcessor
    public void setupOutput(SerDe serDe) throws Exception {
        String topic = this.functionDetails.getSink().getTopic();
        if (topic == null || topic.isEmpty() || serDe == null) {
            return;
        }
        log.info("Starting producer for output topic {}", topic);
        initializeOutputProducer(topic);
    }

    protected abstract void initializeOutputProducer(String str) throws Exception;

    @Override // org.apache.pulsar.functions.instance.processors.MessageProcessor, java.lang.AutoCloseable
    public void close() {
        try {
            if (this.source != null) {
                this.source.close();
            }
        } catch (Exception e) {
            log.warn("Failed to close source {}", this.source, e);
        }
    }

    @Override // org.apache.pulsar.functions.instance.processors.MessageProcessor
    public Source getSource() {
        return this.source;
    }

    static {
        $assertionsDisabled = !MessageProcessorBase.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(MessageProcessorBase.class);
    }
}
