/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.utils;

import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Struct;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.PythonConfig;
import org.apache.flink.python.env.PythonEnvironmentManager;
import org.apache.flink.python.metric.FlinkMetricContainer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.arrow.serializers.RowDataArrowSerializer;
import org.apache.flink.table.runtime.runners.python.beam.BeamTableStatelessPythonFunctionRunner;
import org.apache.flink.table.runtime.utils.PythonTestUtils;
import org.apache.flink.table.types.logical.RowType;

public class PassThroughPythonAggregateFunctionRunner
extends BeamTableStatelessPythonFunctionRunner {
    private static final IntSerializer windowBoundarySerializer = IntSerializer.INSTANCE;
    private final List<byte[]> buffer = new LinkedList<byte[]>();
    private final RowDataArrowSerializer arrowSerializer;
    private final boolean isBatchOverWindow;
    private transient ByteArrayInputStreamWithPos bais;
    private transient DataInputViewStreamWrapper baisWrapper;
    private transient ByteArrayOutputStreamWithPos baos;

    public PassThroughPythonAggregateFunctionRunner(String taskName, PythonEnvironmentManager environmentManager, RowType inputType, RowType outputType, String functionUrn, FlinkFnApi.UserDefinedFunctions userDefinedFunctions, String coderUrn, Map<String, String> jobOptions, FlinkMetricContainer flinkMetricContainer, boolean isBatchOverWindow) {
        super(taskName, environmentManager, inputType, outputType, functionUrn, userDefinedFunctions, coderUrn, jobOptions, flinkMetricContainer, null, 0.0, FlinkFnApi.CoderParam.OutputMode.SINGLE);
        this.isBatchOverWindow = isBatchOverWindow;
        this.arrowSerializer = new RowDataArrowSerializer(inputType, outputType);
    }

    public void open(PythonConfig config) throws Exception {
        super.open(config);
        this.bais = new ByteArrayInputStreamWithPos();
        this.baisWrapper = new DataInputViewStreamWrapper((InputStream)this.bais);
        this.baos = new ByteArrayOutputStreamWithPos();
        this.arrowSerializer.open((InputStream)this.bais, (OutputStream)this.baos);
    }

    protected void startBundle() {
        super.startBundle();
        this.mainInputReceiver = input -> {
            byte[] data = (byte[])input.getValue();
            this.bais.setBuffer(data, 0, data.length);
            if (this.isBatchOverWindow) {
                int windowSize = windowBoundarySerializer.deserialize((DataInputView)this.baisWrapper);
                ArrayList<Integer> lowerBoundarys = new ArrayList<Integer>();
                for (int i = 0; i < windowSize; ++i) {
                    int windowLength = windowBoundarySerializer.deserialize((DataInputView)this.baisWrapper);
                    for (int j = 0; j < windowLength; ++j) {
                        if (j % 2 == 0) {
                            lowerBoundarys.add(windowBoundarySerializer.deserialize((DataInputView)this.baisWrapper));
                            continue;
                        }
                        windowBoundarySerializer.deserialize((DataInputView)this.baisWrapper);
                    }
                }
                this.arrowSerializer.load();
                for (Integer lowerBoundary : lowerBoundarys) {
                    RowData firstData = (RowData)this.arrowSerializer.read(lowerBoundary.intValue());
                    this.arrowSerializer.write((Object)firstData);
                }
                this.arrowSerializer.resetReader();
            } else {
                this.arrowSerializer.load();
                this.arrowSerializer.write(this.arrowSerializer.read(0));
                this.arrowSerializer.resetReader();
            }
            this.arrowSerializer.finishCurrentBatch();
            this.buffer.add(this.baos.toByteArray());
            this.baos.reset();
            this.arrowSerializer.resetWriter();
        };
    }

    public void flush() throws Exception {
        super.flush();
        this.resultBuffer.addAll(this.buffer);
        this.buffer.clear();
    }

    public JobBundleFactory createJobBundleFactory(Struct pipelineOptions) {
        return PythonTestUtils.createMockJobBundleFactory();
    }
}

