package org.apache.linkis.engineplugin.repl.executor;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.linkis.common.io.resultset.ResultSetWriter;
import org.apache.linkis.common.log.LogUtils;
import org.apache.linkis.common.utils.OverloadUtils;
import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask;
import org.apache.linkis.engineconn.computation.executor.execute.ConcurrentComputationExecutor;
import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext;
import org.apache.linkis.engineconn.core.EngineConnObject;
import org.apache.linkis.engineplugin.repl.conf.ReplConfiguration;
import org.apache.linkis.engineplugin.repl.conf.ReplEngineConf;
import org.apache.linkis.engineplugin.repl.errorcode.ReplErrorCodeSummary;
import org.apache.linkis.engineplugin.repl.exception.ReplException;
import org.apache.linkis.manager.common.entity.resource.CommonNodeResource;
import org.apache.linkis.manager.common.entity.resource.LoadResource;
import org.apache.linkis.manager.common.entity.resource.NodeResource;
import org.apache.linkis.manager.engineplugin.common.util.NodeResourceUtils;
import org.apache.linkis.manager.label.entity.Label;
import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel;
import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel;
import org.apache.linkis.protocol.engine.JobProgressInfo;
import org.apache.linkis.rpc.Sender;
import org.apache.linkis.scheduler.executer.ErrorExecuteResponse;
import org.apache.linkis.scheduler.executer.ExecuteResponse;
import org.apache.linkis.scheduler.executer.SuccessExecuteResponse;
import org.apache.linkis.storage.LineMetaData;
import org.apache.linkis.storage.LineRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import scala.Tuple2;

/* loaded from: input_file:org/apache/linkis/engineplugin/repl/executor/ReplEngineConnExecutor.class */
/**
 * Concurrent executor that hands submitted code to a {@link ReplAdapter} and returns whatever the
 * code printed to {@code System.out} as a single-line result set.
 *
 * <p>NOTE(review): this executor can run several tasks at once (see {@link
 * #getConcurrentLimit()}), yet {@link #replAdapter} and {@link #configMap} are shared, unsynchronized
 * instance state, and stdout capture swaps the process-wide {@code System.out}. Output of
 * concurrently running tasks may therefore interleave or be attributed to the wrong task — confirm
 * whether that is acceptable for the deployment.
 */
public class ReplEngineConnExecutor extends ConcurrentComputationExecutor {
    private static final Logger logger = LoggerFactory.getLogger(ReplEngineConnExecutor.class);

    /** Numeric suffix used when building {@link #getId()}. */
    private final int id;

    /** Labels attached to this executor; replaced in place by {@link #setExecutorLabels(List)}. */
    private final List<Label<?>> executorLabels = new ArrayList<>(2);

    /** Job id to the thread currently executing that job; used to interrupt on kill. */
    private final Map<String, Thread> threadCache = new ConcurrentHashMap<>();

    /** Effective configuration: cached engine configuration overlaid with task properties. */
    private final Map<String, String> configMap = new HashMap<>();

    /** Adapter chosen from {@link ReplConfiguration#REPL_TYPE} on every {@link #execute} call. */
    private ReplAdapter replAdapter;

    /**
     * Creates the executor.
     *
     * @param i value forwarded unchanged to the superclass constructor
     * @param i2 numeric id of this executor, used as the suffix of {@link #getId()}
     */
    public ReplEngineConnExecutor(int i, int i2) {
        super(i);
        this.id = i2;
    }

    /** Delegates to the superclass initialisation. */
    public void init() {
        super.init();
    }

    /**
     * Refreshes {@link #configMap} from the cached engine configuration and the task's own
     * properties, selects the REPL adapter, then delegates execution to the superclass.
     *
     * @param engineConnTask the task to run
     * @return the response produced by the superclass
     */
    public ExecuteResponse execute(EngineConnTask engineConnTask) {
        Label<?>[] labels = engineConnTask.getLables();
        Optional<Label<?>> userCreatorLabel =
                Arrays.stream(labels).filter(label -> label instanceof UserCreatorLabel).findFirst();
        Optional<Label<?>> engineTypeLabel =
                Arrays.stream(labels).filter(label -> label instanceof EngineTypeLabel).findFirst();

        if (userCreatorLabel.isPresent() && engineTypeLabel.isPresent()) {
            Map<String, String> cacheMap =
                    new ReplEngineConf()
                            .getCacheMap(
                                    new Tuple2<>(
                                            (UserCreatorLabel) userCreatorLabel.get(),
                                            (EngineTypeLabel) engineTypeLabel.get()));
            if (MapUtils.isNotEmpty(cacheMap)) {
                this.configMap.putAll(cacheMap);
            }
        }

        // Task-level properties are applied after the cached configuration so they take precedence.
        // Null values are skipped; everything else is stored in its string form.
        Map<String, Object> properties = engineConnTask.getProperties();
        if (MapUtils.isNotEmpty(properties)) {
            for (Map.Entry<String, Object> entry : properties.entrySet()) {
                if (entry.getValue() != null) {
                    this.configMap.put(entry.getKey(), String.valueOf(entry.getValue()));
                }
            }
        }

        this.replAdapter = ReplAdapterFactory.create((String) ReplConfiguration.REPL_TYPE.getValue(this.configMap));
        return super.execute(engineConnTask);
    }

    /**
     * Runs one piece of code through the current {@link ReplAdapter} and publishes everything it
     * printed to {@code System.out} as stdout and as a single-record result set.
     *
     * @param engineExecutionContext context of the running job
     * @param str code to execute; must not be blank
     * @return {@link SuccessExecuteResponse} on success, otherwise an {@link ErrorExecuteResponse}
     *     carrying the stack trace text
     * @throws ReplException if {@code str} is blank
     */
    public ExecuteResponse executeLine(EngineExecutionContext engineExecutionContext, String str) {
        if (StringUtils.isBlank(str)) {
            throw new ReplException(ReplErrorCodeSummary.REPL_CODE_IS_NOT_BLANK.getErrorCode(), ReplErrorCodeSummary.REPL_CODE_IS_NOT_BLANK.getErrorDesc());
        }
        String code = str.trim();
        logger.info("Repl engine begins to run code:\n {}", code);
        String taskId = (String) engineExecutionContext.getJobId().get();
        initialStatusUpdates(taskId, engineExecutionContext);
        String classpathDir = (String) ReplConfiguration.CLASSPATH_DIR.getValue(this.configMap);
        String methodName = (String) ReplConfiguration.METHOD_NAME.getValue(this.configMap);
        try {
            String output = runCapturingStdout(taskId, code, classpathDir, methodName);
            engineExecutionContext.appendStdout(output);
            writeResult(engineExecutionContext, output);
            return new SuccessExecuteResponse();
        } catch (Exception e) {
            String stackTrace = ExceptionUtils.getStackTrace(e);
            logger.error("Repl engine execute failed : {}", stackTrace);
            engineExecutionContext.appendStdout(LogUtils.generateERROR(stackTrace));
            return new ErrorExecuteResponse(stackTrace, (Throwable) null);
        }
    }

    /**
     * Executes the code with {@code System.out} redirected into a buffer and returns the captured
     * text. The original stream is restored and the task's thread registration is dropped whether or
     * not the code throws.
     */
    private String runCapturingStdout(String taskId, String code, String classpathDir, String methodName)
            throws Exception {
        Thread worker = Thread.currentThread();
        this.threadCache.put(taskId, worker);
        ByteArrayOutputStream buffer = new ByteArrayOutputStream(1024);
        PrintStream capture = new PrintStream(buffer);
        PrintStream originalOut = System.out;
        // NOTE(review): System.out is process-wide, so tasks running concurrently share this redirect.
        System.setOut(capture);
        try {
            this.replAdapter.executorCode(code, classpathDir, methodName);
        } finally {
            capture.flush();
            System.setOut(originalOut);
            // Remove only our own registration so a later kill cannot interrupt this thread while
            // it is busy with a different task.
            this.threadCache.remove(taskId, worker);
        }
        return buffer.toString();
    }

    /**
     * Writes the captured output as one line record. An {@link IOException} is logged and otherwise
     * ignored; the writer is always closed.
     */
    private void writeResult(EngineExecutionContext engineExecutionContext, String output) {
        ResultSetWriter writer = engineExecutionContext.createResultSetWriter("1");
        try {
            writer.addMetaData(new LineMetaData());
            writer.addRecord(new LineRecord(output));
        } catch (IOException e) {
            logger.error("Failed to get the task result", e);
        } finally {
            IOUtils.closeQuietly(writer);
        }
    }

    /**
     * Not implemented by this executor.
     *
     * @return always {@code null}
     */
    public ExecuteResponse executeCompletely(EngineExecutionContext engineExecutionContext, String str, String str2) {
        return null;
    }

    /**
     * Progress is not tracked by this executor.
     *
     * @return always {@code 0.0f}
     */
    public float progress(String str) {
        return 0.0f;
    }

    /**
     * Progress details are not tracked by this executor.
     *
     * @return always an empty array
     */
    public JobProgressInfo[] getProgressInfo(String str) {
        return new JobProgressInfo[0];
    }

    /**
     * Interrupts the thread registered for the given job, if any, then delegates to the superclass.
     *
     * @param str id of the job to kill
     */
    public void killTask(String str) {
        Thread worker = this.threadCache.remove(str);
        if (worker != null) {
            worker.interrupt();
        }
        super.killTask(str);
    }

    /** Returns the live internal label list (not a copy). */
    public List<Label<?>> getExecutorLabels() {
        return this.executorLabels;
    }

    /**
     * Replaces the executor labels with the given ones. A {@code null} or empty list is ignored and
     * the current labels are kept.
     *
     * @param list the new labels
     */
    public void setExecutorLabels(List<Label<?>> list) {
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        // Snapshot first: the caller may pass the very list returned by getExecutorLabels(),
        // in which case clearing before copying would lose every label.
        List<Label<?>> snapshot = new ArrayList<>(list);
        this.executorLabels.clear();
        this.executorLabels.addAll(snapshot);
    }

    /** @return always {@code false} */
    public boolean supportCallBackLogs() {
        return false;
    }

    /**
     * Not implemented by this executor.
     *
     * @return always {@code null}
     */
    public NodeResource requestExpectedResource(NodeResource nodeResource) {
        return null;
    }

    /**
     * Builds a resource descriptor whose used resource is a {@link LoadResource} created from the
     * process max memory and the constant {@code 1}.
     *
     * @return the current node resource
     */
    public NodeResource getCurrentNodeResource() {
        NodeResourceUtils.appendMemoryUnitIfMissing(EngineConnObject.getEngineCreationContext().getOptions());
        CommonNodeResource resource = new CommonNodeResource();
        resource.setUsedResource(new LoadResource(OverloadUtils.getProcessMaxMemory(), 1));
        return resource;
    }

    /** @return the service instance name followed by {@code "_"} and this executor's numeric id */
    public String getId() {
        return Sender.getThisServiceInstance().getInstance() + "_" + this.id;
    }

    /** @return the value of {@link ReplConfiguration#ENGINE_CONCURRENT_LIMIT} */
    public int getConcurrentLimit() {
        return ((Integer) ReplConfiguration.ENGINE_CONCURRENT_LIMIT.getValue()).intValue();
    }

    /** Pushes the initial progress and progress info for the job to the execution context. */
    private void initialStatusUpdates(String str, EngineExecutionContext engineExecutionContext) {
        engineExecutionContext.pushProgress(progress(str), getProgressInfo(str));
    }

    /** Interrupts every registered task thread and clears the registry. */
    public void killAll() {
        for (Thread worker : this.threadCache.values()) {
            if (worker != null) {
                worker.interrupt();
            }
        }
        this.threadCache.clear();
    }

    /** Kills all running tasks, then closes the superclass. */
    public void close() {
        killAll();
        super.close();
    }
}
