package org.apache.pulsar.functions.instance;

import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.Iterator;
import net.jodah.typetools.TypeResolver;
import org.apache.bookkeeper.api.StorageClient;
import org.apache.bookkeeper.api.kv.Table;
import org.apache.bookkeeper.clients.StorageClientBuilder;
import org.apache.bookkeeper.clients.admin.StorageAdminClient;
import org.apache.bookkeeper.clients.config.StorageClientSettings;
import org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException;
import org.apache.bookkeeper.clients.exceptions.StreamNotFoundException;
import org.apache.bookkeeper.clients.utils.NetUtils;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
import org.apache.bookkeeper.stream.protocol.ProtocolConstants;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.ThreadContext;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;
import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.connect.core.Record;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.instance.processors.MessageProcessor;
import org.apache.pulsar.functions.instance.state.StateContextImpl;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.shaded.io.netty.buffer.ByteBuf;
import org.apache.pulsar.functions.source.PulsarRecord;
import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/instance/JavaInstanceRunnable.class */
public class JavaInstanceRunnable implements AutoCloseable, Runnable {
    private static final Logger log = LoggerFactory.getLogger(JavaInstanceRunnable.class);
    private ClassLoader fnClassLoader;
    private final InstanceConfig instanceConfig;
    private final FunctionCacheManager fnCache;
    private final String jarFile;
    private final PulsarClientImpl client;
    private LogAppender logAppender;
    private final String stateStorageServiceUrl;
    private StorageClient storageClient;
    private Table<ByteBuf, ByteBuf> stateTable;
    private JavaInstance javaInstance;
    private Exception deathException;
    private SerDe outputSerDe;
    private final MessageProcessor processor;
    private final FunctionStats stats = new FunctionStats();
    private Record currentRecord;

    public JavaInstanceRunnable(InstanceConfig instanceConfig, FunctionCacheManager functionCacheManager, String str, PulsarClient pulsarClient, String str2) {
        this.instanceConfig = instanceConfig;
        this.fnCache = functionCacheManager;
        this.jarFile = str;
        this.client = (PulsarClientImpl) pulsarClient;
        this.stateStorageServiceUrl = str2;
        this.processor = MessageProcessor.create(this.client, instanceConfig.getFunctionDetails());
    }

    JavaInstance setupJavaInstance() throws Exception {
        ThreadContext.put("function", FunctionDetailsUtils.getFullyQualifiedName(this.instanceConfig.getFunctionDetails()));
        ThreadContext.put("instance", this.instanceConfig.getInstanceId());
        log.info("Starting Java Instance {}", this.instanceConfig.getFunctionDetails().getName());
        loadJars();
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        Object createInstance = Reflections.createInstance(this.instanceConfig.getFunctionDetails().getClassName(), contextClassLoader);
        if (!(createInstance instanceof Function) && !(createInstance instanceof java.util.function.Function)) {
            throw new RuntimeException("User class must either be Function or java.util.Function");
        }
        Class<?>[] resolveRawArguments = createInstance instanceof Function ? TypeResolver.resolveRawArguments(Function.class, (Class) ((Function) createInstance).getClass()) : TypeResolver.resolveRawArguments(java.util.function.Function.class, (Class) ((java.util.function.Function) createInstance).getClass());
        setupSerDe(resolveRawArguments, contextClassLoader);
        setupStateTable();
        this.processor.setupOutput(this.outputSerDe);
        this.processor.setupInput(resolveRawArguments[0]);
        setupLogHandler();
        return new JavaInstance(this.instanceConfig, createInstance, contextClassLoader, this.client, this.processor.getSource());
    }

    @Override // java.lang.Runnable
    public void run() {
        StateContextImpl stateContextImpl;
        try {
            try {
                this.javaInstance = setupJavaInstance();
                while (true) {
                    this.currentRecord = this.processor.recieveMessage();
                    this.processor.postReceiveMessage(this.currentRecord);
                    if (null != this.stateTable) {
                        stateContextImpl = new StateContextImpl(this.stateTable);
                        this.javaInstance.getContext().setStateContext(stateContextImpl);
                    } else {
                        stateContextImpl = null;
                    }
                    long currentTimeMillis = System.currentTimeMillis();
                    this.stats.incrementProcessed(currentTimeMillis);
                    addLogTopicHandler();
                    MessageId messageId = null;
                    String str = null;
                    if (this.currentRecord instanceof PulsarRecord) {
                        PulsarRecord pulsarRecord = (PulsarRecord) this.currentRecord;
                        messageId = pulsarRecord.getMessageId();
                        str = pulsarRecord.getTopicName();
                    }
                    JavaExecutionResult handleMessage = this.javaInstance.handleMessage(messageId, str, this.currentRecord.getValue());
                    removeLogTopicHandler();
                    long currentTimeMillis2 = System.currentTimeMillis();
                    log.debug("Got result: {}", handleMessage.getResult());
                    if (null != stateContextImpl) {
                        try {
                            stateContextImpl.flush().join();
                        } catch (Exception e) {
                            log.error("Failed to flush the state updates of message {}", this.currentRecord, e);
                            this.currentRecord.fail();
                        }
                    }
                    try {
                        processResult(this.currentRecord, handleMessage, currentTimeMillis, currentTimeMillis2);
                    } catch (Exception e2) {
                        log.warn("Failed to process result of message {}", this.currentRecord, e2);
                        this.currentRecord.fail();
                    }
                }
            } catch (Throwable th) {
                close();
                throw th;
            }
        } catch (Exception e3) {
            log.error("Uncaught exception in Java Instance", e3);
            this.deathException = e3;
            close();
        }
    }

    private void loadJars() throws Exception {
        log.info("Loading JAR files for function {} from jarFile {}", this.instanceConfig, this.jarFile);
        this.fnCache.registerFunctionInstance(this.instanceConfig.getFunctionId(), this.instanceConfig.getInstanceId(), Arrays.asList(this.jarFile), Collections.emptyList());
        log.info("Initialize function class loader for function {} at function cache manager", this.instanceConfig.getFunctionDetails().getName());
        this.fnClassLoader = this.fnCache.getClassLoader(this.instanceConfig.getFunctionId());
        if (null == this.fnClassLoader) {
            throw new Exception("No function class loader available.");
        }
        Thread.currentThread().setContextClassLoader(this.fnClassLoader);
    }

    private void setupStateTable() throws Exception {
        if (null == this.stateStorageServiceUrl) {
            return;
        }
        String replace = String.format("%s_%s", this.instanceConfig.getFunctionDetails().getTenant(), this.instanceConfig.getFunctionDetails().getNamespace()).replace('-', '_');
        String name = this.instanceConfig.getFunctionDetails().getName();
        StorageClientSettings build = StorageClientSettings.newBuilder().addEndpoints(NetUtils.parseEndpoint(this.stateStorageServiceUrl)).clientName("function-" + replace + "/" + name).build();
        StorageAdminClient buildAdmin = StorageClientBuilder.newBuilder().withSettings(build).buildAdmin();
        Throwable th = null;
        try {
            try {
                try {
                    FutureUtils.result(buildAdmin.getStream(replace, name));
                } finally {
                }
            } catch (NamespaceNotFoundException e) {
                FutureUtils.result(buildAdmin.createNamespace(replace, NamespaceConfiguration.newBuilder().setDefaultStreamConf(ProtocolConstants.DEFAULT_STREAM_CONF).build()));
                FutureUtils.result(buildAdmin.createStream(replace, name, ProtocolConstants.DEFAULT_STREAM_CONF));
            } catch (StreamNotFoundException e2) {
                FutureUtils.result(buildAdmin.createStream(replace, name, ProtocolConstants.DEFAULT_STREAM_CONF));
            }
            if (buildAdmin != null) {
                if (0 != 0) {
                    try {
                        buildAdmin.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    buildAdmin.close();
                }
            }
            log.info("Starting state table for function {}", this.instanceConfig.getFunctionDetails().getName());
            this.storageClient = StorageClientBuilder.newBuilder().withSettings(build).withNamespace(replace).build();
            this.stateTable = (Table) FutureUtils.result(this.storageClient.openTable(name));
        } catch (Throwable th3) {
            if (buildAdmin != null) {
                if (th != null) {
                    try {
                        buildAdmin.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    buildAdmin.close();
                }
            }
            throw th3;
        }
    }

    private void processResult(Record record, JavaExecutionResult javaExecutionResult, long j, long j2) throws Exception {
        if (javaExecutionResult.getUserException() != null) {
            log.info("Encountered user exception when processing message {}", record, javaExecutionResult.getUserException());
            this.stats.incrementUserExceptions(javaExecutionResult.getUserException());
            this.currentRecord.fail();
            return;
        }
        if (javaExecutionResult.getSystemException() != null) {
            log.info("Encountered system exception when processing message {}", record, javaExecutionResult.getSystemException());
            this.stats.incrementSystemExceptions(javaExecutionResult.getSystemException());
            throw javaExecutionResult.getSystemException();
        }
        this.stats.incrementSuccessfullyProcessed(j2 - j);
        if (javaExecutionResult.getResult() == null || this.instanceConfig.getFunctionDetails().getSink().getTopic() == null) {
            this.processor.sendOutputMessage(record, null);
            return;
        }
        try {
            byte[] serialize = this.outputSerDe.serialize(javaExecutionResult.getResult());
            if (serialize != null) {
                sendOutputMessage(record, serialize);
            } else {
                this.processor.sendOutputMessage(record, null);
            }
        } catch (Exception e) {
            this.stats.incrementSerializationExceptions();
            throw e;
        }
    }

    private void sendOutputMessage(Record record, byte[] bArr) throws Exception {
        MessageBuilder create = MessageBuilder.create();
        if (record instanceof PulsarRecord) {
            PulsarRecord pulsarRecord = (PulsarRecord) record;
            create.setContent(bArr).setProperty("__pfn_input_topic__", pulsarRecord.getTopicName()).setProperty("__pfn_input_msg_id__", new String(Base64.getEncoder().encode(pulsarRecord.getMessageId().toByteArray())));
        }
        this.processor.sendOutputMessage(record, create);
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.processor.close();
        if (null != this.javaInstance) {
            this.javaInstance.close();
        }
        if (null != this.stateTable) {
            this.stateTable.close();
            this.stateTable = null;
        }
        if (null != this.storageClient) {
            this.storageClient.close();
        }
        this.fnCache.unregisterFunctionInstance(this.instanceConfig.getFunctionId(), this.instanceConfig.getInstanceId());
        log.info("Unloading JAR files for function {}", this.instanceConfig);
    }

    public InstanceCommunication.MetricsData getAndResetMetrics() {
        InstanceCommunication.MetricsData andResetMetrics;
        InstanceCommunication.MetricsData.Builder newBuilder = InstanceCommunication.MetricsData.newBuilder();
        addSystemMetrics("__total_processed__", this.stats.getCurrentStats().getTotalProcessed(), newBuilder);
        addSystemMetrics("__total_successfully_processed__", this.stats.getCurrentStats().getTotalSuccessfullyProcessed(), newBuilder);
        addSystemMetrics("__total_system_exceptions__", this.stats.getCurrentStats().getTotalSystemExceptions(), newBuilder);
        addSystemMetrics("__total_user_exceptions__", this.stats.getCurrentStats().getTotalUserExceptions(), newBuilder);
        this.stats.getCurrentStats().getTotalDeserializationExceptions().forEach((str, l) -> {
            addSystemMetrics("__total_deserialization_exceptions__" + str, l.longValue(), newBuilder);
        });
        addSystemMetrics("__total_serialization_exceptions__", this.stats.getCurrentStats().getTotalSerializationExceptions(), newBuilder);
        addSystemMetrics("__avg_latency_ms__", this.stats.getCurrentStats().computeLatency(), newBuilder);
        this.stats.resetCurrent();
        if (this.javaInstance != null && (andResetMetrics = this.javaInstance.getAndResetMetrics()) != null) {
            newBuilder.putAllMetrics(andResetMetrics.getMetricsMap());
        }
        return newBuilder.build();
    }

    public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() {
        InstanceCommunication.FunctionStatus.Builder newBuilder = InstanceCommunication.FunctionStatus.newBuilder();
        newBuilder.setNumProcessed(this.stats.getTotalStats().getTotalProcessed());
        newBuilder.setNumSuccessfullyProcessed(this.stats.getTotalStats().getTotalSuccessfullyProcessed());
        newBuilder.setNumUserExceptions(this.stats.getTotalStats().getTotalUserExceptions());
        this.stats.getTotalStats().getLatestUserExceptions().forEach(exceptionInformation -> {
            newBuilder.addLatestUserExceptions(exceptionInformation);
        });
        newBuilder.setNumSystemExceptions(this.stats.getTotalStats().getTotalSystemExceptions());
        this.stats.getTotalStats().getLatestSystemExceptions().forEach(exceptionInformation2 -> {
            newBuilder.addLatestSystemExceptions(exceptionInformation2);
        });
        newBuilder.putAllDeserializationExceptions(this.stats.getTotalStats().getTotalDeserializationExceptions());
        newBuilder.setSerializationExceptions(this.stats.getTotalStats().getTotalSerializationExceptions());
        newBuilder.setAverageLatency(this.stats.getTotalStats().computeLatency());
        newBuilder.setLastInvocationTime(this.stats.getTotalStats().getLastInvocationTime());
        return newBuilder;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void addSystemMetrics(String str, double d, InstanceCommunication.MetricsData.Builder builder) {
        builder.putMetrics(str, InstanceCommunication.MetricsData.DataDigest.newBuilder().setCount(d).setSum(d).setMax(d).setMin(0.0d).build());
    }

    private void setupSerDe(Class<?>[] clsArr, ClassLoader classLoader) {
        if (Void.class.equals(clsArr[1])) {
            return;
        }
        if (this.instanceConfig.getFunctionDetails().getSink().getSerDeClassName() == null || this.instanceConfig.getFunctionDetails().getSink().getSerDeClassName().isEmpty() || this.instanceConfig.getFunctionDetails().getSink().getSerDeClassName().equals(DefaultSerDe.class.getName())) {
            this.outputSerDe = InstanceUtils.initializeDefaultSerDe(clsArr[1]);
        } else {
            this.outputSerDe = InstanceUtils.initializeSerDe(this.instanceConfig.getFunctionDetails().getSink().getSerDeClassName(), classLoader, clsArr[1]);
        }
        Class<?>[] resolveRawArguments = TypeResolver.resolveRawArguments(SerDe.class, (Class) this.outputSerDe.getClass());
        if (this.outputSerDe.getClass().getName().equals(DefaultSerDe.class.getName())) {
            if (!DefaultSerDe.IsSupportedType(clsArr[1])) {
                throw new RuntimeException("Default Serde does not support type " + clsArr[1]);
            }
        } else if (!resolveRawArguments[0].isAssignableFrom(clsArr[1])) {
            throw new RuntimeException("Inconsistent types found between function output type and output serde type:  function type = " + clsArr[1] + "should be assignable from " + resolveRawArguments[0]);
        }
    }

    private void setupLogHandler() {
        if (this.instanceConfig.getFunctionDetails().getLogTopic() == null || this.instanceConfig.getFunctionDetails().getLogTopic().isEmpty()) {
            return;
        }
        this.logAppender = new LogAppender(this.client, this.instanceConfig.getFunctionDetails().getLogTopic(), FunctionDetailsUtils.getFullyQualifiedName(this.instanceConfig.getFunctionDetails()));
        this.logAppender.start();
    }

    private void addLogTopicHandler() {
        if (this.logAppender == null) {
            return;
        }
        Configuration configuration = LoggerContext.getContext(false).getConfiguration();
        configuration.addAppender(this.logAppender);
        Iterator it = configuration.getLoggers().values().iterator();
        while (it.hasNext()) {
            ((LoggerConfig) it.next()).addAppender(this.logAppender, (Level) null, (Filter) null);
        }
        configuration.getRootLogger().addAppender(this.logAppender, (Level) null, (Filter) null);
    }

    private void removeLogTopicHandler() {
        if (this.logAppender == null) {
            return;
        }
        Configuration configuration = LoggerContext.getContext(false).getConfiguration();
        Iterator it = configuration.getLoggers().values().iterator();
        while (it.hasNext()) {
            ((LoggerConfig) it.next()).removeAppender(this.logAppender.getName());
        }
        configuration.getRootLogger().removeAppender(this.logAppender.getName());
    }

    StorageClient getStorageClient() {
        return this.storageClient;
    }

    Table<ByteBuf, ByteBuf> getStateTable() {
        return this.stateTable;
    }

    public Exception getDeathException() {
        return this.deathException;
    }

    SerDe getOutputSerDe() {
        return this.outputSerDe;
    }

    MessageProcessor getProcessor() {
        return this.processor;
    }
}
