package org.apache.flink.table.runtime.operators.python.aggregate;

import java.io.IOException;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.PythonFunctionRunner;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerServiceImpl;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.UpdatableRowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.util.RowDataUtil;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.functions.python.PythonAggregateFunctionInfo;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
import org.apache.flink.table.planner.expressions.PlannerNamedWindowProperty;
import org.apache.flink.table.planner.expressions.PlannerProctimeAttribute;
import org.apache.flink.table.planner.expressions.PlannerRowtimeAttribute;
import org.apache.flink.table.planner.expressions.PlannerWindowEnd;
import org.apache.flink.table.planner.expressions.PlannerWindowProperty;
import org.apache.flink.table.planner.expressions.PlannerWindowStart;
import org.apache.flink.table.planner.plan.logical.LogicalWindow;
import org.apache.flink.table.planner.typeutils.DataViewUtils;
import org.apache.flink.table.runtime.generated.Projection;
import org.apache.flink.table.runtime.operators.window.TimeWindow;
import org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.runtime.utils.PassThroughStreamGroupWindowAggregatePythonFunctionRunner;
import org.apache.flink.table.runtime.utils.PythonTestUtils;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/python/aggregate/PassThroughPythonStreamGroupWindowAggregateOperator.class */
public class PassThroughPythonStreamGroupWindowAggregateOperator<K> extends PythonStreamGroupWindowAggregateOperator<K, TimeWindow> {
    private final MockPythonWindowOperator<K> mockPythonWindowOperator;
    private final int[] grouping;
    private final PythonAggregateFunctionInfo aggregateFunction;
    private FlinkFnApi.GroupWindow.WindowProperty[] namedProperties;
    private InternalTimerServiceImpl<K, TimeWindow> mockPythonInternalService;
    private Map<String, Map<TimeWindow, List<RowData>>> windowAccumulateData;
    private Map<String, Map<TimeWindow, List<RowData>>> windowRetractData;
    private transient UpdatableRowData reusePythonRowData;
    private transient UpdatableRowData reusePythonTimerRowData;
    private transient UpdatableRowData reusePythonTimerData;
    private transient LinkedBlockingQueue<byte[]> resultBuffer;
    private Projection<RowData, BinaryRowData> groupKeyProjection;
    private Function<RowData, RowData> aggExtracter;
    private Function<TimeWindow, RowData> windowExtractor;
    private JoinedRowData reuseJoinedRow;
    private JoinedRowData windowAggResult;
    private transient ByteArrayOutputStreamWithPos windowBaos;
    private transient DataOutputViewStreamWrapper windowBaosWrapper;

    /* renamed from: org.apache.flink.table.runtime.operators.python.aggregate.PassThroughPythonStreamGroupWindowAggregateOperator$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/table/runtime/operators/python/aggregate/PassThroughPythonStreamGroupWindowAggregateOperator$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$fnexecution$v1$FlinkFnApi$GroupWindow$WindowProperty = new int[FlinkFnApi.GroupWindow.WindowProperty.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$fnexecution$v1$FlinkFnApi$GroupWindow$WindowProperty[FlinkFnApi.GroupWindow.WindowProperty.WINDOW_START.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$fnexecution$v1$FlinkFnApi$GroupWindow$WindowProperty[FlinkFnApi.GroupWindow.WindowProperty.WINDOW_END.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$fnexecution$v1$FlinkFnApi$GroupWindow$WindowProperty[FlinkFnApi.GroupWindow.WindowProperty.ROW_TIME_ATTRIBUTE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$flink$fnexecution$v1$FlinkFnApi$GroupWindow$WindowProperty[FlinkFnApi.GroupWindow.WindowProperty.PROC_TIME_ATTRIBUTE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/python/aggregate/PassThroughPythonStreamGroupWindowAggregateOperator$MockPythonWindowOperator.class */
    private static class MockPythonWindowOperator<K> implements Triggerable<K, TimeWindow> {
        MockPythonWindowOperator() {
        }

        public void onEventTime(InternalTimer<K, TimeWindow> internalTimer) throws Exception {
        }

        public void onProcessingTime(InternalTimer<K, TimeWindow> internalTimer) throws Exception {
        }
    }

    public PassThroughPythonStreamGroupWindowAggregateOperator(Configuration configuration, RowType rowType, RowType rowType2, PythonAggregateFunctionInfo[] pythonAggregateFunctionInfoArr, int[] iArr, int i, boolean z, boolean z2, int i2, WindowAssigner<TimeWindow> windowAssigner, LogicalWindow logicalWindow, long j, PlannerNamedWindowProperty[] plannerNamedWindowPropertyArr, ZoneId zoneId) {
        super(configuration, rowType, rowType2, pythonAggregateFunctionInfoArr, new DataViewUtils.DataViewSpec[0][0], iArr, i, z, z2, i2, windowAssigner, logicalWindow, j, plannerNamedWindowPropertyArr, zoneId);
        this.mockPythonWindowOperator = new MockPythonWindowOperator<>();
        this.aggregateFunction = pythonAggregateFunctionInfoArr[0];
        this.grouping = iArr;
        buildWindow(plannerNamedWindowPropertyArr);
    }

    public void open() throws Exception {
        super.open();
        this.windowBaos = new ByteArrayOutputStreamWithPos();
        this.windowBaosWrapper = new DataOutputViewStreamWrapper(this.windowBaos);
        this.reusePythonRowData = new UpdatableRowData(GenericRowData.of(new Object[]{(byte) 0, null, null}), 3);
        this.reusePythonTimerRowData = new UpdatableRowData(GenericRowData.of(new Object[]{(byte) 1, null, null}), 3);
        this.reusePythonTimerData = new UpdatableRowData(GenericRowData.of(new Object[]{0, null, null, null}), 4);
        this.reuseJoinedRow = new JoinedRowData();
        this.windowAggResult = new JoinedRowData();
        this.reusePythonTimerRowData.setField(2, this.reusePythonTimerData);
        this.windowAccumulateData = new HashMap();
        this.windowRetractData = new HashMap();
        this.mockPythonInternalService = getInternalTimerService("python-window-timers", this.windowSerializer, this.mockPythonWindowOperator);
        this.groupKeyProjection = createProjection("GroupKey", this.grouping);
        int intValue = ((Integer) this.aggregateFunction.getInputs()[0]).intValue();
        this.aggExtracter = rowData -> {
            GenericRowData genericRowData = new GenericRowData(1);
            genericRowData.setField(0, Long.valueOf(rowData.getLong(intValue)));
            return genericRowData;
        };
        this.windowExtractor = timeWindow -> {
            GenericRowData genericRowData = new GenericRowData(this.namedProperties.length);
            for (int i = 0; i < this.namedProperties.length; i++) {
                switch (AnonymousClass1.$SwitchMap$org$apache$flink$fnexecution$v1$FlinkFnApi$GroupWindow$WindowProperty[this.namedProperties[i].ordinal()]) {
                    case 1:
                        genericRowData.setField(i, Long.valueOf(getShiftEpochMills(timeWindow.getStart())));
                        break;
                    case 2:
                        genericRowData.setField(i, Long.valueOf(getShiftEpochMills(timeWindow.getEnd())));
                        break;
                    case 3:
                        genericRowData.setField(i, Long.valueOf(getShiftEpochMills(timeWindow.getEnd() - 1)));
                        break;
                    case 4:
                        genericRowData.setField(i, -1L);
                        break;
                }
            }
            return genericRowData;
        };
    }

    public PythonFunctionRunner createPythonFunctionRunner() throws Exception {
        return new PassThroughStreamGroupWindowAggregatePythonFunctionRunner(getRuntimeContext().getTaskName(), PythonTestUtils.createTestEnvironmentManager(), this.userDefinedFunctionInputType, this.userDefinedFunctionOutputType, "flink:transform:stream_group_window_aggregate:v1", getUserDefinedFunctionsProto(), "flink:coder:schema:aggregate_function:v1", new HashMap(), PythonTestUtils.createMockFlinkMetricContainer(), getKeyedStateBackend(), getKeySerializer(), this);
    }

    private void buildWindow(PlannerNamedWindowProperty[] plannerNamedWindowPropertyArr) {
        this.namedProperties = new FlinkFnApi.GroupWindow.WindowProperty[plannerNamedWindowPropertyArr.length];
        for (int i = 0; i < plannerNamedWindowPropertyArr.length; i++) {
            PlannerWindowProperty property = plannerNamedWindowPropertyArr[i].getProperty();
            if (property instanceof PlannerWindowStart) {
                this.namedProperties[i] = FlinkFnApi.GroupWindow.WindowProperty.WINDOW_START;
            } else if (property instanceof PlannerWindowEnd) {
                this.namedProperties[i] = FlinkFnApi.GroupWindow.WindowProperty.WINDOW_END;
            } else if (property instanceof PlannerRowtimeAttribute) {
                this.namedProperties[i] = FlinkFnApi.GroupWindow.WindowProperty.ROW_TIME_ATTRIBUTE;
            } else {
                if (!(property instanceof PlannerProctimeAttribute)) {
                    throw new RuntimeException("Unexpected property " + property);
                }
                this.namedProperties[i] = FlinkFnApi.GroupWindow.WindowProperty.PROC_TIME_ATTRIBUTE;
            }
        }
    }

    public void processPythonElement(byte[] bArr) {
        try {
            RowData rowData = (RowData) this.udfInputTypeSerializer.deserialize(new DataInputDeserializer(bArr));
            if (rowData.getByte(0) == 0) {
                RowData row = rowData.getRow(1, this.inputType.getFieldCount());
                BinaryRowData copy = this.groupKeyProjection.apply(row).copy();
                Map<TimeWindow, List<RowData>> computeIfAbsent = this.windowAccumulateData.computeIfAbsent(copy.getString(0).toString(), str -> {
                    return new HashMap();
                });
                Map<TimeWindow, List<RowData>> computeIfAbsent2 = this.windowRetractData.computeIfAbsent(copy.getString(0).toString(), str2 -> {
                    return new HashMap();
                });
                this.mockPythonInternalService.advanceWatermark(rowData.getLong(3));
                Collection<TimeWindow> assignWindows = this.windowAssigner.assignWindows(row, TimeWindowUtil.toUtcTimestampMills(row.getLong(this.inputTimeFieldIndex), this.shiftTimeZone));
                for (TimeWindow timeWindow : assignWindows) {
                    if (RowDataUtil.isAccumulateMsg(row)) {
                        computeIfAbsent.computeIfAbsent(timeWindow, timeWindow2 -> {
                            return new LinkedList();
                        }).add(row);
                    } else {
                        computeIfAbsent2.computeIfAbsent(timeWindow, timeWindow3 -> {
                            return new LinkedList();
                        }).add(row);
                    }
                }
                ArrayList<TimeWindow> arrayList = new ArrayList(assignWindows.size());
                for (TimeWindow timeWindow4 : assignWindows) {
                    if (!isWindowLate(timeWindow4)) {
                        arrayList.add(timeWindow4);
                    }
                }
                for (TimeWindow timeWindow5 : arrayList) {
                    if (onElement(copy, timeWindow5)) {
                        triggerWindowProcess(copy, timeWindow5);
                    }
                    registerCleanupTimer(copy, timeWindow5);
                }
            } else {
                RowData row2 = rowData.getRow(4, 3);
                long j = rowData.getLong(2);
                RowData row3 = row2.getRow(1, getKeyType().getFieldCount());
                byte[] binary = row2.getBinary(2);
                this.bais.setBuffer(binary, 0, binary.length);
                TimeWindow timeWindow6 = (TimeWindow) this.windowSerializer.deserialize(this.baisWrapper);
                if (j == timeWindow6.maxTimestamp()) {
                    triggerWindowProcess(row3, timeWindow6);
                }
                cleanWindowIfNeeded(row3, timeWindow6, j);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void setResultBuffer(LinkedBlockingQueue<byte[]> linkedBlockingQueue) {
        this.resultBuffer = linkedBlockingQueue;
    }

    private boolean isWindowLate(TimeWindow timeWindow) {
        return this.windowAssigner.isEventTime() && cleanupTime(timeWindow) <= this.mockPythonInternalService.currentWatermark();
    }

    private long cleanupTime(TimeWindow timeWindow) {
        long epochMillsForTimer = TimeWindowUtil.toEpochMillsForTimer(timeWindow.maxTimestamp(), this.shiftTimeZone);
        if (!this.windowAssigner.isEventTime()) {
            return Math.max(0L, epochMillsForTimer);
        }
        long max = Math.max(0L, epochMillsForTimer + this.allowedLateness);
        if (max >= epochMillsForTimer) {
            return max;
        }
        return Long.MAX_VALUE;
    }

    private boolean onElement(BinaryRowData binaryRowData, TimeWindow timeWindow) throws IOException {
        if (timeWindow.maxTimestamp() <= this.mockPythonInternalService.currentWatermark()) {
            return true;
        }
        if (this.windowAssigner.isEventTime()) {
            registerEventTimeTimer(binaryRowData, timeWindow);
            return false;
        }
        registerProcessingTimeTimer(binaryRowData, timeWindow);
        return false;
    }

    private void triggerWindowProcess(RowData rowData, TimeWindow timeWindow) throws Exception {
        DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(1);
        List<RowData> list = this.windowAccumulateData.get(rowData.getString(0).toString()).get(timeWindow);
        List<RowData> list2 = this.windowRetractData.get(rowData.getString(0).toString()).get(timeWindow);
        if (list != null) {
            for (RowData rowData2 : list) {
                if (!hasRetractData(rowData2, list2)) {
                    RowData apply = this.aggExtracter.apply(rowData2);
                    RowData apply2 = this.windowExtractor.apply(timeWindow);
                    this.windowAggResult.replace(rowData, apply);
                    this.reuseJoinedRow.replace(this.windowAggResult, apply2);
                    this.reusePythonRowData.setField(1, this.reuseJoinedRow);
                    this.udfOutputTypeSerializer.serialize(this.reusePythonRowData, dataOutputSerializer);
                    this.resultBuffer.add(dataOutputSerializer.getCopyOfBuffer());
                    return;
                }
            }
        }
    }

    private boolean hasRetractData(RowData rowData, Iterable<RowData> iterable) {
        if (iterable == null) {
            return false;
        }
        for (RowData rowData2 : iterable) {
            if (rowData2.getRowKind() == RowKind.UPDATE_BEFORE) {
                rowData2.setRowKind(RowKind.UPDATE_AFTER);
            } else {
                rowData2.setRowKind(RowKind.INSERT);
            }
            if (rowData.equals(rowData2)) {
                return true;
            }
        }
        return false;
    }

    private Projection<RowData, BinaryRowData> createProjection(String str, int[] iArr) {
        return (Projection) ProjectionCodeGenerator.generateProjection(CodeGeneratorContext.apply(new TableConfig()), str, this.inputType, new RowType((List) Arrays.stream(iArr).mapToObj(i -> {
            return (RowType.RowField) this.inputType.getFields().get(i);
        }).collect(Collectors.toList())), iArr).newInstance(Thread.currentThread().getContextClassLoader());
    }

    private void registerCleanupTimer(RowData rowData, TimeWindow timeWindow) throws IOException {
        if (cleanupTime(timeWindow) == Long.MAX_VALUE) {
            return;
        }
        if (this.windowAssigner.isEventTime()) {
            registerEventTimeTimer(rowData, timeWindow);
        } else {
            registerProcessingTimeTimer(rowData, timeWindow);
        }
    }

    private void registerEventTimeTimer(RowData rowData, TimeWindow timeWindow) throws IOException {
        emitTimerData(rowData, timeWindow, (byte) 0);
    }

    private void deleteEventTimeTimer(RowData rowData, TimeWindow timeWindow) throws IOException {
        emitTimerData(rowData, timeWindow, (byte) 2);
    }

    private void registerProcessingTimeTimer(RowData rowData, TimeWindow timeWindow) throws IOException {
        emitTimerData(rowData, timeWindow, (byte) 1);
    }

    private void deleteProcessingTimeTimer(RowData rowData, TimeWindow timeWindow) throws IOException {
        emitTimerData(rowData, timeWindow, (byte) 3);
    }

    private void emitTimerData(RowData rowData, TimeWindow timeWindow, byte b) throws IOException {
        this.reusePythonTimerData.setByte(0, b);
        this.reusePythonTimerData.setField(1, rowData);
        this.reusePythonTimerData.setLong(2, timeWindow.maxTimestamp());
        this.windowSerializer.serialize(timeWindow, this.windowBaosWrapper);
        this.reusePythonTimerData.setField(3, this.windowBaos.toByteArray());
        this.windowBaos.reset();
        DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(1);
        this.udfOutputTypeSerializer.serialize(this.reusePythonTimerRowData, dataOutputSerializer);
        this.resultBuffer.add(dataOutputSerializer.getCopyOfBuffer());
    }

    private void cleanWindowIfNeeded(RowData rowData, TimeWindow timeWindow, long j) throws IOException {
        if (j == cleanupTime(timeWindow)) {
            DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(1);
            RowData apply = this.windowExtractor.apply(timeWindow);
            this.windowAggResult.replace(GenericRowData.of(new Object[]{StringData.fromString("state_cleanup_triggered: " + rowData.getString(0).toString() + " : " + timeWindow)}), GenericRowData.of(new Object[]{0L}));
            this.reuseJoinedRow.replace(this.windowAggResult, apply);
            this.reusePythonRowData.setField(1, this.reuseJoinedRow);
            this.udfOutputTypeSerializer.serialize(this.reusePythonRowData, dataOutputSerializer);
            this.resultBuffer.add(dataOutputSerializer.getCopyOfBuffer());
            if (this.windowAssigner.isEventTime()) {
                deleteEventTimeTimer(rowData, timeWindow);
            } else {
                deleteProcessingTimeTimer(rowData, timeWindow);
            }
        }
    }
}
