package org.apache.flink.table.runtime.operators.hive.script;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.apache.flink.connectors.hive.FlinkHiveException;
import org.apache.flink.connectors.hive.JobConfWrapper;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
import org.apache.flink.table.functions.hive.conversion.HiveObjectConversion;
import org.apache.flink.table.runtime.operators.TableStreamOperator;
import org.apache.flink.table.runtime.script.ScriptTransformIOInfo;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.hadoop.hive.ql.exec.RecordReader;
import org.apache.hadoop.hive.ql.exec.RecordWriter;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/hive/script/HiveScriptTransformOperator.class */
/**
 * Stream operator that pipes every incoming row through an external script process.
 *
 * <p>For each input {@link RowData} the selected fields are converted to Hive objects, serialized
 * with the configured input SerDe and written to the script's standard input through a Hive
 * {@link RecordWriter}. A {@link HiveScriptTransformOutReadThread} is started over the script's
 * standard output with the output SerDe and this operator's collector, and a separate thread
 * copies the script's standard error into a {@link CircularBuffer} so it can be included in the
 * failure message when the script exits with a non-zero code.
 *
 * <p>All transient fields are created in {@link #open()} and released in
 * {@code closeScriptProc()}, which is invoked from both {@link #endInput()} and {@link #close()}.
 */
public class HiveScriptTransformOperator extends TableStreamOperator<RowData> implements OneInputStreamOperator<RowData, RowData>, BoundedOneInput {
    /** Maximum time, in seconds, to wait for the script process to exit on shutdown. */
    private static final long TIMEOUT_IN_SECONDS = 10;

    /** Positions (within the input row) of the fields that are sent to the script. */
    private final int[] inputIndices;
    private final String script;
    private final ScriptTransformIOInfo scriptTransformIOInfo;
    private final LogicalType inputType;
    private final LogicalType outputType;
    /** One getter per child of {@link #inputType}, indexed by field position. */
    private final RowData.FieldGetter[] fieldGetters;
    /** The serializable conf of {@link #scriptTransformIOInfo}, cast once in the constructor. */
    private final JobConfWrapper jobConfWrapper;

    private transient StreamRecordCollector<RowData> collector;
    private transient Process scriptProcess;
    /** Standard output of the script process (read side for this operator). */
    private transient InputStream inputStream;
    /** Standard input of the script process (write side for this operator). */
    private transient OutputStream outputStream;
    private transient AbstractSerDe outputSerDe;
    private transient RecordWriter recordWriter;
    private transient AbstractSerDe inputSerDe;
    private transient RecordReader recordReader;
    /** Conversions aligned with {@link #inputIndices}: entry {@code i} converts field {@code inputIndices[i]}. */
    private transient HiveObjectConversion[] hiveObjectConversions;
    private transient StructObjectInspector structInputInspector;
    private transient HiveScriptTransformOutReadThread outReadThread;
    private transient CircularBuffer errorBuffer;
    private transient Thread errorReadThread;

    /**
     * Creates the operator.
     *
     * @param iArr positions of the input-row fields that are passed to the script
     * @param str the script handed to {@link ScriptProcessBuilder}
     * @param scriptTransformIOInfo SerDe / record reader / record writer settings; its
     *     serializable conf must be a {@link JobConfWrapper}
     * @param logicalType type of the input row; its children define the field getters
     * @param logicalType2 type of the rows produced from the script output
     */
    public HiveScriptTransformOperator(int[] iArr, String str, ScriptTransformIOInfo scriptTransformIOInfo, LogicalType logicalType, LogicalType logicalType2) {
        this.inputIndices = iArr;
        this.script = str;
        this.scriptTransformIOInfo = scriptTransformIOInfo;
        this.inputType = logicalType;
        List<LogicalType> children = logicalType.getChildren();
        this.fieldGetters = IntStream.range(0, children.size())
                .mapToObj(pos -> RowData.createFieldGetter(children.get(pos), pos))
                .toArray(RowData.FieldGetter[]::new);
        this.outputType = logicalType2;
        this.jobConfWrapper = (JobConfWrapper) scriptTransformIOInfo.getSerializableConf();
    }

    /**
     * Starts the script process and wires up the SerDes, writer, reader and reader thread.
     *
     * <p>The initialization order matters: the process streams must exist before the record
     * writer/reader are initialized, and the output SerDe and reader before the read thread.
     */
    @Override
    public void open() throws Exception {
        super.open();
        this.collector = new StreamRecordCollector<>(this.output);
        initScriptProc();
        initScriptInputSerDe();
        initScriptInputInfo();
        initScriptInputWriter();
        initOutputSerDe();
        initOutputReader();
        initScriptOutPutReadThread();
    }

    /**
     * Converts the selected fields of the record to Hive objects, serializes them with the input
     * SerDe and writes the result to the script through the record writer.
     */
    @Override
    public void processElement(StreamRecord<RowData> streamRecord) throws Exception {
        RowData row = streamRecord.getValue();
        List<Object> hiveFields = new ArrayList<>(this.inputIndices.length);
        for (int i = 0; i < this.inputIndices.length; i++) {
            Object flinkValue = this.fieldGetters[this.inputIndices[i]].getFieldOrNull(row);
            hiveFields.add(this.hiveObjectConversions[i].toHiveObject(flinkValue));
        }
        this.recordWriter.write(this.inputSerDe.serialize(hiveFields, this.structInputInspector));
    }

    /** Shuts the script down once no more input will arrive. */
    @Override
    public void endInput() throws Exception {
        closeScriptProc();
    }

    /** Releases the script resources (a no-op if {@link #endInput()} already did) and closes the operator. */
    @Override
    public void close() throws Exception {
        closeScriptProc();
        super.close();
    }

    /** Builds the script process, captures its streams and starts the stderr consumer thread. */
    private void initScriptProc() throws IOException {
        this.scriptProcess = new ScriptProcessBuilder(this.script, this.jobConfWrapper.conf(), getOperatorID()).build();
        this.inputStream = this.scriptProcess.getInputStream();
        this.outputStream = this.scriptProcess.getOutputStream();
        InputStream errorStream = this.scriptProcess.getErrorStream();
        this.errorBuffer = new CircularBuffer();
        this.errorReadThread = new RedirectStreamThread(errorStream, this.errorBuffer, String.format("Thread-%s-error-consumer", getClass().getSimpleName()));
        this.errorReadThread.start();
    }

    /**
     * Closes the record writer, waits for the script process to finish and joins the reader
     * threads. Every resource is nulled after it has been handled, so a second call does nothing.
     *
     * <p>A non-zero exit code is reported only after the reader threads have been joined: this
     * way the error buffer has received everything the stderr consumer read, no thread is left
     * un-joined, and the process reference is already cleared so a later call does not fail again.
     *
     * <p>If the calling thread is interrupted while waiting, cleanup continues and the interrupt
     * status is restored before this method returns or throws.
     *
     * @throws FlinkHiveException if the script process exited with a non-zero code
     */
    private void closeScriptProc() {
        boolean interrupted = false;
        Integer failedExitCode = null;
        try {
            if (this.recordWriter != null) {
                try {
                    this.recordWriter.close();
                } catch (IOException e) {
                    LOG.warn("Exception in closing RecordWriter.", e);
                }
                this.recordWriter = null;
            }

            if (this.scriptProcess != null) {
                try {
                    if (!this.scriptProcess.waitFor(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS)) {
                        LOG.warn("Transformation script process exits timeout in {} seconds", TIMEOUT_IN_SECONDS);
                    }
                } catch (InterruptedException e) {
                    interrupted = true;
                    LOG.warn("Exception in waiting for transformation script process.", e);
                }
                if (this.scriptProcess.isAlive()) {
                    this.scriptProcess.destroy();
                } else {
                    int exitValue = this.scriptProcess.exitValue();
                    if (exitValue != 0) {
                        // Defer the failure until the reader threads have been joined.
                        failedExitCode = exitValue;
                    }
                }
                this.scriptProcess = null;
            }

            interrupted |= joinAndReportInterrupt(this.outReadThread, "Exception in closing output read thread.");
            this.outReadThread = null;

            interrupted |= joinAndReportInterrupt(this.errorReadThread, "Exception in closing error read thread.");
            this.errorReadThread = null;
        } finally {
            if (interrupted) {
                // Re-assert the interrupt only now so the joins above were not cut short by it.
                Thread.currentThread().interrupt();
            }
        }

        if (failedExitCode != null) {
            throw new FlinkHiveException(String.format("Script failed with code %d, message %s.", failedExitCode, this.errorBuffer.toString()));
        }
    }

    /**
     * Joins {@code thread} if it is non-null, logging {@code warning} when the wait is interrupted.
     *
     * @return {@code true} if the calling thread was interrupted while joining
     */
    private boolean joinAndReportInterrupt(Thread thread, String warning) {
        if (thread == null) {
            return false;
        }
        try {
            thread.join();
            return false;
        } catch (InterruptedException e) {
            LOG.warn(warning, e);
            return true;
        }
    }

    /**
     * Loads {@code className} and creates an instance through its no-argument constructor.
     * Uses {@code getDeclaredConstructor().newInstance()} instead of the deprecated
     * {@code Class.newInstance()}, so constructor failures surface wrapped rather than bypassing
     * checked-exception checks.
     */
    private static <T> T instantiate(String className, Class<T> expectedType) throws Exception {
        return expectedType.cast(Class.forName(className).getDeclaredConstructor().newInstance());
    }

    /** Instantiates the named SerDe and initializes it with the job conf and the given properties. */
    private AbstractSerDe initSerDe(String serDeClassName, Map<String, String> serDeProps) throws Exception {
        AbstractSerDe serDe = instantiate(serDeClassName, AbstractSerDe.class);
        Properties properties = new Properties();
        properties.putAll(serDeProps);
        serDe.initialize(this.jobConfWrapper.conf(), properties);
        return serDe;
    }

    /** Creates the SerDe used to serialize rows sent to the script. */
    private void initScriptInputSerDe() throws Exception {
        this.inputSerDe = initSerDe(this.scriptTransformIOInfo.getInputSerdeClass(), this.scriptTransformIOInfo.getInputSerdeProps());
    }

    /**
     * Prepares, for each selected input field, its object inspector and Flink-to-Hive conversion,
     * and builds the struct inspector handed to the input SerDe. Field names of that struct come
     * from the comma-separated {@code "columns"} entry of the input SerDe properties.
     */
    private void initScriptInputInfo() {
        StructObjectInspector rowInspector = (StructObjectInspector) HiveInspectors.getObjectInspector(this.inputType);
        ObjectInspector[] allFieldInspectors = rowInspector.getAllStructFieldRefs().stream()
                .map(field -> field.getFieldObjectInspector())
                .toArray(ObjectInspector[]::new);
        List<LogicalType> fieldTypes = this.inputType.getChildren();

        ObjectInspector[] selectedInspectors = new ObjectInspector[this.inputIndices.length];
        this.hiveObjectConversions = new HiveObjectConversion[this.inputIndices.length];
        for (int i = 0; i < this.inputIndices.length; i++) {
            int fieldPos = this.inputIndices[i];
            selectedInspectors[i] = allFieldInspectors[fieldPos];
            this.hiveObjectConversions[i] = HiveInspectors.getConversion(selectedInspectors[i], fieldTypes.get(fieldPos), HiveShimLoader.loadHiveShim(HiveShimLoader.getHiveVersion()));
        }

        String columns = (String) this.scriptTransformIOInfo.getInputSerdeProps().get("columns");
        this.structInputInspector = ObjectInspectorFactory.getStandardStructObjectInspector(Arrays.asList(columns.split(",")), Arrays.asList(selectedInspectors));
    }

    /** Instantiates the configured record writer over the script's standard input. */
    private void initScriptInputWriter() throws Exception {
        this.recordWriter = instantiate(this.scriptTransformIOInfo.getRecordWriterClass(), RecordWriter.class);
        this.recordWriter.initialize(this.outputStream, this.jobConfWrapper.conf());
    }

    /** Creates the SerDe used to deserialize rows read back from the script. */
    private void initOutputSerDe() throws Exception {
        this.outputSerDe = initSerDe(this.scriptTransformIOInfo.getOutputSerdeClass(), this.scriptTransformIOInfo.getOutputSerdeProps());
    }

    /** Instantiates the configured record reader over the script's standard output. */
    private void initOutputReader() throws Exception {
        this.recordReader = instantiate(this.scriptTransformIOInfo.getRecordReaderClass(), RecordReader.class);
        Properties properties = new Properties();
        properties.putAll(this.scriptTransformIOInfo.getOutputSerdeProps());
        this.recordReader.initialize(this.inputStream, this.jobConfWrapper.conf(), properties);
    }

    /** Starts the thread that is given the record reader, output SerDe and collector. */
    private void initScriptOutPutReadThread() throws Exception {
        this.outReadThread = new HiveScriptTransformOutReadThread(this.recordReader, this.outputType, this.outputSerDe, (StructObjectInspector) this.outputSerDe.getObjectInspector(), this.collector);
        this.outReadThread.start();
    }
}
