package org.apache.flink.table.runtime.utils;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Struct;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.PythonConfig;
import org.apache.flink.python.env.PythonEnvironmentManager;
import org.apache.flink.python.metric.FlinkMetricContainer;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.arrow.serializers.RowDataArrowSerializer;
import org.apache.flink.table.runtime.runners.python.beam.BeamTableStatelessPythonFunctionRunner;
import org.apache.flink.table.types.logical.RowType;

/* loaded from: input_file:org/apache/flink/table/runtime/utils/PassThroughPythonAggregateFunctionRunner.class */
public class PassThroughPythonAggregateFunctionRunner extends BeamTableStatelessPythonFunctionRunner {
    private static final IntSerializer windowBoundarySerializer = IntSerializer.INSTANCE;
    private final List<byte[]> buffer;
    private final RowDataArrowSerializer arrowSerializer;
    private final boolean isBatchOverWindow;
    private transient ByteArrayInputStreamWithPos bais;
    private transient DataInputViewStreamWrapper baisWrapper;
    private transient ByteArrayOutputStreamWithPos baos;

    public PassThroughPythonAggregateFunctionRunner(String str, PythonEnvironmentManager pythonEnvironmentManager, RowType rowType, RowType rowType2, String str2, FlinkFnApi.UserDefinedFunctions userDefinedFunctions, String str3, Map<String, String> map, FlinkMetricContainer flinkMetricContainer, boolean z) {
        super(str, pythonEnvironmentManager, rowType, rowType2, str2, userDefinedFunctions, str3, map, flinkMetricContainer, (MemoryManager) null, 0.0d);
        this.buffer = new LinkedList();
        this.isBatchOverWindow = z;
        this.arrowSerializer = new RowDataArrowSerializer(rowType, rowType2);
    }

    public void open(PythonConfig pythonConfig) throws Exception {
        super.open(pythonConfig);
        this.bais = new ByteArrayInputStreamWithPos();
        this.baisWrapper = new DataInputViewStreamWrapper(this.bais);
        this.baos = new ByteArrayOutputStreamWithPos();
        this.arrowSerializer.open(this.bais, this.baos);
    }

    protected void startBundle() {
        super.startBundle();
        this.mainInputReceiver = windowedValue -> {
            byte[] bArr = (byte[]) windowedValue.getValue();
            this.bais.setBuffer(bArr, 0, bArr.length);
            if (this.isBatchOverWindow) {
                int intValue = windowBoundarySerializer.deserialize(this.baisWrapper).intValue();
                ArrayList arrayList = new ArrayList();
                for (int i = 0; i < intValue; i++) {
                    int intValue2 = windowBoundarySerializer.deserialize(this.baisWrapper).intValue();
                    for (int i2 = 0; i2 < intValue2; i2++) {
                        if (i2 % 2 == 0) {
                            arrayList.add(windowBoundarySerializer.deserialize(this.baisWrapper));
                        } else {
                            windowBoundarySerializer.deserialize(this.baisWrapper);
                        }
                    }
                }
                this.arrowSerializer.load();
                Iterator it = arrayList.iterator();
                while (it.hasNext()) {
                    this.arrowSerializer.write((RowData) this.arrowSerializer.read(((Integer) it.next()).intValue()));
                }
                this.arrowSerializer.resetReader();
            } else {
                this.arrowSerializer.load();
                this.arrowSerializer.write(this.arrowSerializer.read(0));
                this.arrowSerializer.resetReader();
            }
            this.arrowSerializer.finishCurrentBatch();
            this.buffer.add(this.baos.toByteArray());
            this.baos.reset();
            this.arrowSerializer.resetWriter();
        };
    }

    public void flush() throws Exception {
        super.flush();
        this.resultBuffer.addAll(this.buffer);
        this.buffer.clear();
    }

    public JobBundleFactory createJobBundleFactory(Struct struct) {
        return PythonTestUtils.createMockJobBundleFactory();
    }
}
