package org.apache.asterix.external.library;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.external.ipc.ExternalFunctionResultRouter;
import org.apache.asterix.external.ipc.PythonIPCProto;
import org.apache.asterix.external.library.msgpack.MessagePackUtils;
import org.apache.asterix.om.functions.IExternalFunctionInfo;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.EnumDeserializer;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.om.types.TypeTagUtil;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.api.exceptions.Warning;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.resources.IDeallocatable;
import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.hyracks.data.std.primitive.TaggedValuePointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
import org.apache.hyracks.ipc.impl.IPCSystem;

/* loaded from: input_file:org/apache/asterix/external/library/PythonLibraryEvaluator.class */
/**
 * State object that owns one external Python interpreter process and the IPC protocol
 * handle used to talk to it.
 *
 * <p>Instances are obtained through {@link #getInstance}, which looks the evaluator up in the
 * task context by a {@link PythonLibraryEvaluatorId} built from the library's dataverse name,
 * the library name and the calling thread, and creates and launches a new one when none is
 * registered yet. The evaluator is registered with the joblet context as an
 * {@link IDeallocatable}; {@link #deallocate()} terminates the process and removes the result
 * route.
 */
public class PythonLibraryEvaluator extends AbstractStateObject implements IDeallocatable {
    public static final String ENTRYPOINT = "entrypoint.py";
    public static final String SITE_PACKAGES = "site-packages";

    /** MessagePack "array 16" format marker (0xDC); followed by a 16-bit element count. */
    private static final byte MSGPACK_ARRAY16 = (byte) 0xDC;

    /** How long {@link #deallocate()} waits after a polite destroy before killing forcibly. */
    private static final long SHUTDOWN_GRACE_MILLIS = 100L;

    // Set by initialize(); both stay null if the interpreter was never launched.
    private Process p;
    private PythonIPCProto proto;

    private final ILibraryManager libMgr;
    private final File pythonHome;
    private final ExternalFunctionResultRouter router;
    private final IPCSystem ipcSys;
    private final String sitePkgs;
    private final List<String> pythonArgs;
    private final Map<String, String> pythonEnv;
    private final TaskAttemptId task;
    private final IWarningCollector warningCollector;
    private final SourceLocation sourceLoc;

    /**
     * Stores the collaborators needed to launch and talk to the interpreter. Nothing is
     * started here; the process is launched by the private {@code initialize()}.
     *
     * @param jobId job this state object belongs to
     * @param pythonLibraryEvaluatorId identity of this evaluator (also names the library to load)
     * @param iLibraryManager used to resolve the library's directory on disk
     * @param file the interpreter executable; its absolute path is the first command element
     * @param str value passed to the entry point as its last command-line argument
     * @param list extra arguments placed between the interpreter and the entry point script
     * @param map environment variables added to the child process environment
     * @param externalFunctionResultRouter router handed to the IPC protocol object
     * @param iPCSystem IPC system whose socket port is passed to the child process
     * @param taskAttemptId task attempt that created this evaluator
     * @param iWarningCollector receives warnings for failed Python calls
     * @param sourceLocation source location attached to those warnings
     */
    public PythonLibraryEvaluator(JobId jobId, PythonLibraryEvaluatorId pythonLibraryEvaluatorId, ILibraryManager iLibraryManager, File file, String str, List<String> list, Map<String, String> map, ExternalFunctionResultRouter externalFunctionResultRouter, IPCSystem iPCSystem, TaskAttemptId taskAttemptId, IWarningCollector iWarningCollector, SourceLocation sourceLocation) {
        super(jobId, pythonLibraryEvaluatorId);
        this.libMgr = iLibraryManager;
        this.pythonHome = file;
        this.sitePkgs = str;
        this.pythonArgs = list;
        this.pythonEnv = map;
        this.router = externalFunctionResultRouter;
        this.task = taskAttemptId;
        this.ipcSys = iPCSystem;
        this.warningCollector = iWarningCollector;
        this.sourceLoc = sourceLocation;
    }

    /**
     * Launches the interpreter process and performs the initial protocol handshake.
     *
     * <p>The command line is: interpreter path, the configured extra arguments,
     * {@link #ENTRYPOINT}, the loopback address, the IPC system's port, and the site-packages
     * value. The process runs with the library's directory as its working directory.
     *
     * @throws IOException if the process cannot be started or the protocol I/O fails
     * @throws AsterixException if the library lookup or the handshake fails
     */
    private void initialize() throws IOException, AsterixException {
        PythonLibraryEvaluatorId evaluatorId = (PythonLibraryEvaluatorId) this.id;
        PythonLibrary library = (PythonLibrary) this.libMgr.getLibrary(evaluatorId.getLibraryDataverseName(), evaluatorId.getLibraryName());
        String libraryDir = library.getFile().getAbsolutePath();
        int port = this.ipcSys.getSocketAddress().getPort();

        List<String> command = new ArrayList<>();
        command.add(this.pythonHome.getAbsolutePath());
        command.addAll(this.pythonArgs);
        command.add(ENTRYPOINT);
        command.add(InetAddress.getLoopbackAddress().getHostAddress());
        command.add(Integer.toString(port));
        command.add(this.sitePkgs);

        ProcessBuilder processBuilder = new ProcessBuilder(command);
        processBuilder.environment().putAll(this.pythonEnv);
        processBuilder.directory(new File(libraryDir));
        this.p = processBuilder.start();

        // The protocol object is given the stream connected to the child's standard input.
        this.proto = new PythonIPCProto(this.p.getOutputStream(), this.router, this.p);
        this.proto.start();
        this.proto.helo();
    }

    /**
     * Asks the interpreter to initialise one external function.
     *
     * <p>The function's external identifier supplies two strings: element 0 is passed through
     * unchanged, and element 1 is split at its last {@code '.'} into a qualifier (the part
     * before the dot, or {@code null} when there is no dot) and a name (the part after it).
     *
     * @param iExternalFunctionInfo descriptor of the function to initialise
     * @return the value returned by the protocol's {@code init} call
     * @throws IOException if the protocol I/O fails
     * @throws AsterixException if the protocol reports a failure
     */
    public long initialize(IExternalFunctionInfo iExternalFunctionInfo) throws IOException, AsterixException {
        List<String> externalIdentifier = iExternalFunctionInfo.getExternalIdentifier();
        String first = externalIdentifier.get(0);
        String dotted = externalIdentifier.get(1);

        int lastDot = dotted.lastIndexOf('.');
        String qualifier = lastDot >= 0 ? dotted.substring(0, lastDot) : null;
        String name = lastDot >= 0 ? dotted.substring(lastDot + 1) : dotted;
        return this.proto.init(first, qualifier, name);
    }

    /**
     * Invokes a previously initialised function with a single set of arguments.
     *
     * <p>An {@link AsterixException} raised by the call is not propagated: it is reported
     * through the warning collector (when the collector accepts warnings) and {@code null} is
     * returned instead.
     *
     * @param j function id, as returned by {@link #initialize(IExternalFunctionInfo)}
     * @param iATypeArr declared types of the arguments
     * @param iValueReferenceArr argument values
     * @param z flag forwarded unchanged to the protocol's {@code call}
     * @return the result buffer, or {@code null} if the call raised an {@link AsterixException}
     * @throws IOException if the protocol I/O fails
     */
    public ByteBuffer callPython(long j, IAType[] iATypeArr, IValueReference[] iValueReferenceArr, boolean z) throws IOException {
        try {
            return this.proto.call(j, iATypeArr, iValueReferenceArr, z);
        } catch (AsterixException e) {
            warnUdfFailure(e);
            return null;
        }
    }

    /**
     * Invokes a previously initialised function through the protocol's {@code callMulti}.
     *
     * <p>Failure handling matches {@link #callPython}: an {@link AsterixException} becomes a
     * warning and a {@code null} result.
     *
     * @param j function id, as returned by {@link #initialize(IExternalFunctionInfo)}
     * @param arrayBackedValueStorage storage holding the already-encoded arguments
     * @param i count forwarded unchanged to {@code callMulti}
     * @return the result buffer, or {@code null} if the call raised an {@link AsterixException}
     * @throws IOException if the protocol I/O fails
     */
    public ByteBuffer callPythonMulti(long j, ArrayBackedValueStorage arrayBackedValueStorage, int i) throws IOException {
        try {
            return this.proto.callMulti(j, arrayBackedValueStorage, i);
        } catch (AsterixException e) {
            warnUdfFailure(e);
            return null;
        }
    }

    /** Reports a failed Python call as a warning, if the collector is accepting warnings. */
    private void warnUdfFailure(AsterixException e) {
        if (this.warningCollector.shouldWarn()) {
            this.warningCollector.warn(Warning.of(this.sourceLoc, ErrorCode.EXTERNAL_UDF_EXCEPTION, new Serializable[]{e.getMessage()}));
        }
    }

    /**
     * Terminates the interpreter process and removes this evaluator's result route.
     *
     * <p>The process is first asked to exit and given a short grace period; if it has not
     * exited by then (or the wait is interrupted) it is killed forcibly. Safe to call when the
     * interpreter was never launched, which happens when launching failed after the evaluator
     * had already been registered for deallocation.
     */
    @Override
    public void deallocate() {
        boolean interrupted = false;
        if (this.p != null) {
            boolean exited = false;
            try {
                this.p.destroy();
                exited = this.p.waitFor(SHUTDOWN_GRACE_MILLIS, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // Still finish the cleanup; the interrupt status is restored below.
                interrupted = true;
            }
            if (!exited) {
                this.p.destroyForcibly();
            }
        }
        // proto is null when initialize() failed before the protocol object was created.
        if (this.proto != null) {
            this.router.removeRoute(Long.valueOf(this.proto.getRouteId()));
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Returns the result of {@code MessagePackUtils.peekUnknown} for an argument's type.
     *
     * <p>When the declared type is not {@code ANY}, the declared type is used directly.
     * Otherwise the runtime type tag is read from the value itself and the corresponding
     * builtin type is used.
     *
     * @param iAType declared type of the argument
     * @param iValueReference the argument value; only read when the declared type is {@code ANY}
     * @return the type tag reported by {@code MessagePackUtils.peekUnknown}
     * @throws HyracksDataException if resolving the type fails
     */
    public static ATypeTag peekArgument(IAType iAType, IValueReference iValueReference) throws HyracksDataException {
        if (iAType.getTypeTag() != ATypeTag.ANY) {
            return MessagePackUtils.peekUnknown(iAType);
        }
        TaggedValuePointable tagged = TaggedValuePointable.FACTORY.createPointable();
        tagged.set(iValueReference);
        ATypeTag runtimeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(tagged.getTag());
        return MessagePackUtils.peekUnknown(TypeTagUtil.getBuiltinTypeByTag(runtimeTag));
    }

    /**
     * Writes an empty MessagePack array (array16 marker followed by a zero length) to the
     * given storage, used to represent an empty argument list.
     *
     * @param arrayBackedValueStorage storage whose data output receives the three bytes
     * @throws IOException if writing to the storage fails
     */
    public static void setVoidArgument(ArrayBackedValueStorage arrayBackedValueStorage) throws IOException {
        arrayBackedValueStorage.getDataOutput().writeByte(MSGPACK_ARRAY16);
        arrayBackedValueStorage.getDataOutput().writeShort(0);
    }

    /**
     * Returns the evaluator registered in the task context for this library and the calling
     * thread, creating, launching and registering a new one if none exists.
     *
     * <p>A new evaluator is registered with the joblet context for deallocation <em>before</em>
     * its process is launched, so a launch failure still leads to {@link #deallocate()} being
     * invoked on it.
     *
     * @param iExternalFunctionInfo function descriptor naming the library
     * @param iLibraryManager used to resolve the library's directory
     * @param externalFunctionResultRouter router handed to the IPC protocol object
     * @param iPCSystem IPC system whose port is passed to the child process
     * @param file the interpreter executable
     * @param iHyracksTaskContext task context holding the state objects
     * @param str site-packages value passed to the entry point
     * @param list extra interpreter arguments
     * @param map environment variables for the child process
     * @param iWarningCollector receives warnings for failed Python calls
     * @param sourceLocation source location attached to those warnings
     * @return the existing or newly created evaluator
     * @throws IOException if launching the interpreter fails
     * @throws AsterixException if the library lookup or the handshake fails
     */
    public static PythonLibraryEvaluator getInstance(IExternalFunctionInfo iExternalFunctionInfo, ILibraryManager iLibraryManager, ExternalFunctionResultRouter externalFunctionResultRouter, IPCSystem iPCSystem, File file, IHyracksTaskContext iHyracksTaskContext, String str, List<String> list, Map<String, String> map, IWarningCollector iWarningCollector, SourceLocation sourceLocation) throws IOException, AsterixException {
        PythonLibraryEvaluatorId evaluatorId = new PythonLibraryEvaluatorId(iExternalFunctionInfo.getLibraryDataverseName(), iExternalFunctionInfo.getLibraryName(), Thread.currentThread());
        PythonLibraryEvaluator evaluator = (PythonLibraryEvaluator) iHyracksTaskContext.getStateObject(evaluatorId);
        if (evaluator != null) {
            return evaluator;
        }
        evaluator = new PythonLibraryEvaluator(iHyracksTaskContext.getJobletContext().getJobId(), evaluatorId, iLibraryManager, file, str, list, map, externalFunctionResultRouter, iPCSystem, iHyracksTaskContext.getTaskAttemptId(), iWarningCollector, sourceLocation);
        iHyracksTaskContext.getJobletContext().registerDeallocatable(evaluator);
        evaluator.initialize();
        iHyracksTaskContext.setStateObject(evaluator);
        return evaluator;
    }
}
