package org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.streaming.api.utils.ProtoUtils;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.functions.python.PythonFunctionInfo;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.RowType;

@Internal
/* loaded from: input_file:org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonOverWindowAggregateFunctionOperator.class */
public class BatchArrowPythonOverWindowAggregateFunctionOperator extends AbstractBatchArrowPythonAggregateFunctionOperator {
    private static final long serialVersionUID = 1;
    private static final String PANDAS_BATCH_OVER_WINDOW_AGG_FUNCTION_URN = "flink:transform:batch_over_window_aggregate_function:arrow:v1";
    private static final IntSerializer windowBoundarySerializer = IntSerializer.INSTANCE;
    private final long[] lowerBoundary;
    private final long[] upperBoundary;
    private final boolean[] isRangeWindows;
    private final int[] aggWindowIndex;
    private final int inputTimeFieldIndex;
    private final boolean asc;
    private transient RowDataSerializer forwardedInputSerializer;
    private transient int lastKeyDataStartPos;
    private transient ByteArrayOutputStreamWithPos windowBoundaryWithDataBaos;
    private transient DataOutputViewStreamWrapper windowBoundaryWithDataWrapper;
    private transient List<List<Integer>> boundedRangeWindowBoundaries;
    private transient ArrayList<Integer> boundedRangeWindowIndex;

    public BatchArrowPythonOverWindowAggregateFunctionOperator(Configuration configuration, PythonFunctionInfo[] pythonFunctionInfoArr, RowType rowType, RowType rowType2, long[] jArr, long[] jArr2, boolean[] zArr, int[] iArr, int[] iArr2, int[] iArr3, int[] iArr4, int i, boolean z) {
        super(configuration, pythonFunctionInfoArr, rowType, rowType2, iArr2, iArr3, iArr4);
        this.lowerBoundary = jArr;
        this.upperBoundary = jArr2;
        this.isRangeWindows = zArr;
        this.aggWindowIndex = iArr;
        this.inputTimeFieldIndex = i;
        this.asc = z;
    }

    @Override // org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.AbstractBatchArrowPythonAggregateFunctionOperator, org.apache.flink.table.runtime.operators.python.aggregate.arrow.AbstractArrowPythonAggregateFunctionOperator, org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator, org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public void open() throws Exception {
        super.open();
        this.forwardedInputSerializer = new RowDataSerializer(this.inputType);
        this.lastKeyDataStartPos = 0;
        this.windowBoundaryWithDataBaos = new ByteArrayOutputStreamWithPos();
        this.windowBoundaryWithDataWrapper = new DataOutputViewStreamWrapper(this.windowBoundaryWithDataBaos);
        this.boundedRangeWindowBoundaries = new ArrayList(this.lowerBoundary.length);
        this.boundedRangeWindowIndex = new ArrayList<>();
        for (int i = 0; i < this.lowerBoundary.length; i++) {
            if (this.isRangeWindows[i] && (this.lowerBoundary[i] != Long.MIN_VALUE || this.upperBoundary[i] != Long.MAX_VALUE)) {
                this.boundedRangeWindowIndex.add(Integer.valueOf(i));
                this.boundedRangeWindowBoundaries.add(new ArrayList());
            }
        }
    }

    @Override // org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator
    public void bufferInput(RowData rowData) throws Exception {
        BinaryRowData copy = this.groupKeyProjection.apply(rowData).copy();
        if (isNewKey(copy)) {
            if (this.lastGroupKey != null) {
                invokeCurrentBatch();
            }
            this.lastGroupKey = copy;
            this.lastGroupSet = this.groupSetProjection.apply(rowData).copy();
        }
        this.forwardedInputQueue.add(this.forwardedInputSerializer.copy(rowData));
    }

    @Override // org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.AbstractBatchArrowPythonAggregateFunctionOperator
    protected void invokeCurrentBatch() throws Exception {
        if (this.currentBatchCount > 0) {
            this.arrowSerializer.finishCurrentBatch();
            ListIterator listIterator = this.forwardedInputQueue.listIterator(this.lastKeyDataStartPos);
            int[] iArr = new int[this.boundedRangeWindowIndex.size()];
            int[] iArr2 = new int[this.boundedRangeWindowIndex.size()];
            for (int i = 0; i < iArr.length; i++) {
                iArr[i] = this.lastKeyDataStartPos;
                iArr2[i] = this.lastKeyDataStartPos;
            }
            while (listIterator.hasNext()) {
                RowData rowData = (RowData) listIterator.next();
                for (int i2 = 0; i2 < this.boundedRangeWindowIndex.size(); i2++) {
                    int intValue = this.boundedRangeWindowIndex.get(i2).intValue();
                    long millisecond = rowData.getTimestamp(this.inputTimeFieldIndex, 3).getMillisecond();
                    List<Integer> list = this.boundedRangeWindowBoundaries.get(i2);
                    if (this.lowerBoundary[intValue] != Long.MIN_VALUE) {
                        int i3 = iArr[i2];
                        while (isInCurrentOverWindow((RowData) this.forwardedInputQueue.get(i3), millisecond + this.lowerBoundary[intValue], false)) {
                            i3++;
                        }
                        iArr[i2] = i3;
                        list.add(Integer.valueOf(i3 - this.lastKeyDataStartPos));
                    }
                    if (this.upperBoundary[intValue] != Long.MAX_VALUE) {
                        int i4 = iArr2[i2];
                        long j = millisecond + this.upperBoundary[intValue];
                        while (i4 < this.forwardedInputQueue.size() && isInCurrentOverWindow((RowData) this.forwardedInputQueue.get(i4), j, true)) {
                            i4++;
                        }
                        iArr2[i2] = i4;
                        list.add(Integer.valueOf(i4 - this.lastKeyDataStartPos));
                    }
                }
            }
            windowBoundarySerializer.serialize(Integer.valueOf(this.boundedRangeWindowBoundaries.size()), this.windowBoundaryWithDataWrapper);
            for (List<Integer> list2 : this.boundedRangeWindowBoundaries) {
                windowBoundarySerializer.serialize(Integer.valueOf(list2.size()), this.windowBoundaryWithDataWrapper);
                Iterator<Integer> it = list2.iterator();
                while (it.hasNext()) {
                    windowBoundarySerializer.serialize(Integer.valueOf(it.next().intValue()), this.windowBoundaryWithDataWrapper);
                }
                list2.clear();
            }
            this.windowBoundaryWithDataBaos.write(this.baos.toByteArray());
            this.baos.reset();
            this.pythonFunctionRunner.process(this.windowBoundaryWithDataBaos.toByteArray());
            this.windowBoundaryWithDataBaos.reset();
            this.elementCount += this.currentBatchCount;
            checkInvokeFinishBundleByCount();
            this.currentBatchCount = 0;
            this.arrowSerializer.resetWriter();
        }
        this.lastKeyDataStartPos = this.forwardedInputQueue.size();
    }

    @Override // org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator
    public void processElementInternal(RowData rowData) {
        this.arrowSerializer.write(getFunctionInput(rowData));
        this.currentBatchCount++;
    }

    @Override // org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public void emitResult(Tuple2<byte[], Integer> tuple2) throws Exception {
        this.bais.setBuffer((byte[]) tuple2.f0, 0, ((Integer) tuple2.f1).intValue());
        int load = this.arrowSerializer.load();
        for (int i = 0; i < load; i++) {
            RowData rowData = (RowData) this.forwardedInputQueue.poll();
            this.lastKeyDataStartPos--;
            this.reuseJoinedRow.setRowKind(rowData.getRowKind());
            this.rowDataWrapper.collect((RowData) this.reuseJoinedRow.replace(rowData, this.arrowSerializer.read(i)));
        }
        this.arrowSerializer.resetReader();
    }

    @Override // org.apache.flink.table.runtime.operators.python.aggregate.arrow.AbstractArrowPythonAggregateFunctionOperator, org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator
    public FlinkFnApi.UserDefinedFunctions getUserDefinedFunctionsProto() {
        FlinkFnApi.UserDefinedFunctions.Builder newBuilder = FlinkFnApi.UserDefinedFunctions.newBuilder();
        for (int i = 0; i < this.pandasAggFunctions.length; i++) {
            FlinkFnApi.UserDefinedFunction.Builder builder = ProtoUtils.getUserDefinedFunctionProto(this.pandasAggFunctions[i]).toBuilder();
            builder.setWindowIndex(this.aggWindowIndex[i]);
            newBuilder.addUdfs(builder);
        }
        newBuilder.setMetricEnabled(this.pythonConfig.isMetricEnabled());
        newBuilder.setProfileEnabled(this.pythonConfig.isProfileEnabled());
        for (int i2 = 0; i2 < this.lowerBoundary.length; i2++) {
            FlinkFnApi.OverWindow.Builder newBuilder2 = FlinkFnApi.OverWindow.newBuilder();
            if (this.isRangeWindows[i2]) {
                if (this.lowerBoundary[i2] != Long.MIN_VALUE) {
                    if (this.upperBoundary[i2] != Long.MAX_VALUE) {
                        newBuilder2.setWindowType(FlinkFnApi.OverWindow.WindowType.RANGE_SLIDING);
                    } else {
                        newBuilder2.setWindowType(FlinkFnApi.OverWindow.WindowType.RANGE_UNBOUNDED_FOLLOWING);
                    }
                } else if (this.upperBoundary[i2] != Long.MAX_VALUE) {
                    newBuilder2.setWindowType(FlinkFnApi.OverWindow.WindowType.RANGE_UNBOUNDED_PRECEDING);
                } else {
                    newBuilder2.setWindowType(FlinkFnApi.OverWindow.WindowType.RANGE_UNBOUNDED);
                }
            } else if (this.lowerBoundary[i2] != Long.MIN_VALUE) {
                newBuilder2.setLowerBoundary(this.lowerBoundary[i2]);
                if (this.upperBoundary[i2] != Long.MAX_VALUE) {
                    newBuilder2.setUpperBoundary(this.upperBoundary[i2]);
                    newBuilder2.setWindowType(FlinkFnApi.OverWindow.WindowType.ROW_SLIDING);
                } else {
                    newBuilder2.setWindowType(FlinkFnApi.OverWindow.WindowType.ROW_UNBOUNDED_FOLLOWING);
                }
            } else if (this.upperBoundary[i2] != Long.MAX_VALUE) {
                newBuilder2.setUpperBoundary(this.upperBoundary[i2]);
                newBuilder2.setWindowType(FlinkFnApi.OverWindow.WindowType.ROW_UNBOUNDED_PRECEDING);
            } else {
                newBuilder2.setWindowType(FlinkFnApi.OverWindow.WindowType.ROW_UNBOUNDED);
            }
            newBuilder.addWindows(newBuilder2);
        }
        return newBuilder.build();
    }

    @Override // org.apache.flink.table.runtime.operators.python.aggregate.arrow.AbstractArrowPythonAggregateFunctionOperator, org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator
    public String getFunctionUrn() {
        return PANDAS_BATCH_OVER_WINDOW_AGG_FUNCTION_URN;
    }

    @Override // org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator
    public RowType createUserDefinedFunctionOutputType() {
        return new RowType(this.outputType.getFields().subList(this.inputType.getFieldCount(), this.outputType.getFieldCount()));
    }

    @Override // org.apache.flink.table.runtime.operators.python.aggregate.arrow.AbstractArrowPythonAggregateFunctionOperator, org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator
    public FlinkFnApi.CoderInfoDescriptor createInputCoderInfoDescriptor(RowType rowType) {
        return ProtoUtils.createOverWindowArrowTypeCoderInfoDescriptorProto(rowType, FlinkFnApi.CoderInfoDescriptor.Mode.MULTIPLE, false);
    }

    private boolean isInCurrentOverWindow(RowData rowData, long j, boolean z) {
        long millisecond = j - rowData.getTimestamp(this.inputTimeFieldIndex, 3).getMillisecond();
        return (millisecond > 0 && this.asc) || (millisecond == 0 && z);
    }

    @Override // org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.AbstractBatchArrowPythonAggregateFunctionOperator, org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public /* bridge */ /* synthetic */ void finish() throws Exception {
        super.finish();
    }

    @Override // org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.AbstractBatchArrowPythonAggregateFunctionOperator, org.apache.flink.table.runtime.operators.python.AbstractOneInputPythonFunctionOperator
    public /* bridge */ /* synthetic */ void endInput() throws Exception {
        super.endInput();
    }
}
