package org.apache.flink.table.runtime.operators.python.aggregate;

import java.util.Arrays;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.PythonFunctionRunner;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.streaming.api.utils.ProtoUtils;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.python.PythonAggregateFunctionInfo;
import org.apache.flink.table.functions.python.PythonEnv;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.planner.typeutils.DataViewUtils;
import org.apache.flink.table.runtime.operators.python.AbstractOneInputPythonFunctionOperator;
import org.apache.flink.table.runtime.operators.python.utils.StreamRecordRowDataWrappingCollector;
import org.apache.flink.table.runtime.runners.python.beam.BeamTablePythonFunctionRunner;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.typeutils.PythonTypeUtils;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/table/runtime/operators/python/aggregate/AbstractPythonStreamAggregateOperator.class */
public abstract class AbstractPythonStreamAggregateOperator extends AbstractOneInputPythonFunctionOperator<RowData, RowData> {
    private static final long serialVersionUID = 1;

    @VisibleForTesting
    static final byte NORMAL_RECORD = 0;

    @VisibleForTesting
    static final byte TRIGGER_TIMER = 1;
    private final PythonAggregateFunctionInfo[] aggregateFunctions;
    private final DataViewUtils.DataViewSpec[][] dataViewSpecs;
    protected final RowType inputType;
    protected final RowType outputType;
    private final int[] grouping;
    private final int indexOfCountStar;
    private final boolean generateUpdateBefore;
    private final int stateCacheSize;
    private final int mapStateReadCacheSize;
    private final int mapStateWriteCacheSize;
    private transient Object keyForTimerService;
    protected transient RowType userDefinedFunctionInputType;
    protected transient RowType userDefinedFunctionOutputType;
    transient TypeSerializer<RowData> udfOutputTypeSerializer;
    transient TypeSerializer<RowData> udfInputTypeSerializer;
    protected transient ByteArrayInputStreamWithPos bais;
    protected transient DataInputViewStreamWrapper baisWrapper;
    protected transient ByteArrayOutputStreamWithPos baos;
    protected transient DataOutputViewStreamWrapper baosWrapper;
    protected transient StreamRecordRowDataWrappingCollector rowDataWrapper;

    public AbstractPythonStreamAggregateOperator(Configuration configuration, RowType rowType, RowType rowType2, PythonAggregateFunctionInfo[] pythonAggregateFunctionInfoArr, DataViewUtils.DataViewSpec[][] dataViewSpecArr, int[] iArr, int i, boolean z) {
        super(configuration);
        this.inputType = (RowType) Preconditions.checkNotNull(rowType);
        this.outputType = (RowType) Preconditions.checkNotNull(rowType2);
        this.aggregateFunctions = pythonAggregateFunctionInfoArr;
        this.dataViewSpecs = dataViewSpecArr;
        this.grouping = iArr;
        this.indexOfCountStar = i;
        this.generateUpdateBefore = z;
        this.stateCacheSize = ((Integer) configuration.get(PythonOptions.STATE_CACHE_SIZE)).intValue();
        this.mapStateReadCacheSize = ((Integer) configuration.get(PythonOptions.MAP_STATE_READ_CACHE_SIZE)).intValue();
        this.mapStateWriteCacheSize = ((Integer) configuration.get(PythonOptions.MAP_STATE_WRITE_CACHE_SIZE)).intValue();
    }

    @Override // org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public void open() throws Exception {
        this.bais = new ByteArrayInputStreamWithPos();
        this.baisWrapper = new DataInputViewStreamWrapper(this.bais);
        this.baos = new ByteArrayOutputStreamWithPos();
        this.baosWrapper = new DataOutputViewStreamWrapper(this.baos);
        this.userDefinedFunctionInputType = createUserDefinedFunctionInputType();
        this.udfInputTypeSerializer = PythonTypeUtils.toInternalSerializer(this.userDefinedFunctionInputType);
        this.userDefinedFunctionOutputType = createUserDefinedFunctionOutputType();
        this.udfOutputTypeSerializer = PythonTypeUtils.toInternalSerializer(this.userDefinedFunctionOutputType);
        this.rowDataWrapper = new StreamRecordRowDataWrappingCollector(this.output);
        super.open();
        configJobOptions();
    }

    public void processElement(StreamRecord<RowData> streamRecord) throws Exception {
        processElementInternal((RowData) streamRecord.getValue());
        this.elementCount++;
        checkInvokeFinishBundleByCount();
        emitResults();
    }

    @Override // org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public PythonFunctionRunner createPythonFunctionRunner() throws Exception {
        return BeamTablePythonFunctionRunner.stateful(getRuntimeContext().getTaskName(), createPythonEnvironmentManager(), getFunctionUrn(), getUserDefinedFunctionsProto(), this.jobOptions, getFlinkMetricContainer(), getKeyedStateBackend(), getKeySerializer(), getWindowSerializer(), getContainingTask().getEnvironment().getMemoryManager(), getOperatorConfig().getManagedMemoryFractionOperatorUseCaseOfSlot(ManagedMemoryUseCase.PYTHON, getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration(), getContainingTask().getEnvironment().getUserCodeClassLoader().asClassLoader()), createInputCoderInfoDescriptor(this.userDefinedFunctionInputType), createOutputCoderInfoDescriptor(this.userDefinedFunctionOutputType));
    }

    @Override // org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public void setCurrentKey(Object obj) {
        this.keyForTimerService = obj;
    }

    public Object getCurrentKey() {
        return this.keyForTimerService;
    }

    @Override // org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public PythonEnv getPythonEnv() {
        return this.aggregateFunctions[0].getPythonFunction().getPythonEnv();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public TypeSerializer getKeySerializer() {
        return PythonTypeUtils.toInternalSerializer(getKeyType());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public RowType getKeyType() {
        return KeySelectorUtil.getRowDataSelector(this.grouping, InternalTypeInfo.of(this.inputType)).getProducedType().toRowType();
    }

    TypeSerializer getWindowSerializer() {
        return null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public FlinkFnApi.UserDefinedAggregateFunctions getUserDefinedFunctionsProto() {
        FlinkFnApi.UserDefinedAggregateFunctions.Builder newBuilder = FlinkFnApi.UserDefinedAggregateFunctions.newBuilder();
        newBuilder.setMetricEnabled(this.pythonConfig.isMetricEnabled());
        newBuilder.setProfileEnabled(this.pythonConfig.isProfileEnabled());
        newBuilder.addAllGrouping((Iterable) Arrays.stream(this.grouping).boxed().collect(Collectors.toList()));
        newBuilder.setGenerateUpdateBefore(this.generateUpdateBefore);
        newBuilder.setIndexOfCountStar(this.indexOfCountStar);
        newBuilder.setKeyType(PythonTypeUtils.toProtoType(getKeyType()));
        newBuilder.setStateCacheSize(this.stateCacheSize);
        newBuilder.setMapStateReadCacheSize(this.mapStateReadCacheSize);
        newBuilder.setMapStateWriteCacheSize(this.mapStateWriteCacheSize);
        for (int i = 0; i < this.aggregateFunctions.length; i++) {
            DataViewUtils.DataViewSpec[] dataViewSpecArr = null;
            if (i < this.dataViewSpecs.length) {
                dataViewSpecArr = this.dataViewSpecs[i];
            }
            newBuilder.addUdfs(ProtoUtils.getUserDefinedAggregateFunctionProto(this.aggregateFunctions[i], dataViewSpecArr));
        }
        return newBuilder.build();
    }

    public abstract String getFunctionUrn();

    public abstract void processElementInternal(RowData rowData) throws Exception;

    public abstract RowType createUserDefinedFunctionInputType();

    public abstract RowType createUserDefinedFunctionOutputType();

    private void configJobOptions() {
        this.jobOptions.put(PythonOptions.STATE_CACHE_SIZE.key(), String.valueOf(this.config.get(PythonOptions.STATE_CACHE_SIZE)));
        this.jobOptions.put(PythonOptions.MAP_STATE_ITERATE_RESPONSE_BATCH_SIZE.key(), String.valueOf(this.config.get(PythonOptions.MAP_STATE_ITERATE_RESPONSE_BATCH_SIZE)));
    }

    public FlinkFnApi.CoderInfoDescriptor createInputCoderInfoDescriptor(RowType rowType) {
        return ProtoUtils.createRowTypeCoderInfoDescriptorProto(rowType, FlinkFnApi.CoderInfoDescriptor.Mode.MULTIPLE, false);
    }

    public FlinkFnApi.CoderInfoDescriptor createOutputCoderInfoDescriptor(RowType rowType) {
        return ProtoUtils.createRowTypeCoderInfoDescriptorProto(rowType, FlinkFnApi.CoderInfoDescriptor.Mode.MULTIPLE, false);
    }
}
