package org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.binary.BinaryRowDataUtil;
import org.apache.flink.table.data.util.RowDataUtil;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.functions.python.PythonFunctionInfo;
import org.apache.flink.table.runtime.operators.python.aggregate.arrow.AbstractArrowPythonAggregateFunctionOperator;
import org.apache.flink.table.runtime.operators.window.TimeWindow;
import org.apache.flink.table.runtime.operators.window.Window;
import org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
import org.apache.flink.table.runtime.operators.window.internal.InternalWindowProcessFunction;
import org.apache.flink.table.runtime.operators.window.triggers.Trigger;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;

@Internal
/* loaded from: input_file:org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperator.class */
public class StreamArrowPythonGroupWindowAggregateFunctionOperator<K, W extends Window> extends AbstractArrowPythonAggregateFunctionOperator implements Triggerable<K, W> {
    private static final long serialVersionUID = 1;
    private final int[] namedProperties;
    private final int inputTimeFieldIndex;
    private final WindowAssigner<W> windowAssigner;
    private final Trigger<W> trigger;
    private final long allowedLateness;
    private transient InternalTimerService<W> internalTimerService;
    private transient InternalListState<K, W, RowData> windowAccumulateData;
    private transient InternalListState<K, W, RowData> windowRetractData;
    private transient StreamArrowPythonGroupWindowAggregateFunctionOperator<K, W>.TriggerContext triggerContext;
    private transient TypeSerializer<W> windowSerializer;
    private transient LinkedList<Tuple2<RowData, W>> inputKeyAndWindow;
    private transient GenericRowData windowProperty;
    private transient JoinedRowData windowAggResult;
    private transient long timestamp;
    private transient Collection<W> elementWindows;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperator$TriggerContext.class */
    public class TriggerContext implements Trigger.TriggerContext {
        private W window;

        private TriggerContext() {
        }

        public void open() throws Exception {
            StreamArrowPythonGroupWindowAggregateFunctionOperator.this.trigger.open(this);
        }

        boolean onElement(RowData rowData, long j) throws Exception {
            return StreamArrowPythonGroupWindowAggregateFunctionOperator.this.trigger.onElement(rowData, j, this.window);
        }

        boolean onProcessingTime(long j) throws Exception {
            return StreamArrowPythonGroupWindowAggregateFunctionOperator.this.trigger.onProcessingTime(j, this.window);
        }

        boolean onEventTime(long j) throws Exception {
            return StreamArrowPythonGroupWindowAggregateFunctionOperator.this.trigger.onEventTime(j, this.window);
        }

        void clear() throws Exception {
            StreamArrowPythonGroupWindowAggregateFunctionOperator.this.trigger.clear(this.window);
        }

        public long getCurrentProcessingTime() {
            return StreamArrowPythonGroupWindowAggregateFunctionOperator.this.internalTimerService.currentProcessingTime();
        }

        public long getCurrentWatermark() {
            return StreamArrowPythonGroupWindowAggregateFunctionOperator.this.internalTimerService.currentWatermark();
        }

        public MetricGroup getMetricGroup() {
            return StreamArrowPythonGroupWindowAggregateFunctionOperator.this.getMetricGroup();
        }

        public void registerProcessingTimeTimer(long j) {
            StreamArrowPythonGroupWindowAggregateFunctionOperator.this.internalTimerService.registerProcessingTimeTimer(this.window, j);
        }

        public void registerEventTimeTimer(long j) {
            StreamArrowPythonGroupWindowAggregateFunctionOperator.this.internalTimerService.registerEventTimeTimer(this.window, j);
        }

        public void deleteProcessingTimeTimer(long j) {
            StreamArrowPythonGroupWindowAggregateFunctionOperator.this.internalTimerService.deleteProcessingTimeTimer(this.window, j);
        }

        public void deleteEventTimeTimer(long j) {
            StreamArrowPythonGroupWindowAggregateFunctionOperator.this.internalTimerService.deleteEventTimeTimer(this.window, j);
        }

        public <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) {
            try {
                return (S) StreamArrowPythonGroupWindowAggregateFunctionOperator.this.getPartitionedState(this.window, StreamArrowPythonGroupWindowAggregateFunctionOperator.this.windowSerializer, stateDescriptor);
            } catch (Exception e) {
                throw new RuntimeException("Could not retrieve state", e);
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperator$WindowContext.class */
    private class WindowContext implements InternalWindowProcessFunction.Context<K, W> {
        private WindowContext() {
        }

        public <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) throws Exception {
            return (S) StreamArrowPythonGroupWindowAggregateFunctionOperator.this.getPartitionedState(stateDescriptor);
        }

        public K currentKey() {
            throw new RuntimeException("The method currentKey should not be called.");
        }

        public long currentProcessingTime() {
            throw new RuntimeException("The method currentProcessingTime should not be called.");
        }

        public long currentWatermark() {
            throw new RuntimeException("The method currentWatermark should not be called.");
        }

        public RowData getWindowAccumulators(W w) {
            throw new RuntimeException("The method getWindowAccumulators should not be called.");
        }

        public void setWindowAccumulators(W w, RowData rowData) {
            throw new RuntimeException("The method setWindowAccumulators should not be called.");
        }

        public void clearWindowState(W w) {
            throw new RuntimeException("The method clearWindowState should not be called.");
        }

        public void clearPreviousState(W w) {
            throw new RuntimeException("The method clearPreviousState should not be called.");
        }

        public void clearTrigger(W w) {
            throw new RuntimeException("The method clearTrigger should not be called.");
        }

        public void onMerge(W w, Collection<W> collection) {
            throw new RuntimeException("The method onMerge should not be called.");
        }

        public void deleteCleanupTimer(W w) {
            throw new RuntimeException("The method deleteCleanupTimer should not be called.");
        }
    }

    public StreamArrowPythonGroupWindowAggregateFunctionOperator(Configuration configuration, PythonFunctionInfo[] pythonFunctionInfoArr, RowType rowType, RowType rowType2, int i, WindowAssigner<W> windowAssigner, Trigger<W> trigger, long j, int[] iArr, int[] iArr2, int[] iArr3) {
        super(configuration, pythonFunctionInfoArr, rowType, rowType2, iArr2, iArr3);
        this.namedProperties = iArr;
        this.inputTimeFieldIndex = i;
        this.windowAssigner = windowAssigner;
        this.trigger = trigger;
        this.allowedLateness = j;
    }

    @Override // org.apache.flink.table.runtime.operators.python.aggregate.arrow.AbstractArrowPythonAggregateFunctionOperator, org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator, org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public void open() throws Exception {
        this.userDefinedFunctionOutputType = new RowType(this.outputType.getFields().subList(this.groupingSet.length, this.outputType.getFieldCount() - this.namedProperties.length));
        this.windowSerializer = this.windowAssigner.getWindowSerializer(new ExecutionConfig());
        this.internalTimerService = getInternalTimerService("window-timers", this.windowSerializer, this);
        this.triggerContext = new TriggerContext();
        this.triggerContext.open();
        ListStateDescriptor listStateDescriptor = new ListStateDescriptor("window-input", new RowDataSerializer(this.inputType));
        ListStateDescriptor listStateDescriptor2 = new ListStateDescriptor("data-retract", new RowDataSerializer(this.inputType));
        this.windowAccumulateData = getOrCreateKeyedState(this.windowSerializer, listStateDescriptor);
        this.windowRetractData = getOrCreateKeyedState(this.windowSerializer, listStateDescriptor2);
        this.inputKeyAndWindow = new LinkedList<>();
        this.windowProperty = new GenericRowData(this.namedProperties.length);
        this.windowAggResult = new JoinedRowData();
        this.windowAssigner.open(new WindowContext());
        super.open();
    }

    @Override // org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator
    public void bufferInput(RowData rowData) throws Exception {
        if (this.windowAssigner.isEventTime()) {
            this.timestamp = rowData.getLong(this.inputTimeFieldIndex);
        } else {
            this.timestamp = this.internalTimerService.currentProcessingTime();
        }
        this.elementWindows = this.windowAssigner.assignWindows(rowData, this.timestamp);
        for (W w : this.elementWindows) {
            if (RowDataUtil.isAccumulateMsg(rowData)) {
                this.windowAccumulateData.setCurrentNamespace(w);
                this.windowAccumulateData.add(rowData);
            } else {
                this.windowRetractData.setCurrentNamespace(w);
                this.windowRetractData.add(rowData);
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator
    public void processElementInternal(RowData rowData) throws Exception {
        ArrayList<Window> arrayList = new ArrayList(this.elementWindows.size());
        for (W w : this.elementWindows) {
            if (!isWindowLate(w)) {
                arrayList.add(w);
            }
        }
        for (Window window : arrayList) {
            ((TriggerContext) this.triggerContext).window = window;
            if (this.triggerContext.onElement(rowData, this.timestamp)) {
                triggerWindowProcess(window);
            }
            registerCleanupTimer(window);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public void emitResult(Tuple2<byte[], Integer> tuple2) throws Exception {
        this.bais.setBuffer((byte[]) tuple2.f0, 0, ((Integer) tuple2.f1).intValue());
        int load = this.arrowSerializer.load();
        for (int i = 0; i < load; i++) {
            Tuple2<RowData, W> poll = this.inputKeyAndWindow.poll();
            RowData rowData = (RowData) poll.f0;
            setWindowProperty((Window) poll.f1);
            this.windowAggResult.replace(rowData, this.arrowSerializer.read(i));
            this.rowDataWrapper.collect((RowData) this.reuseJoinedRow.replace(this.windowAggResult, this.windowProperty));
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void onEventTime(InternalTimer<K, W> internalTimer) throws Exception {
        setCurrentKey(internalTimer.getKey());
        ((TriggerContext) this.triggerContext).window = (Window) internalTimer.getNamespace();
        if (this.triggerContext.onEventTime(internalTimer.getTimestamp())) {
            triggerWindowProcess(((TriggerContext) this.triggerContext).window);
        }
        if (this.windowAssigner.isEventTime()) {
            cleanWindowIfNeeded(((TriggerContext) this.triggerContext).window, internalTimer.getTimestamp());
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void onProcessingTime(InternalTimer<K, W> internalTimer) throws Exception {
        setCurrentKey(internalTimer.getKey());
        ((TriggerContext) this.triggerContext).window = (Window) internalTimer.getNamespace();
        if (this.triggerContext.onProcessingTime(internalTimer.getTimestamp())) {
            triggerWindowProcess(((TriggerContext) this.triggerContext).window);
        }
        if (this.windowAssigner.isEventTime()) {
            return;
        }
        cleanWindowIfNeeded(((TriggerContext) this.triggerContext).window, internalTimer.getTimestamp());
    }

    private boolean isWindowLate(W w) {
        return this.windowAssigner.isEventTime() && cleanupTime(w) <= this.internalTimerService.currentWatermark();
    }

    private long cleanupTime(W w) {
        if (!this.windowAssigner.isEventTime()) {
            return w.maxTimestamp();
        }
        long maxTimestamp = w.maxTimestamp() + this.allowedLateness;
        if (maxTimestamp >= w.maxTimestamp()) {
            return maxTimestamp;
        }
        return Long.MAX_VALUE;
    }

    private void triggerWindowProcess(W w) throws Exception {
        this.windowAccumulateData.setCurrentNamespace(w);
        this.windowRetractData.setCurrentNamespace(w);
        Iterable<RowData> iterable = (Iterable) this.windowAccumulateData.get();
        Iterable<RowData> iterable2 = (Iterable) this.windowRetractData.get();
        if (iterable != null) {
            this.currentBatchCount = 0;
            for (RowData rowData : iterable) {
                if (!hasRetractData(rowData, iterable2)) {
                    this.arrowSerializer.write(getFunctionInput(rowData));
                    this.currentBatchCount++;
                }
            }
            if (this.currentBatchCount > 0) {
                this.inputKeyAndWindow.add(Tuple2.of((RowData) getCurrentKey(), w));
                this.arrowSerializer.finishCurrentBatch();
                this.pythonFunctionRunner.process(this.baos.toByteArray());
                this.elementCount += this.currentBatchCount;
                checkInvokeFinishBundleByCount();
                this.currentBatchCount = 0;
                this.baos.reset();
            }
        }
    }

    private boolean hasRetractData(RowData rowData, Iterable<RowData> iterable) {
        BinaryRowData binaryRowData = (BinaryRowData) rowData;
        if (iterable == null) {
            return false;
        }
        Iterator<RowData> it = iterable.iterator();
        while (it.hasNext()) {
            BinaryRowData binaryRowData2 = (RowData) it.next();
            if (binaryRowData2.getRowKind() == RowKind.UPDATE_BEFORE) {
                binaryRowData2.setRowKind(RowKind.UPDATE_AFTER);
            } else {
                binaryRowData2.setRowKind(RowKind.INSERT);
            }
            BinaryRowData binaryRowData3 = binaryRowData2;
            if (binaryRowData.getSizeInBytes() == binaryRowData3.getSizeInBytes() && BinaryRowDataUtil.byteArrayEquals(binaryRowData.getSegments()[0].getHeapMemory(), binaryRowData3.getSegments()[0].getHeapMemory(), binaryRowData.getSizeInBytes())) {
                return true;
            }
        }
        return false;
    }

    private void registerCleanupTimer(W w) {
        long cleanupTime = cleanupTime(w);
        if (cleanupTime == Long.MAX_VALUE) {
            return;
        }
        if (this.windowAssigner.isEventTime()) {
            this.triggerContext.registerEventTimeTimer(cleanupTime);
        } else {
            this.triggerContext.registerProcessingTimeTimer(cleanupTime);
        }
    }

    private void setWindowProperty(W w) {
        for (int i = 0; i < this.namedProperties.length; i++) {
            switch (this.namedProperties[i]) {
                case 0:
                    this.windowProperty.setField(i, TimestampData.fromEpochMillis(((TimeWindow) w).getStart()));
                    break;
                case 1:
                    this.windowProperty.setField(i, TimestampData.fromEpochMillis(((TimeWindow) w).getEnd()));
                    break;
                case 2:
                    this.windowProperty.setField(i, TimestampData.fromEpochMillis(((TimeWindow) w).getEnd() - 1));
                    break;
                case 3:
                    this.windowProperty.setField(i, TimestampData.fromEpochMillis(-1L));
                    break;
            }
        }
    }

    private void cleanWindowIfNeeded(W w, long j) throws Exception {
        if (j == cleanupTime(w)) {
            this.windowAccumulateData.setCurrentNamespace(w);
            this.windowAccumulateData.clear();
            this.windowRetractData.setCurrentNamespace(w);
            this.windowRetractData.clear();
            ((TriggerContext) this.triggerContext).window = w;
            this.triggerContext.clear();
        }
    }
}
