package org.apache.flink.table.runtime.operators.python.aggregate;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.PythonFunctionRunner;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.SimpleTimerService;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.python.AbstractOneInputPythonFunctionOperator;
import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.UpdatableRowData;
import org.apache.flink.table.functions.python.PythonAggregateFunctionInfo;
import org.apache.flink.table.functions.python.PythonEnv;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.planner.typeutils.DataViewUtils;
import org.apache.flink.table.runtime.functions.CleanupState;
import org.apache.flink.table.runtime.operators.python.utils.StreamRecordRowDataWrappingCollector;
import org.apache.flink.table.runtime.runners.python.beam.BeamTableStatefulPythonFunctionRunner;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupAggregateOperator.class */
/**
 * Operator that evaluates Python aggregate functions for a (non-windowed) stream group aggregation.
 *
 * <p>Every input row and every fired state-cleanup timer is wrapped into an envelope row of the
 * form {@code (record_type, row, timestamp, key)}, serialized with {@link #udfInputTypeSerializer}
 * and handed to the Python function runner. Results returned by the runner are deserialized with
 * {@link #udfOutputTypeSerializer} and forwarded downstream.
 *
 * <p>When state cleaning is enabled ({@code minRetentionTime > 1}), a processing-time cleanup timer
 * is registered for the current key on every input row. When such a timer fires, a
 * {@code TRIGGER_TIMER} envelope carrying the key and the timer timestamp is sent to the runner.
 */
public class PythonStreamGroupAggregateOperator
        extends AbstractOneInputPythonFunctionOperator<RowData, RowData>
        implements Triggerable<RowData, VoidNamespace>, CleanupState {

    private static final long serialVersionUID = 1L;

    @VisibleForTesting
    protected static final String FLINK_AGGREGATE_FUNCTION_SCHEMA_CODER_URN =
            "flink:coder:schema:aggregate_function:v1";

    @VisibleForTesting
    protected static final String STREAM_GROUP_AGGREGATE_URN =
            "flink:transform:stream_group_aggregate:v1";

    /** Value of the {@code record_type} envelope field for a regular input row. */
    @VisibleForTesting
    protected static final byte NORMAL_RECORD = 0;

    /** Value of the {@code record_type} envelope field for a fired cleanup timer. */
    private static final byte TRIGGER_TIMER = 1;

    /** Type of the rows received by this operator. */
    protected final RowType inputType;

    /** Type of the rows emitted by this operator (as produced by the Python runner). */
    protected final RowType outputType;

    /** Options forwarded to the Python runner, built once from the configuration. */
    private final Map<String, String> jobOptions;

    /** The Python aggregate functions to evaluate. */
    private final PythonAggregateFunctionInfo[] aggregateFunctions;

    /** Data view specs; entry {@code i} (if present) belongs to {@code aggregateFunctions[i]}. */
    private final DataViewUtils.DataViewSpec[][] dataViewSpecs;

    /** Indices of the grouping-key fields within {@link #inputType}. */
    private final int[] grouping;

    // The following settings are forwarded unchanged to the Python side via the UDF proto.
    private final int indexOfCountStar;
    private final boolean countStarInserted;
    private final boolean generateUpdateBefore;
    private final long minRetentionTime;
    private final long maxRetentionTime;
    private final int stateCacheSize;
    private final int mapStateReadCacheSize;
    private final int mapStateWriteCacheSize;

    /** Whether cleanup timers are registered; enabled when {@code minRetentionTime > 1}. */
    private final boolean stateCleaningEnabled;

    /** Last key passed to {@link #setCurrentKey}; used as the key for cleanup timers. */
    private transient Object keyForTimerService;

    /** Envelope type {@code (record_type, row, timestamp, key)} sent to the Python runner. */
    protected transient RowType userDefinedFunctionInputType;

    protected transient TypeSerializer<RowData> udfOutputTypeSerializer;

    protected transient TypeSerializer<RowData> udfInputTypeSerializer;

    // Reusable buffers for (de)serializing data exchanged with the Python runner.
    private transient ByteArrayInputStreamWithPos bais;
    private transient DataInputViewStreamWrapper baisWrapper;
    private transient ByteArrayOutputStreamWithPos baos;
    private transient DataOutputViewStreamWrapper baosWrapper;

    private transient TimerService timerService;

    /** Only initialized when {@link #stateCleaningEnabled} is true. */
    private transient ValueState<Long> cleanupTimeState;

    /** Reused envelope for input rows: {@code (NORMAL_RECORD, row, null, null)}. */
    private transient UpdatableRowData reuseRowData;

    /** Reused envelope for fired timers: {@code (TRIGGER_TIMER, null, timestamp, key)}. */
    private transient UpdatableRowData reuseTimerRowData;

    private transient StreamRecordRowDataWrappingCollector rowDataWrapper;

    /**
     * Creates the operator.
     *
     * @param configuration configuration; the Python state/map-state cache sizes and the job
     *     options forwarded to Python are read from it
     * @param inputType type of the incoming rows, must not be null
     * @param outputType type of the outgoing rows, must not be null
     * @param aggregateFunctions the Python aggregate functions to evaluate
     * @param dataViewSpecs data view specs per aggregate function (may be shorter than
     *     {@code aggregateFunctions}, or null)
     * @param grouping indices of the grouping-key fields within {@code inputType}
     * @param indexOfCountStar forwarded unchanged to the Python side
     * @param countStarInserted forwarded unchanged to the Python side
     * @param generateUpdateBefore forwarded unchanged to the Python side
     * @param minRetentionTime minimum idle-state retention; state cleaning is enabled when > 1
     * @param maxRetentionTime maximum idle-state retention
     * @throws NullPointerException if {@code inputType} or {@code outputType} is null
     */
    public PythonStreamGroupAggregateOperator(
            Configuration configuration,
            RowType inputType,
            RowType outputType,
            PythonAggregateFunctionInfo[] aggregateFunctions,
            DataViewUtils.DataViewSpec[][] dataViewSpecs,
            int[] grouping,
            int indexOfCountStar,
            boolean countStarInserted,
            boolean generateUpdateBefore,
            long minRetentionTime,
            long maxRetentionTime) {
        super(configuration);
        this.inputType = Preconditions.checkNotNull(inputType);
        this.outputType = Preconditions.checkNotNull(outputType);
        this.aggregateFunctions = aggregateFunctions;
        this.dataViewSpecs = dataViewSpecs;
        this.jobOptions = buildJobOptions(configuration);
        this.grouping = grouping;
        this.indexOfCountStar = indexOfCountStar;
        this.countStarInserted = countStarInserted;
        this.generateUpdateBefore = generateUpdateBefore;
        this.minRetentionTime = minRetentionTime;
        this.maxRetentionTime = maxRetentionTime;
        this.stateCleaningEnabled = minRetentionTime > 1;
        this.stateCacheSize = configuration.get(PythonOptions.STATE_CACHE_SIZE);
        this.mapStateReadCacheSize = configuration.get(PythonOptions.MAP_STATE_READ_CACHE_SIZE);
        this.mapStateWriteCacheSize = configuration.get(PythonOptions.MAP_STATE_WRITE_CACHE_SIZE);
    }

    /** Remembers the key that cleanup timers will be registered for. */
    public void setCurrentKey(Object key) {
        this.keyForTimerService = key;
    }

    /** Returns the key last passed to {@link #setCurrentKey}. */
    public Object getCurrentKey() {
        return this.keyForTimerService;
    }

    /**
     * Builds the envelope type and serializers, the reusable buffers and envelope rows, the
     * cleanup timer service and (if enabled) the cleanup-time state, then opens the base operator.
     */
    @Override // org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public void open() throws Exception {
        // Envelope layout shared by normal records and timer records.
        userDefinedFunctionInputType =
                new RowType(
                        Arrays.asList(
                                new RowType.RowField("record_type", new TinyIntType()),
                                new RowType.RowField("row", inputType),
                                new RowType.RowField("timestamp", new BigIntType()),
                                new RowType.RowField("key", getKeyType())));
        udfInputTypeSerializer = PythonTypeUtils.toBlinkTypeSerializer(userDefinedFunctionInputType);
        udfOutputTypeSerializer = PythonTypeUtils.toBlinkTypeSerializer(outputType);

        bais = new ByteArrayInputStreamWithPos();
        baisWrapper = new DataInputViewStreamWrapper(bais);
        baos = new ByteArrayOutputStreamWithPos();
        baosWrapper = new DataOutputViewStreamWrapper(baos);

        timerService =
                new SimpleTimerService(
                        getInternalTimerService(
                                "state-clean-timer", VoidNamespaceSerializer.INSTANCE, this));

        // Only the record_type field is fixed; the remaining fields are filled per record/timer.
        reuseRowData = new UpdatableRowData(GenericRowData.of(NORMAL_RECORD, null, null, null), 4);
        reuseTimerRowData =
                new UpdatableRowData(GenericRowData.of(TRIGGER_TIMER, null, null, null), 4);

        rowDataWrapper = new StreamRecordRowDataWrappingCollector(output);
        initCleanupTimeState();
        super.open();
    }

    /** Creates the Beam-based stateful runner that executes the aggregate functions in Python. */
    @Override // org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public PythonFunctionRunner createPythonFunctionRunner() throws Exception {
        return new BeamTableStatefulPythonFunctionRunner(
                getRuntimeContext().getTaskName(),
                createPythonEnvironmentManager(),
                userDefinedFunctionInputType,
                outputType,
                STREAM_GROUP_AGGREGATE_URN,
                getUserDefinedFunctionsProto(),
                FLINK_AGGREGATE_FUNCTION_SCHEMA_CODER_URN,
                jobOptions,
                getFlinkMetricContainer(),
                getKeyedStateBackend(),
                getKeySerializer(),
                getContainingTask().getEnvironment().getMemoryManager(),
                getOperatorConfig()
                        .getManagedMemoryFractionOperatorUseCaseOfSlot(
                                ManagedMemoryUseCase.PYTHON,
                                getContainingTask()
                                        .getEnvironment()
                                        .getTaskManagerInfo()
                                        .getConfiguration(),
                                getContainingTask()
                                        .getEnvironment()
                                        .getUserCodeClassLoader()
                                        .asClassLoader()));
    }

    /**
     * Sends one input row to the Python runner, possibly finishing the bundle by element count,
     * then emits any results already produced.
     */
    public void processElement(StreamRecord<RowData> element) throws Exception {
        processElementInternal(element.getValue());
        elementCount++;
        checkInvokeFinishBundleByCount();
        emitResults();
    }

    /**
     * Deserializes one result produced by the Python runner and forwards it downstream.
     *
     * @param resultTuple {@code f0} holds the serialized bytes, {@code f1} the valid length
     */
    @Override // org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
        byte[] rawResult = resultTuple.f0;
        int length = resultTuple.f1;
        bais.setBuffer(rawResult, 0, length);
        RowData result = udfOutputTypeSerializer.deserialize(baisWrapper);
        rowDataWrapper.collect(result);
    }

    /** No-op: the timers registered by this class are processing-time cleanup timers. */
    public void onEventTime(InternalTimer<RowData, VoidNamespace> timer) {}

    /**
     * Forwards a fired cleanup timer to the Python runner as a {@code TRIGGER_TIMER} envelope
     * carrying the timer's key and timestamp. Does nothing when state cleaning is disabled.
     */
    public void onProcessingTime(InternalTimer<RowData, VoidNamespace> timer) throws Exception {
        if (stateCleaningEnabled) {
            RowData key = timer.getKey();
            reuseTimerRowData.setLong(2, timer.getTimestamp());
            reuseTimerRowData.setField(3, key);
            udfInputTypeSerializer.serialize(reuseTimerRowData, baosWrapper);
            pythonFunctionRunner.process(baos.toByteArray());
            baos.reset();
            elementCount++;
        }
    }

    /** Returns the Python environment of the first aggregate function (assumes at least one). */
    @Override // org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public PythonEnv getPythonEnv() {
        return aggregateFunctions[0].getPythonFunction().getPythonEnv();
    }

    /**
     * Registers a processing-time cleanup timer for {@link #getCurrentKey()} relative to {@code
     * currentTime}, if state cleaning is enabled.
     */
    private void registerProcessingCleanupTimer(long currentTime) throws Exception {
        if (stateCleaningEnabled) {
            // The key switch and the timer registration happen under the backend's monitor.
            // NOTE(review): presumably other code (e.g. the runner's state access) also switches
            // keys on this backend and synchronizes on it — confirm.
            synchronized (getKeyedStateBackend()) {
                getKeyedStateBackend().setCurrentKey(getCurrentKey());
                registerProcessingCleanupTimer(
                        cleanupTimeState,
                        currentTime,
                        minRetentionTime,
                        maxRetentionTime,
                        timerService);
            }
        }
    }

    /** Creates the per-key cleanup-time state when state cleaning is enabled. */
    private void initCleanupTimeState() {
        if (stateCleaningEnabled) {
            ValueStateDescriptor<Long> descriptor =
                    new ValueStateDescriptor<>("PythonGroupAggregateCleanupTime", Types.LONG);
            cleanupTimeState = getRuntimeContext().getState(descriptor);
        }
    }

    /**
     * Builds the options forwarded to the Python runner: the time zone (only if configured), the
     * state cache size and the map-state iterate response batch size.
     */
    private Map<String, String> buildJobOptions(Configuration configuration) {
        Map<String, String> options = new HashMap<>();
        if (configuration.containsKey("table.exec.timezone")) {
            options.put(
                    "table.exec.timezone",
                    configuration.getString("table.exec.timezone", (String) null));
        }
        options.put(
                PythonOptions.STATE_CACHE_SIZE.key(),
                String.valueOf(configuration.get(PythonOptions.STATE_CACHE_SIZE)));
        options.put(
                PythonOptions.MAP_STATE_ITERATE_RESPONSE_BATCH_SIZE.key(),
                String.valueOf(
                        configuration.get(PythonOptions.MAP_STATE_ITERATE_RESPONSE_BATCH_SIZE)));
        return options;
    }

    /** Returns a serializer for the grouping-key type. */
    @VisibleForTesting
    protected TypeSerializer getKeySerializer() {
        return PythonTypeUtils.toBlinkTypeSerializer(getKeyType());
    }

    /** Registers the cleanup timer (if enabled) and sends {@code row} as a NORMAL_RECORD envelope. */
    private void processElementInternal(RowData row) throws Exception {
        registerProcessingCleanupTimer(timerService.currentProcessingTime());
        reuseRowData.setField(1, row);
        udfInputTypeSerializer.serialize(reuseRowData, baosWrapper);
        pythonFunctionRunner.process(baos.toByteArray());
        baos.reset();
    }

    /** Returns the row type produced by projecting {@link #inputType} onto {@link #grouping}. */
    protected RowType getKeyType() {
        return KeySelectorUtil.getRowDataSelector(grouping, InternalTypeInfo.of(inputType))
                .getProducedType()
                .toRowType();
    }

    /**
     * Builds the protobuf description of the aggregate functions and operator settings consumed by
     * the Python side. A missing (or null) data view spec entry is passed on as {@code null}.
     */
    public FlinkFnApi.UserDefinedAggregateFunctions getUserDefinedFunctionsProto() {
        FlinkFnApi.UserDefinedAggregateFunctions.Builder builder =
                FlinkFnApi.UserDefinedAggregateFunctions.newBuilder();
        for (int i = 0; i < aggregateFunctions.length; i++) {
            DataViewUtils.DataViewSpec[] specs =
                    (dataViewSpecs != null && i < dataViewSpecs.length) ? dataViewSpecs[i] : null;
            builder.addUdfs(
                    PythonOperatorUtils.getUserDefinedAggregateFunctionProto(
                            aggregateFunctions[i], specs));
        }
        builder.setMetricEnabled(getPythonConfig().isMetricEnabled());
        builder.addAllGrouping(Arrays.stream(grouping).boxed().collect(Collectors.toList()));
        builder.setGenerateUpdateBefore(generateUpdateBefore);
        builder.setIndexOfCountStar(indexOfCountStar);
        builder.setCountStarInserted(countStarInserted);
        builder.setKeyType(PythonTypeUtils.toProtoType(getKeyType()));
        builder.setStateCleaningEnabled(stateCleaningEnabled);
        builder.setStateCacheSize(stateCacheSize);
        builder.setMapStateReadCacheSize(mapStateReadCacheSize);
        builder.setMapStateWriteCacheSize(mapStateWriteCacheSize);
        return builder.build();
    }
}
