/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.instance;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import net.jodah.typetools.TypeResolver;
import org.apache.logging.log4j.ThreadContext;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.StateStore;
import org.apache.pulsar.functions.instance.ContextImpl;
import org.apache.pulsar.functions.instance.InstanceCache;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.instance.JavaExecutionResult;
import org.apache.pulsar.functions.instance.JavaInstance;
import org.apache.pulsar.functions.instance.LogAppender;
import org.apache.pulsar.functions.instance.SinkRecord;
import org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl;
import org.apache.pulsar.functions.instance.state.InstanceStateManager;
import org.apache.pulsar.functions.instance.state.StateManager;
import org.apache.pulsar.functions.instance.state.StateStoreContextImpl;
import org.apache.pulsar.functions.instance.state.StateStoreProvider;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.functions.sink.PulsarSinkConfig;
import org.apache.pulsar.functions.sink.PulsarSinkDisable;
import org.apache.pulsar.functions.source.PulsarSource;
import org.apache.pulsar.functions.source.PulsarSourceConfig;
import org.apache.pulsar.functions.source.batch.BatchSourceExecutor;
import org.apache.pulsar.functions.utils.CryptoUtils;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
import org.apache.pulsar.functions.windowing.WindowFunctionExecutor;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.shade.com.google.gson.Gson;
import org.apache.pulsar.shade.com.google.gson.reflect.TypeToken;
import org.apache.pulsar.shade.io.prometheus.client.CollectorRegistry;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.shade.org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.shade.org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.shade.org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.Reflections;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JavaInstanceRunnable
implements AutoCloseable,
Runnable {
    private static final Logger log = LoggerFactory.getLogger(JavaInstanceRunnable.class);
    private final InstanceConfig instanceConfig;
    private final FunctionCacheManager fnCache;
    private final String jarFile;
    private final PulsarClientImpl client;
    private LogAppender logAppender;
    private final String stateStorageServiceUrl;
    private StateStoreProvider stateStoreProvider;
    private StateManager stateManager;
    private JavaInstance javaInstance;
    private Throwable deathException;
    private ComponentStatsManager stats;
    private Record<?> currentRecord;
    private Source source;
    private Sink sink;
    private final SecretsProvider secretsProvider;
    private CollectorRegistry collectorRegistry;
    private final String[] metricsLabels;
    private InstanceCache instanceCache;
    private final Function.FunctionDetails.ComponentType componentType;
    private final Map<String, String> properties;
    private final ClassLoader instanceClassLoader;
    private ClassLoader functionClassLoader;
    private String narExtractionDirectory;

    public JavaInstanceRunnable(InstanceConfig instanceConfig, FunctionCacheManager fnCache, String jarFile, PulsarClient pulsarClient, String stateStorageServiceUrl, SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String narExtractionDirectory) {
        this.instanceConfig = instanceConfig;
        this.fnCache = fnCache;
        this.jarFile = jarFile;
        this.client = (PulsarClientImpl)pulsarClient;
        this.stateStorageServiceUrl = stateStorageServiceUrl;
        this.secretsProvider = secretsProvider;
        this.collectorRegistry = collectorRegistry;
        this.narExtractionDirectory = narExtractionDirectory;
        this.metricsLabels = new String[]{instanceConfig.getFunctionDetails().getTenant(), String.format("%s/%s", instanceConfig.getFunctionDetails().getTenant(), instanceConfig.getFunctionDetails().getNamespace()), instanceConfig.getFunctionDetails().getName(), String.valueOf(instanceConfig.getInstanceId()), instanceConfig.getClusterName(), FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails())};
        this.componentType = InstanceUtils.calculateSubjectType(instanceConfig.getFunctionDetails());
        this.properties = InstanceUtils.getProperties(this.componentType, FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails()), this.instanceConfig.getInstanceId());
        this.collectorRegistry = collectorRegistry;
        this.instanceClassLoader = Thread.currentThread().getContextClassLoader();
    }

    private synchronized void setup() throws Exception {
        this.instanceCache = InstanceCache.getInstanceCache();
        if (this.collectorRegistry == null) {
            this.collectorRegistry = new CollectorRegistry();
        }
        this.stats = ComponentStatsManager.getStatsManager(this.collectorRegistry, this.metricsLabels, this.instanceCache.getScheduledExecutorService(), this.componentType);
        ThreadContext.put((String)"function", (String)FunctionCommon.getFullyQualifiedName(this.instanceConfig.getFunctionDetails()));
        ThreadContext.put((String)"functionname", (String)this.instanceConfig.getFunctionDetails().getName());
        ThreadContext.put((String)"instance", (String)this.instanceConfig.getInstanceName());
        log.info("Starting Java Instance {} : \n Details = {}", (Object)this.instanceConfig.getFunctionDetails().getName(), (Object)this.instanceConfig.getFunctionDetails());
        this.functionClassLoader = this.loadJars();
        Object object = this.instanceConfig.getFunctionDetails().getClassName().equals(WindowFunctionExecutor.class.getName()) ? Reflections.createInstance(this.instanceConfig.getFunctionDetails().getClassName(), this.instanceClassLoader) : Reflections.createInstance(this.instanceConfig.getFunctionDetails().getClassName(), this.functionClassLoader);
        if (!(object instanceof org.apache.pulsar.functions.api.Function) && !(object instanceof Function)) {
            throw new RuntimeException("User class must either be Function or java.util.Function");
        }
        this.setupStateStore();
        ContextImpl contextImpl = this.setupContext();
        this.setupOutput(contextImpl);
        this.setupInput(contextImpl);
        this.setupLogHandler();
        this.javaInstance = new JavaInstance(contextImpl, object, this.instanceConfig);
    }

    ContextImpl setupContext() {
        Logger instanceLog = LoggerFactory.getILoggerFactory().getLogger("function-" + this.instanceConfig.getFunctionDetails().getName());
        return new ContextImpl(this.instanceConfig, instanceLog, this.client, this.secretsProvider, this.collectorRegistry, this.metricsLabels, this.componentType, this.stats, this.stateManager);
    }

    @Override
    public void run() {
        try {
            try {
                this.setup();
                Thread currentThread = Thread.currentThread();
                while (true) {
                    this.currentRecord = this.readInput();
                    this.stats.incrTotalReceived();
                    if (this.instanceConfig.getFunctionDetails().getProcessingGuarantees() == Function.ProcessingGuarantees.ATMOST_ONCE && this.instanceConfig.getFunctionDetails().getAutoAck()) {
                        this.currentRecord.ack();
                    }
                    this.addLogTopicHandler();
                    this.stats.setLastInvocation(System.currentTimeMillis());
                    this.stats.processTimeStart();
                    Thread.currentThread().setContextClassLoader(this.functionClassLoader);
                    JavaExecutionResult result = this.javaInstance.handleMessage(this.currentRecord, this.currentRecord.getValue(), this::handleResult, cause -> currentThread.interrupt());
                    Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
                    this.stats.processTimeEnd();
                    this.removeLogTopicHandler();
                    if (result == null) continue;
                    this.handleResult(this.currentRecord, result);
                }
            }
            catch (Throwable t) {
                log.error("[{}] Uncaught exception in Java Instance", (Object)FunctionCommon.getFullyQualifiedInstanceId(this.instanceConfig.getFunctionDetails().getTenant(), this.instanceConfig.getFunctionDetails().getNamespace(), this.instanceConfig.getFunctionDetails().getName(), this.instanceConfig.getInstanceId()), (Object)t);
                this.deathException = t;
                if (this.stats != null) {
                    this.stats.incrSysExceptions(t);
                }
                log.info("Closing instance");
                this.close();
            }
        }
        catch (Throwable throwable) {
            log.info("Closing instance");
            this.close();
            throw throwable;
        }
    }

    private ClassLoader loadJars() throws Exception {
        try {
            log.info("Load JAR: {}", (Object)this.jarFile);
            this.fnCache.registerFunctionInstanceWithArchive(this.instanceConfig.getFunctionId(), this.instanceConfig.getInstanceName(), this.jarFile, this.narExtractionDirectory);
        }
        catch (FileNotFoundException e) {
            this.fnCache.registerFunctionInstance(this.instanceConfig.getFunctionId(), this.instanceConfig.getInstanceName(), Arrays.asList(this.jarFile), Collections.emptyList());
        }
        log.info("Initialize function class loader for function {} at function cache manager, functionClassLoader: {}", (Object)this.instanceConfig.getFunctionDetails().getName(), (Object)this.fnCache.getClassLoader(this.instanceConfig.getFunctionId()));
        ClassLoader fnClassLoader = this.fnCache.getClassLoader(this.instanceConfig.getFunctionId());
        if (null == fnClassLoader) {
            throw new Exception("No function class loader available.");
        }
        return fnClassLoader;
    }

    private void setupStateStore() throws Exception {
        this.stateManager = new InstanceStateManager();
        if (null == this.stateStorageServiceUrl) {
            this.stateStoreProvider = StateStoreProvider.NULL;
        } else {
            this.stateStoreProvider = new BKStateStoreProviderImpl();
            HashMap<String, Object> stateStoreProviderConfig = new HashMap<String, Object>();
            stateStoreProviderConfig.put("stateStorageServiceUrl", this.stateStorageServiceUrl);
            this.stateStoreProvider.init(stateStoreProviderConfig, this.instanceConfig.getFunctionDetails());
            Object store = this.stateStoreProvider.getStateStore(this.instanceConfig.getFunctionDetails().getTenant(), this.instanceConfig.getFunctionDetails().getNamespace(), this.instanceConfig.getFunctionDetails().getName());
            StateStoreContextImpl context = new StateStoreContextImpl();
            store.init(context);
            this.stateManager.registerStore((StateStore)store);
        }
    }

    private void processAsyncResults() throws InterruptedException {
    }

    private void handleResult(Record srcRecord, JavaExecutionResult result) {
        if (result.getUserException() != null) {
            Exception t = result.getUserException();
            log.warn("Encountered exception when processing message {}", (Object)srcRecord, (Object)t);
            this.stats.incrUserExceptions(t);
            srcRecord.fail();
        } else {
            if (result.getResult() != null) {
                this.sendOutputMessage(srcRecord, result.getResult());
            } else if (this.instanceConfig.getFunctionDetails().getAutoAck()) {
                srcRecord.ack();
            }
            this.stats.incrTotalProcessedSuccessfully();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void sendOutputMessage(Record srcRecord, Object output) {
        if (!(this.sink instanceof PulsarSink)) {
            Thread.currentThread().setContextClassLoader(this.functionClassLoader);
        }
        try {
            this.sink.write(new SinkRecord<Object>(srcRecord, output));
        }
        catch (Exception e) {
            log.info("Encountered exception in sink write: ", (Throwable)e);
            this.stats.incrSinkExceptions(e);
            srcRecord.fail();
        }
        finally {
            Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
        }
    }

    private Record readInput() throws Exception {
        Record record;
        if (!(this.source instanceof PulsarSource)) {
            Thread.currentThread().setContextClassLoader(this.functionClassLoader);
        }
        try {
            record = this.source.read();
        }
        catch (Exception e) {
            this.stats.incrSourceExceptions(e);
            log.error("Encountered exception in source read", (Throwable)e);
            throw e;
        }
        finally {
            Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
        }
        if (record == null) {
            throw new IllegalArgumentException("The record returned by the source cannot be null");
        }
        if (record.getValue() == null) {
            throw new IllegalArgumentException("The value in the record returned by the source cannot be null");
        }
        return record;
    }

    @Override
    public synchronized void close() {
        if (this.stats != null) {
            this.stats.close();
            this.stats = null;
        }
        if (this.source != null) {
            if (!(this.source instanceof PulsarSource)) {
                Thread.currentThread().setContextClassLoader(this.functionClassLoader);
            }
            try {
                this.source.close();
            }
            catch (Throwable e) {
                log.error("Failed to close source {}", (Object)this.instanceConfig.getFunctionDetails().getSource().getClassName(), (Object)e);
            }
            finally {
                Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
            }
            this.source = null;
        }
        if (this.sink != null) {
            if (!(this.sink instanceof PulsarSink)) {
                Thread.currentThread().setContextClassLoader(this.functionClassLoader);
            }
            try {
                this.sink.close();
            }
            catch (Throwable e) {
                log.error("Failed to close sink {}", (Object)this.instanceConfig.getFunctionDetails().getSource().getClassName(), (Object)e);
            }
            finally {
                Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
            }
            this.sink = null;
        }
        if (null != this.javaInstance) {
            this.javaInstance.close();
            this.javaInstance = null;
        }
        if (null != this.stateManager) {
            this.stateManager.close();
        }
        if (null != this.stateStoreProvider) {
            this.stateStoreProvider.close();
        }
        if (this.instanceCache != null) {
            this.fnCache.unregisterFunctionInstance(this.instanceConfig.getFunctionId(), this.instanceConfig.getInstanceName());
            log.info("Unloading JAR files for function {}", (Object)this.instanceConfig);
            this.instanceCache = null;
        }
        if (this.logAppender != null) {
            this.removeLogTopicAppender(LoggerContext.getContext());
            this.removeLogTopicAppender(LoggerContext.getContext((boolean)false));
            this.logAppender.stop();
            this.logAppender = null;
        }
    }

    public synchronized String getStatsAsString() throws IOException {
        if (this.stats != null) {
            return this.stats.getStatsAsString();
        }
        return "";
    }

    public synchronized InstanceCommunication.MetricsData getAndResetMetrics() {
        InstanceCommunication.MetricsData metricsData = this.internalGetMetrics();
        this.internalResetMetrics();
        return metricsData;
    }

    public synchronized InstanceCommunication.MetricsData getMetrics() {
        return this.internalGetMetrics();
    }

    public synchronized void resetMetrics() {
        this.internalResetMetrics();
    }

    private InstanceCommunication.MetricsData internalGetMetrics() {
        Map<String, Double> userMetrics;
        InstanceCommunication.MetricsData.Builder bldr = this.createMetricsDataBuilder();
        if (this.javaInstance != null && (userMetrics = this.javaInstance.getMetrics()) != null) {
            bldr.putAllUserMetrics(userMetrics);
        }
        return bldr.build();
    }

    private void internalResetMetrics() {
        if (this.stats != null) {
            this.stats.reset();
        }
        if (this.javaInstance != null) {
            this.javaInstance.resetMetrics();
        }
    }

    private InstanceCommunication.MetricsData.Builder createMetricsDataBuilder() {
        InstanceCommunication.MetricsData.Builder bldr = InstanceCommunication.MetricsData.newBuilder();
        if (this.stats != null) {
            bldr.setProcessedSuccessfullyTotal((long)this.stats.getTotalProcessedSuccessfully());
            bldr.setSystemExceptionsTotal((long)this.stats.getTotalSysExceptions());
            bldr.setUserExceptionsTotal((long)this.stats.getTotalUserExceptions());
            bldr.setReceivedTotal((long)this.stats.getTotalRecordsReceived());
            bldr.setAvgProcessLatency(this.stats.getAvgProcessLatency());
            bldr.setLastInvocation((long)this.stats.getLastInvocation());
            bldr.setProcessedSuccessfullyTotal1Min((long)this.stats.getTotalProcessedSuccessfully1min());
            bldr.setSystemExceptionsTotal1Min((long)this.stats.getTotalSysExceptions1min());
            bldr.setUserExceptionsTotal1Min((long)this.stats.getTotalUserExceptions1min());
            bldr.setReceivedTotal1Min((long)this.stats.getTotalRecordsReceived1min());
            bldr.setAvgProcessLatency1Min(this.stats.getAvgProcessLatency1min());
        }
        return bldr;
    }

    public synchronized InstanceCommunication.FunctionStatus.Builder getFunctionStatus() {
        InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder();
        if (this.stats != null) {
            functionStatusBuilder.setNumReceived((long)this.stats.getTotalRecordsReceived());
            functionStatusBuilder.setNumSuccessfullyProcessed((long)this.stats.getTotalProcessedSuccessfully());
            functionStatusBuilder.setNumUserExceptions((long)this.stats.getTotalUserExceptions());
            this.stats.getLatestUserExceptions().forEach(ex -> functionStatusBuilder.addLatestUserExceptions((InstanceCommunication.FunctionStatus.ExceptionInformation)ex));
            functionStatusBuilder.setNumSystemExceptions((long)this.stats.getTotalSysExceptions());
            this.stats.getLatestSystemExceptions().forEach(ex -> functionStatusBuilder.addLatestSystemExceptions((InstanceCommunication.FunctionStatus.ExceptionInformation)ex));
            this.stats.getLatestSourceExceptions().forEach(ex -> functionStatusBuilder.addLatestSourceExceptions((InstanceCommunication.FunctionStatus.ExceptionInformation)ex));
            this.stats.getLatestSinkExceptions().forEach(ex -> functionStatusBuilder.addLatestSinkExceptions((InstanceCommunication.FunctionStatus.ExceptionInformation)ex));
            functionStatusBuilder.setAverageLatency(this.stats.getAvgProcessLatency());
            functionStatusBuilder.setLastInvocationTime((long)this.stats.getLastInvocation());
        }
        return functionStatusBuilder;
    }

    private void setupLogHandler() {
        if (this.instanceConfig.getFunctionDetails().getLogTopic() != null && !this.instanceConfig.getFunctionDetails().getLogTopic().isEmpty()) {
            this.logAppender = new LogAppender(this.client, this.instanceConfig.getFunctionDetails().getLogTopic(), FunctionCommon.getFullyQualifiedName(this.instanceConfig.getFunctionDetails()));
            this.logAppender.start();
            this.setupLogTopicAppender(LoggerContext.getContext());
        }
    }

    private void addLogTopicHandler() {
        if (this.logAppender == null) {
            return;
        }
        this.setupLogTopicAppender(LoggerContext.getContext((boolean)false));
    }

    private void setupLogTopicAppender(LoggerContext context) {
        Configuration config = context.getConfiguration();
        config.addAppender((Appender)this.logAppender);
        for (LoggerConfig loggerConfig : config.getLoggers().values()) {
            loggerConfig.addAppender((Appender)this.logAppender, null, null);
        }
        config.getRootLogger().addAppender((Appender)this.logAppender, null, null);
        context.updateLoggers();
    }

    private void removeLogTopicHandler() {
        if (this.logAppender == null) {
            return;
        }
        this.removeLogTopicAppender(LoggerContext.getContext((boolean)false));
    }

    private void removeLogTopicAppender(LoggerContext context) {
        Configuration config = context.getConfiguration();
        for (LoggerConfig loggerConfig : config.getLoggers().values()) {
            loggerConfig.removeAppender(this.logAppender.getName());
        }
        config.getRootLogger().removeAppender(this.logAppender.getName());
        context.updateLoggers();
    }

    private void setupInput(ContextImpl contextImpl) throws Exception {
        Object object;
        Function.SourceSpec sourceSpec = this.instanceConfig.getFunctionDetails().getSource();
        if (sourceSpec.getClassName().isEmpty()) {
            PulsarSourceConfig pulsarSourceConfig = new PulsarSourceConfig();
            sourceSpec.getInputSpecsMap().forEach((topic, conf) -> {
                ConsumerConfig consumerConfig = ConsumerConfig.builder().isRegexPattern(conf.getIsRegexPattern()).build();
                if (conf.getSchemaType() != null && !conf.getSchemaType().isEmpty()) {
                    consumerConfig.setSchemaType(conf.getSchemaType());
                } else if (conf.getSerdeClassName() != null && !conf.getSerdeClassName().isEmpty()) {
                    consumerConfig.setSerdeClassName(conf.getSerdeClassName());
                }
                consumerConfig.setSchemaProperties(conf.getSchemaPropertiesMap());
                consumerConfig.setConsumerProperties(conf.getConsumerPropertiesMap());
                if (conf.hasReceiverQueueSize()) {
                    consumerConfig.setReceiverQueueSize(conf.getReceiverQueueSize().getValue());
                }
                if (conf.hasCryptoSpec()) {
                    consumerConfig.setCryptoConfig(CryptoUtils.convertFromSpec(conf.getCryptoSpec()));
                }
                pulsarSourceConfig.getTopicSchema().put((String)topic, consumerConfig);
            });
            sourceSpec.getTopicsToSerDeClassNameMap().forEach((topic, serde) -> pulsarSourceConfig.getTopicSchema().put((String)topic, ConsumerConfig.builder().serdeClassName((String)serde).isRegexPattern(false).build()));
            if (!StringUtils.isEmpty(sourceSpec.getTopicsPattern())) {
                pulsarSourceConfig.getTopicSchema().get(sourceSpec.getTopicsPattern()).setRegexPattern(true);
            }
            pulsarSourceConfig.setSubscriptionName(StringUtils.isNotBlank(sourceSpec.getSubscriptionName()) ? sourceSpec.getSubscriptionName() : InstanceUtils.getDefaultSubscriptionName(this.instanceConfig.getFunctionDetails()));
            pulsarSourceConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.valueOf(this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name()));
            switch (sourceSpec.getSubscriptionPosition()) {
                case EARLIEST: {
                    pulsarSourceConfig.setSubscriptionPosition(SubscriptionInitialPosition.Earliest);
                    break;
                }
                default: {
                    pulsarSourceConfig.setSubscriptionPosition(SubscriptionInitialPosition.Latest);
                }
            }
            switch (sourceSpec.getSubscriptionType()) {
                case FAILOVER: {
                    pulsarSourceConfig.setSubscriptionType(SubscriptionType.Failover);
                    break;
                }
                case KEY_SHARED: {
                    pulsarSourceConfig.setSubscriptionType(SubscriptionType.Key_Shared);
                    break;
                }
                default: {
                    pulsarSourceConfig.setSubscriptionType(SubscriptionType.Shared);
                }
            }
            pulsarSourceConfig.setTypeClassName(sourceSpec.getTypeClassName());
            if (sourceSpec.getTimeoutMs() > 0L) {
                pulsarSourceConfig.setTimeoutMs(sourceSpec.getTimeoutMs());
            }
            if (sourceSpec.getNegativeAckRedeliveryDelayMs() > 0L) {
                pulsarSourceConfig.setNegativeAckRedeliveryDelayMs(sourceSpec.getNegativeAckRedeliveryDelayMs());
            }
            if (this.instanceConfig.getFunctionDetails().hasRetryDetails()) {
                pulsarSourceConfig.setMaxMessageRetries(this.instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries());
                pulsarSourceConfig.setDeadLetterTopic(this.instanceConfig.getFunctionDetails().getRetryDetails().getDeadLetterTopic());
            }
            object = new PulsarSource(this.client, pulsarSourceConfig, this.properties, this.functionClassLoader);
        } else {
            object = sourceSpec.getClassName().equals(BatchSourceExecutor.class.getName()) ? Reflections.createInstance(sourceSpec.getClassName(), this.instanceClassLoader) : Reflections.createInstance(sourceSpec.getClassName(), this.functionClassLoader);
        }
        if (object instanceof Source) {
            Class[] typeArgs = TypeResolver.resolveRawArguments(Source.class, object.getClass());
            assert (typeArgs.length > 0);
        } else {
            throw new RuntimeException("Source does not implement correct interface");
        }
        this.source = (Source)object;
        if (!(this.source instanceof PulsarSource)) {
            Thread.currentThread().setContextClassLoader(this.functionClassLoader);
        }
        try {
            if (sourceSpec.getConfigs().isEmpty()) {
                this.source.open(new HashMap<String, Object>(), contextImpl);
            } else {
                this.source.open((Map)new Gson().fromJson(sourceSpec.getConfigs(), new TypeToken<Map<String, Object>>(){}.getType()), contextImpl);
            }
        }
        catch (Exception e) {
            log.error("Source open produced uncaught exception: ", (Throwable)e);
            throw e;
        }
        finally {
            Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
        }
    }

    private void setupOutput(ContextImpl contextImpl) throws Exception {
        PulsarSink object;
        Function.SinkSpec sinkSpec = this.instanceConfig.getFunctionDetails().getSink();
        if (sinkSpec.getClassName().isEmpty()) {
            if (StringUtils.isEmpty(sinkSpec.getTopic())) {
                object = PulsarSinkDisable.INSTANCE;
            } else {
                PulsarSinkConfig pulsarSinkConfig = new PulsarSinkConfig();
                pulsarSinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.valueOf(this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name()));
                pulsarSinkConfig.setTopic(sinkSpec.getTopic());
                pulsarSinkConfig.setForwardSourceMessageProperty(this.instanceConfig.getFunctionDetails().getSink().getForwardSourceMessageProperty());
                if (!StringUtils.isEmpty(sinkSpec.getSchemaType())) {
                    pulsarSinkConfig.setSchemaType(sinkSpec.getSchemaType());
                } else if (!StringUtils.isEmpty(sinkSpec.getSerDeClassName())) {
                    pulsarSinkConfig.setSerdeClassName(sinkSpec.getSerDeClassName());
                }
                pulsarSinkConfig.setTypeClassName(sinkSpec.getTypeClassName());
                pulsarSinkConfig.setSchemaProperties(sinkSpec.getSchemaPropertiesMap());
                if (this.instanceConfig.getFunctionDetails().getSink().getProducerSpec() != null) {
                    Function.ProducerSpec conf = this.instanceConfig.getFunctionDetails().getSink().getProducerSpec();
                    ProducerConfig.ProducerConfigBuilder builder = ProducerConfig.builder().maxPendingMessages(conf.getMaxPendingMessages()).maxPendingMessagesAcrossPartitions(conf.getMaxPendingMessagesAcrossPartitions()).useThreadLocalProducers(conf.getUseThreadLocalProducers()).cryptoConfig(CryptoUtils.convertFromSpec(conf.getCryptoSpec()));
                    pulsarSinkConfig.setProducerConfig(builder.build());
                }
                object = new PulsarSink(this.client, pulsarSinkConfig, this.properties, this.stats, this.functionClassLoader);
            }
        } else {
            object = Reflections.createInstance(sinkSpec.getClassName(), this.functionClassLoader);
        }
        if (!(object instanceof Sink)) {
            throw new RuntimeException("Sink does not implement correct interface");
        }
        this.sink = object;
        if (!(this.sink instanceof PulsarSink)) {
            Thread.currentThread().setContextClassLoader(this.functionClassLoader);
        }
        try {
            if (sinkSpec.getConfigs().isEmpty()) {
                this.sink.open(new HashMap<String, Object>(), contextImpl);
            } else {
                this.sink.open((Map)new Gson().fromJson(sinkSpec.getConfigs(), new TypeToken<Map<String, Object>>(){}.getType()), contextImpl);
            }
        }
        catch (Exception e) {
            log.error("Sink open produced uncaught exception: ", (Throwable)e);
            throw e;
        }
        finally {
            Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
        }
    }

    public Throwable getDeathException() {
        return this.deathException;
    }
}

