/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.instance;

import com.google.common.base.Stopwatch;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import io.netty.buffer.ByteBuf;
import io.prometheus.client.CollectorRegistry;
import java.io.FileNotFoundException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import net.jodah.typetools.TypeResolver;
import org.apache.bookkeeper.api.StorageClient;
import org.apache.bookkeeper.api.kv.Table;
import org.apache.bookkeeper.clients.StorageClientBuilder;
import org.apache.bookkeeper.clients.config.StorageClientSettings;
import org.apache.bookkeeper.clients.exceptions.InternalServerException;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.util.Backoff;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.ThreadContext;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.ContextImpl;
import org.apache.pulsar.functions.instance.InstanceCache;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.instance.JavaExecutionResult;
import org.apache.pulsar.functions.instance.JavaInstance;
import org.apache.pulsar.functions.instance.LogAppender;
import org.apache.pulsar.functions.instance.SinkRecord;
import org.apache.pulsar.functions.instance.state.StateContextImpl;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.functions.sink.PulsarSinkConfig;
import org.apache.pulsar.functions.sink.PulsarSinkDisable;
import org.apache.pulsar.functions.source.PulsarSource;
import org.apache.pulsar.functions.source.PulsarSourceConfig;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JavaInstanceRunnable
implements AutoCloseable,
Runnable {
    private static final Logger log = LoggerFactory.getLogger(JavaInstanceRunnable.class);
    private ClassLoader fnClassLoader;
    private final InstanceConfig instanceConfig;
    private final FunctionCacheManager fnCache;
    private final String jarFile;
    private final PulsarClientImpl client;
    private LogAppender logAppender;
    private final String stateStorageServiceUrl;
    private StorageClient storageClient;
    private Table<ByteBuf, ByteBuf> stateTable;
    private JavaInstance javaInstance;
    private Throwable deathException;
    private ComponentStatsManager stats;
    private Record<?> currentRecord;
    private Source source;
    private Sink sink;
    private final SecretsProvider secretsProvider;
    private CollectorRegistry collectorRegistry;
    private final String[] metricsLabels;
    private InstanceCache instanceCache;
    private final Function.FunctionDetails.ComponentType componentType;
    private final Map<String, String> properties;

    public JavaInstanceRunnable(InstanceConfig instanceConfig, FunctionCacheManager fnCache, String jarFile, PulsarClient pulsarClient, String stateStorageServiceUrl, SecretsProvider secretsProvider, CollectorRegistry collectorRegistry) {
        this.instanceConfig = instanceConfig;
        this.fnCache = fnCache;
        this.jarFile = jarFile;
        this.client = (PulsarClientImpl)pulsarClient;
        this.stateStorageServiceUrl = stateStorageServiceUrl;
        this.secretsProvider = secretsProvider;
        this.collectorRegistry = collectorRegistry;
        this.metricsLabels = new String[]{instanceConfig.getFunctionDetails().getTenant(), String.format("%s/%s", instanceConfig.getFunctionDetails().getTenant(), instanceConfig.getFunctionDetails().getNamespace()), instanceConfig.getFunctionDetails().getName(), String.valueOf(instanceConfig.getInstanceId()), instanceConfig.getClusterName(), FunctionCommon.getFullyQualifiedName((Function.FunctionDetails)instanceConfig.getFunctionDetails())};
        this.componentType = InstanceUtils.calculateSubjectType(instanceConfig.getFunctionDetails());
        this.properties = InstanceUtils.getProperties(this.componentType, FunctionCommon.getFullyQualifiedName((Function.FunctionDetails)instanceConfig.getFunctionDetails()), this.instanceConfig.getInstanceId());
        this.collectorRegistry = collectorRegistry;
    }

    JavaInstance setupJavaInstance(ContextImpl contextImpl) throws Exception {
        ThreadContext.put((String)"function", (String)FunctionCommon.getFullyQualifiedName((Function.FunctionDetails)this.instanceConfig.getFunctionDetails()));
        ThreadContext.put((String)"functionname", (String)this.instanceConfig.getFunctionDetails().getName());
        ThreadContext.put((String)"instance", (String)this.instanceConfig.getInstanceName());
        log.info("Starting Java Instance {} : \n Details = {}", (Object)this.instanceConfig.getFunctionDetails().getName(), (Object)this.instanceConfig.getFunctionDetails());
        this.loadJars();
        ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
        Object object = Reflections.createInstance((String)this.instanceConfig.getFunctionDetails().getClassName(), (ClassLoader)clsLoader);
        if (!(object instanceof org.apache.pulsar.functions.api.Function) && !(object instanceof Function)) {
            throw new RuntimeException("User class must either be Function or java.util.Function");
        }
        this.setupStateTable();
        this.setupOutput(contextImpl);
        this.setupInput(contextImpl);
        this.setupLogHandler();
        return new JavaInstance(contextImpl, object);
    }

    ContextImpl setupContext() {
        Logger instanceLog = LoggerFactory.getLogger((String)("function-" + this.instanceConfig.getFunctionDetails().getName()));
        return new ContextImpl(this.instanceConfig, instanceLog, (PulsarClient)this.client, this.secretsProvider, this.collectorRegistry, this.metricsLabels, this.componentType, this.stats);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        try {
            try {
                this.instanceCache = InstanceCache.getInstanceCache();
                if (this.collectorRegistry == null) {
                    this.collectorRegistry = new CollectorRegistry();
                }
                this.stats = ComponentStatsManager.getStatsManager(this.collectorRegistry, this.metricsLabels, this.instanceCache.getScheduledExecutorService(), this.componentType);
                ContextImpl contextImpl = this.setupContext();
                this.javaInstance = this.setupJavaInstance(contextImpl);
                if (null != this.stateTable) {
                    StateContextImpl stateContext = new StateContextImpl(this.stateTable);
                    this.javaInstance.getContext().setStateContext(stateContext);
                }
                while (true) {
                    this.currentRecord = this.readInput();
                    this.stats.incrTotalReceived();
                    if (this.instanceConfig.getFunctionDetails().getProcessingGuarantees() == Function.ProcessingGuarantees.ATMOST_ONCE && this.instanceConfig.getFunctionDetails().getAutoAck()) {
                        this.currentRecord.ack();
                    }
                    this.addLogTopicHandler();
                    this.stats.setLastInvocation(System.currentTimeMillis());
                    this.stats.processTimeStart();
                    JavaExecutionResult result = this.javaInstance.handleMessage(this.currentRecord, this.currentRecord.getValue());
                    this.stats.processTimeEnd();
                    this.removeLogTopicHandler();
                    if (log.isDebugEnabled()) {
                        log.debug("Got result: {}", result.getResult());
                    }
                    try {
                        this.processResult(this.currentRecord, result);
                    }
                    catch (Exception e) {
                        log.warn("Failed to process result of message {}", this.currentRecord, (Object)e);
                        this.currentRecord.fail();
                    }
                }
            }
            catch (Throwable t) {
                log.error("[{}] Uncaught exception in Java Instance", (Object)FunctionCommon.getFullyQualifiedInstanceId((String)this.instanceConfig.getFunctionDetails().getTenant(), (String)this.instanceConfig.getFunctionDetails().getNamespace(), (String)this.instanceConfig.getFunctionDetails().getName(), (int)this.instanceConfig.getInstanceId()), (Object)t);
                this.deathException = t;
                if (this.stats != null) {
                    this.stats.incrSysExceptions(t);
                }
                return;
            }
        }
        finally {
            log.info("Closing instance");
            this.close();
        }
    }

    private void loadJars() throws Exception {
        try {
            log.info("Load JAR: {}", (Object)this.jarFile);
            this.fnCache.registerFunctionInstanceWithArchive(this.instanceConfig.getFunctionId(), this.instanceConfig.getInstanceName(), this.jarFile);
        }
        catch (FileNotFoundException e) {
            this.fnCache.registerFunctionInstance(this.instanceConfig.getFunctionId(), this.instanceConfig.getInstanceName(), Arrays.asList(this.jarFile), Collections.emptyList());
        }
        log.info("Initialize function class loader for function {} at function cache manager", (Object)this.instanceConfig.getFunctionDetails().getName());
        this.fnClassLoader = this.fnCache.getClassLoader(this.instanceConfig.getFunctionId());
        if (null == this.fnClassLoader) {
            throw new Exception("No function class loader available.");
        }
        Thread.currentThread().setContextClassLoader(this.fnClassLoader);
    }

    /*
     * Exception decompiling
     */
    private void createStateTable(String tableNs, String tableName, StorageClientSettings settings) throws Exception {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Tried to end blocks [10[CATCHBLOCK]], but top level block is 3[TRYBLOCK]
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.processEndingBlocks(Op04StructuredStatement.java:435)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:484)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    private void setupStateTable() throws Exception {
        if (null == this.stateStorageServiceUrl) {
            return;
        }
        String tableNs = FunctionCommon.getStateNamespace((String)this.instanceConfig.getFunctionDetails().getTenant(), (String)this.instanceConfig.getFunctionDetails().getNamespace());
        String tableName = this.instanceConfig.getFunctionDetails().getName();
        StorageClientSettings settings = StorageClientSettings.newBuilder().serviceUri(this.stateStorageServiceUrl).clientName("function-" + tableNs + "/" + tableName).backoffPolicy((Backoff.Policy)Backoff.Jitter.of((Backoff.Jitter.Type)Backoff.Jitter.Type.EXPONENTIAL, (long)100L, (long)2000L, (long)60L)).build();
        this.createStateTable(tableNs, tableName, settings);
        log.info("Starting state table for function {}", (Object)this.instanceConfig.getFunctionDetails().getName());
        this.storageClient = StorageClientBuilder.newBuilder().withSettings(settings).withNamespace(tableNs).build();
        Stopwatch openSw = Stopwatch.createStarted();
        while (openSw.elapsed(TimeUnit.MINUTES) < 1L) {
            try {
                this.stateTable = (Table)FutureUtils.result((CompletableFuture)this.storageClient.openTable(tableName));
                break;
            }
            catch (InternalServerException ise) {
                log.warn("Encountered internal server on opening table '{}', re-attempt in 100 milliseconds : {}", (Object)tableName, (Object)ise.getMessage());
                TimeUnit.MILLISECONDS.sleep(100L);
            }
        }
    }

    private void processResult(Record srcRecord, JavaExecutionResult result) throws Exception {
        if (result.getUserException() != null) {
            log.info("Encountered user exception when processing message {}", (Object)srcRecord, (Object)result.getUserException());
            this.stats.incrUserExceptions(result.getUserException());
            srcRecord.fail();
        } else {
            if (result.getResult() != null) {
                this.sendOutputMessage(srcRecord, result.getResult());
            } else if (this.instanceConfig.getFunctionDetails().getAutoAck()) {
                srcRecord.ack();
            }
            this.stats.incrTotalProcessedSuccessfully();
        }
    }

    private void sendOutputMessage(Record srcRecord, Object output) {
        try {
            this.sink.write(new SinkRecord<Object>(srcRecord, output));
        }
        catch (Exception e) {
            log.info("Encountered exception in sink write: ", (Throwable)e);
            this.stats.incrSinkExceptions(e);
            throw new RuntimeException(e);
        }
    }

    private Record readInput() {
        Record record;
        try {
            record = this.source.read();
        }
        catch (Exception e) {
            this.stats.incrSourceExceptions(e);
            log.info("Encountered exception in source read: ", (Throwable)e);
            throw new RuntimeException(e);
        }
        if (record == null) {
            throw new IllegalArgumentException("The record returned by the source cannot be null");
        }
        if (record.getValue() == null) {
            throw new IllegalArgumentException("The value in the record returned by the source cannot be null");
        }
        return record;
    }

    @Override
    public synchronized void close() {
        if (this.stats != null) {
            this.stats.close();
            this.stats = null;
        }
        if (this.source != null) {
            try {
                this.source.close();
            }
            catch (Throwable e) {
                log.error("Failed to close source {}", (Object)this.instanceConfig.getFunctionDetails().getSource().getClassName(), (Object)e);
            }
            this.source = null;
        }
        if (this.sink != null) {
            try {
                this.sink.close();
            }
            catch (Throwable e) {
                log.error("Failed to close sink {}", (Object)this.instanceConfig.getFunctionDetails().getSource().getClassName(), (Object)e);
            }
            this.sink = null;
        }
        if (null != this.javaInstance) {
            this.javaInstance.close();
            this.javaInstance = null;
        }
        if (null != this.stateTable) {
            this.stateTable.close();
            this.stateTable = null;
        }
        if (null != this.storageClient) {
            this.storageClient.closeAsync().exceptionally(cause -> {
                log.warn("Failed to close state storage client", cause);
                return null;
            });
            this.storageClient = null;
        }
        if (this.instanceCache != null) {
            this.fnCache.unregisterFunctionInstance(this.instanceConfig.getFunctionId(), this.instanceConfig.getInstanceName());
            log.info("Unloading JAR files for function {}", (Object)this.instanceConfig);
            this.instanceCache = null;
        }
    }

    public InstanceCommunication.MetricsData getAndResetMetrics() {
        InstanceCommunication.MetricsData metricsData = this.getMetrics();
        this.stats.reset();
        return metricsData;
    }

    public InstanceCommunication.MetricsData getMetrics() {
        Map<String, Double> userMetrics;
        InstanceCommunication.MetricsData.Builder bldr = this.createMetricsDataBuilder();
        if (this.javaInstance != null && (userMetrics = this.javaInstance.getMetrics()) != null) {
            bldr.putAllUserMetrics(userMetrics);
        }
        return bldr.build();
    }

    public void resetMetrics() {
        this.stats.reset();
        this.javaInstance.resetMetrics();
    }

    private InstanceCommunication.MetricsData.Builder createMetricsDataBuilder() {
        InstanceCommunication.MetricsData.Builder bldr = InstanceCommunication.MetricsData.newBuilder();
        if (this.stats != null) {
            bldr.setProcessedSuccessfullyTotal((long)this.stats.getTotalProcessedSuccessfully());
            bldr.setSystemExceptionsTotal((long)this.stats.getTotalSysExceptions());
            bldr.setUserExceptionsTotal((long)this.stats.getTotalUserExceptions());
            bldr.setReceivedTotal((long)this.stats.getTotalRecordsReceived());
            bldr.setAvgProcessLatency(this.stats.getAvgProcessLatency());
            bldr.setLastInvocation((long)this.stats.getLastInvocation());
            bldr.setProcessedSuccessfullyTotal1Min((long)this.stats.getTotalProcessedSuccessfully1min());
            bldr.setSystemExceptionsTotal1Min((long)this.stats.getTotalSysExceptions1min());
            bldr.setUserExceptionsTotal1Min((long)this.stats.getTotalUserExceptions1min());
            bldr.setReceivedTotal1Min((long)this.stats.getTotalRecordsReceived1min());
            bldr.setAvgProcessLatency1Min(this.stats.getAvgProcessLatency1min());
        }
        return bldr;
    }

    public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() {
        InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder();
        if (this.stats != null) {
            functionStatusBuilder.setNumReceived((long)this.stats.getTotalRecordsReceived());
            functionStatusBuilder.setNumSuccessfullyProcessed((long)this.stats.getTotalProcessedSuccessfully());
            functionStatusBuilder.setNumUserExceptions((long)this.stats.getTotalUserExceptions());
            this.stats.getLatestUserExceptions().forEach(ex -> functionStatusBuilder.addLatestUserExceptions(ex));
            functionStatusBuilder.setNumSystemExceptions((long)this.stats.getTotalSysExceptions());
            this.stats.getLatestSystemExceptions().forEach(ex -> functionStatusBuilder.addLatestSystemExceptions(ex));
            this.stats.getLatestSourceExceptions().forEach(ex -> functionStatusBuilder.addLatestSourceExceptions(ex));
            this.stats.getLatestSinkExceptions().forEach(ex -> functionStatusBuilder.addLatestSinkExceptions(ex));
            functionStatusBuilder.setAverageLatency(this.stats.getAvgProcessLatency());
            functionStatusBuilder.setLastInvocationTime((long)this.stats.getLastInvocation());
        }
        return functionStatusBuilder;
    }

    private void setupLogHandler() {
        if (this.instanceConfig.getFunctionDetails().getLogTopic() != null && !this.instanceConfig.getFunctionDetails().getLogTopic().isEmpty()) {
            this.logAppender = new LogAppender((PulsarClient)this.client, this.instanceConfig.getFunctionDetails().getLogTopic(), FunctionCommon.getFullyQualifiedName((Function.FunctionDetails)this.instanceConfig.getFunctionDetails()));
            this.logAppender.start();
        }
    }

    private void addLogTopicHandler() {
        if (this.logAppender == null) {
            return;
        }
        LoggerContext context = LoggerContext.getContext((boolean)false);
        Configuration config = context.getConfiguration();
        config.addAppender((Appender)this.logAppender);
        for (LoggerConfig loggerConfig : config.getLoggers().values()) {
            loggerConfig.addAppender((Appender)this.logAppender, null, null);
        }
        config.getRootLogger().addAppender((Appender)this.logAppender, null, null);
    }

    private void removeLogTopicHandler() {
        if (this.logAppender == null) {
            return;
        }
        LoggerContext context = LoggerContext.getContext((boolean)false);
        Configuration config = context.getConfiguration();
        for (LoggerConfig loggerConfig : config.getLoggers().values()) {
            loggerConfig.removeAppender(this.logAppender.getName());
        }
        config.getRootLogger().removeAppender(this.logAppender.getName());
    }

    public void setupInput(ContextImpl contextImpl) throws Exception {
        Object object;
        Function.SourceSpec sourceSpec = this.instanceConfig.getFunctionDetails().getSource();
        if (sourceSpec.getClassName().isEmpty()) {
            PulsarSourceConfig pulsarSourceConfig = new PulsarSourceConfig();
            sourceSpec.getInputSpecsMap().forEach((topic, conf) -> {
                ConsumerConfig consumerConfig = ConsumerConfig.builder().isRegexPattern(conf.getIsRegexPattern()).build();
                if (conf.getSchemaType() != null && !conf.getSchemaType().isEmpty()) {
                    consumerConfig.setSchemaType(conf.getSchemaType());
                } else if (conf.getSerdeClassName() != null && !conf.getSerdeClassName().isEmpty()) {
                    consumerConfig.setSerdeClassName(conf.getSerdeClassName());
                }
                if (conf.hasReceiverQueueSize()) {
                    consumerConfig.setReceiverQueueSize(Integer.valueOf(conf.getReceiverQueueSize().getValue()));
                }
                pulsarSourceConfig.getTopicSchema().put((String)topic, consumerConfig);
            });
            sourceSpec.getTopicsToSerDeClassNameMap().forEach((topic, serde) -> pulsarSourceConfig.getTopicSchema().put((String)topic, ConsumerConfig.builder().serdeClassName(serde).isRegexPattern(false).build()));
            if (!StringUtils.isEmpty((CharSequence)sourceSpec.getTopicsPattern())) {
                pulsarSourceConfig.getTopicSchema().get(sourceSpec.getTopicsPattern()).setRegexPattern(true);
            }
            pulsarSourceConfig.setSubscriptionName(StringUtils.isNotBlank((CharSequence)sourceSpec.getSubscriptionName()) ? sourceSpec.getSubscriptionName() : InstanceUtils.getDefaultSubscriptionName(this.instanceConfig.getFunctionDetails()));
            pulsarSourceConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.valueOf((String)this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name()));
            switch (sourceSpec.getSubscriptionType()) {
                case FAILOVER: {
                    pulsarSourceConfig.setSubscriptionType(SubscriptionType.Failover);
                    break;
                }
                default: {
                    pulsarSourceConfig.setSubscriptionType(SubscriptionType.Shared);
                }
            }
            pulsarSourceConfig.setTypeClassName(sourceSpec.getTypeClassName());
            if (sourceSpec.getTimeoutMs() > 0L) {
                pulsarSourceConfig.setTimeoutMs(sourceSpec.getTimeoutMs());
            }
            if (this.instanceConfig.getFunctionDetails().hasRetryDetails()) {
                pulsarSourceConfig.setMaxMessageRetries(this.instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries());
                pulsarSourceConfig.setDeadLetterTopic(this.instanceConfig.getFunctionDetails().getRetryDetails().getDeadLetterTopic());
            }
            object = new PulsarSource((PulsarClient)this.client, pulsarSourceConfig, this.properties);
        } else {
            object = Reflections.createInstance((String)sourceSpec.getClassName(), (ClassLoader)Thread.currentThread().getContextClassLoader());
        }
        if (object instanceof Source) {
            Class[] typeArgs = TypeResolver.resolveRawArguments(Source.class, object.getClass());
            assert (typeArgs.length > 0);
        } else {
            throw new RuntimeException("Source does not implement correct interface");
        }
        this.source = (Source)object;
        if (sourceSpec.getConfigs().isEmpty()) {
            this.source.open(new HashMap(), (SourceContext)contextImpl);
        } else {
            this.source.open((Map)new Gson().fromJson(sourceSpec.getConfigs(), new TypeToken<Map<String, Object>>(){}.getType()), (SourceContext)contextImpl);
        }
    }

    public void setupOutput(ContextImpl contextImpl) throws Exception {
        PulsarSink object;
        Function.SinkSpec sinkSpec = this.instanceConfig.getFunctionDetails().getSink();
        if (sinkSpec.getClassName().isEmpty()) {
            if (StringUtils.isEmpty((CharSequence)sinkSpec.getTopic())) {
                object = PulsarSinkDisable.INSTANCE;
            } else {
                PulsarSinkConfig pulsarSinkConfig = new PulsarSinkConfig();
                pulsarSinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.valueOf((String)this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name()));
                pulsarSinkConfig.setTopic(sinkSpec.getTopic());
                if (!StringUtils.isEmpty((CharSequence)sinkSpec.getSchemaType())) {
                    pulsarSinkConfig.setSchemaType(sinkSpec.getSchemaType());
                } else if (!StringUtils.isEmpty((CharSequence)sinkSpec.getSerDeClassName())) {
                    pulsarSinkConfig.setSerdeClassName(sinkSpec.getSerDeClassName());
                }
                pulsarSinkConfig.setTypeClassName(sinkSpec.getTypeClassName());
                object = new PulsarSink((PulsarClient)this.client, pulsarSinkConfig, this.properties, this.stats);
            }
        } else {
            object = Reflections.createInstance((String)sinkSpec.getClassName(), (ClassLoader)Thread.currentThread().getContextClassLoader());
        }
        if (!(object instanceof Sink)) {
            throw new RuntimeException("Sink does not implement correct interface");
        }
        this.sink = object;
        if (sinkSpec.getConfigs().isEmpty()) {
            this.sink.open(new HashMap(), (SinkContext)contextImpl);
        } else {
            this.sink.open((Map)new Gson().fromJson(sinkSpec.getConfigs(), new TypeToken<Map<String, Object>>(){}.getType()), (SinkContext)contextImpl);
        }
    }

    StorageClient getStorageClient() {
        return this.storageClient;
    }

    Table<ByteBuf, ByteBuf> getStateTable() {
        return this.stateTable;
    }

    public Throwable getDeathException() {
        return this.deathException;
    }

    public ComponentStatsManager getStats() {
        return this.stats;
    }
}

