package org.apache.flink.table.runtime.utils;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Struct;
import org.apache.flink.python.env.PythonEnvironmentManager;
import org.apache.flink.python.metric.FlinkMetricContainer;
import org.apache.flink.table.functions.python.PythonFunctionInfo;
import org.apache.flink.table.runtime.runners.python.scalar.AbstractGeneralPythonScalarFunctionRunner;
import org.apache.flink.table.types.logical.RowType;

/* loaded from: input_file:org/apache/flink/table/runtime/utils/PassThroughPythonScalarFunctionRunner.class */
/**
 * Scalar function runner that echoes its input instead of transforming it.
 *
 * <p>After {@link #startBundle()} the {@code mainInputReceiver} is replaced by a receiver that
 * stores the value of every element it is handed. {@link #finishBundle()} then passes those
 * stored byte arrays, unchanged and in arrival order, to {@code resultReceiver}.
 *
 * <p>The {@link JobBundleFactory} returned by {@link #createJobBundleFactory(Struct)} is the one
 * supplied at construction time; the shorter constructor obtains it from
 * {@code PythonTestUtils.createMockJobBundleFactory()}. NOTE(review): this looks like a test
 * double that avoids a real Python worker -- confirm against the callers.
 *
 * @param <IN> element type forwarded to the base runner
 */
public abstract class PassThroughPythonScalarFunctionRunner<IN> extends AbstractGeneralPythonScalarFunctionRunner<IN> {
    /** Factory handed back by {@link #createJobBundleFactory(Struct)}. */
    private final JobBundleFactory jobBundleFactory;

    /** Values captured since the last completed {@link #finishBundle()}, in arrival order. */
    private final List<byte[]> bufferedInputs;

    /**
     * Creates a runner backed by {@code PythonTestUtils.createMockJobBundleFactory()}.
     *
     * <p>All arguments are forwarded as-is to the nine-argument constructor; see there for the
     * parameter notes.
     */
    public PassThroughPythonScalarFunctionRunner(String str, FnDataReceiver<byte[]> fnDataReceiver, PythonFunctionInfo[] pythonFunctionInfoArr, PythonEnvironmentManager pythonEnvironmentManager, RowType rowType, RowType rowType2, Map<String, String> map, FlinkMetricContainer flinkMetricContainer) {
        this(str, fnDataReceiver, pythonFunctionInfoArr, pythonEnvironmentManager, rowType, rowType2, map, PythonTestUtils.createMockJobBundleFactory(), flinkMetricContainer);
    }

    /**
     * Creates a runner that uses the given {@link JobBundleFactory}.
     *
     * <p>Every argument except {@code jobBundleFactory} is forwarded unchanged to the superclass
     * constructor; their meaning is defined there.
     *
     * @param str forwarded to the superclass (presumably the task name -- TODO confirm)
     * @param fnDataReceiver forwarded to the superclass (presumably becomes
     *     {@code resultReceiver} -- TODO confirm)
     * @param pythonFunctionInfoArr forwarded to the superclass
     * @param pythonEnvironmentManager forwarded to the superclass
     * @param rowType forwarded to the superclass (presumably the input row type -- TODO confirm)
     * @param rowType2 forwarded to the superclass (presumably the output row type -- TODO confirm)
     * @param map forwarded to the superclass (presumably job options -- TODO confirm)
     * @param jobBundleFactory factory later returned by {@link #createJobBundleFactory(Struct)}
     * @param flinkMetricContainer forwarded to the superclass
     */
    public PassThroughPythonScalarFunctionRunner(String str, FnDataReceiver<byte[]> fnDataReceiver, PythonFunctionInfo[] pythonFunctionInfoArr, PythonEnvironmentManager pythonEnvironmentManager, RowType rowType, RowType rowType2, Map<String, String> map, JobBundleFactory jobBundleFactory, FlinkMetricContainer flinkMetricContainer) {
        super(str, fnDataReceiver, pythonFunctionInfoArr, pythonEnvironmentManager, rowType, rowType2, map, flinkMetricContainer);
        this.jobBundleFactory = jobBundleFactory;
        // Diamond instead of the raw ArrayList keeps the assignment type-checked.
        this.bufferedInputs = new ArrayList<>();
    }

    /**
     * Runs the superclass bundle start, then swaps {@code mainInputReceiver} for a receiver that
     * only records each element's value in the internal buffer.
     */
    @Override
    public void startBundle() {
        super.startBundle();
        // Installed after super.startBundle() so it replaces whatever receiver the base set up.
        this.mainInputReceiver = windowedValue -> this.bufferedInputs.add(windowedValue.getValue());
    }

    /**
     * Runs the superclass bundle finish, then hands every buffered value to
     * {@code resultReceiver} in arrival order and empties the buffer.
     *
     * <p>The buffer is cleared only after all values were accepted; if the superclass call or
     * the receiver throws, the buffered values are retained.
     *
     * @throws Exception if the superclass call or {@code resultReceiver} fails
     */
    @Override
    public void finishBundle() throws Exception {
        super.finishBundle();
        for (byte[] buffered : this.bufferedInputs) {
            this.resultReceiver.accept(buffered);
        }
        this.bufferedInputs.clear();
    }

    /**
     * Returns the factory supplied at construction time; {@code struct} is ignored.
     *
     * <p>NOTE(review): presumably overrides a hook of the base runner -- confirm and add
     * {@code @Override} if so.
     *
     * @param struct unused
     * @return the {@link JobBundleFactory} passed to (or created by) the constructor
     */
    public JobBundleFactory createJobBundleFactory(Struct struct) {
        return this.jobBundleFactory;
    }
}
