package org.apache.flink.table.runtime.utils;

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.python.PythonFunctionRunner;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/table/runtime/utils/PassThroughPythonTableFunctionRunner.class */
public abstract class PassThroughPythonTableFunctionRunner<IN> implements PythonFunctionRunner<IN> {
    private boolean bundleStarted = false;
    private final List<IN> bufferedElements = new ArrayList();
    private final FnDataReceiver<byte[]> resultReceiver;
    private transient ByteArrayOutputStreamWithPos baos;
    private transient DataOutputViewStreamWrapper baosWrapper;

    public PassThroughPythonTableFunctionRunner(FnDataReceiver<byte[]> fnDataReceiver) {
        this.resultReceiver = (FnDataReceiver) Preconditions.checkNotNull(fnDataReceiver);
    }

    public void open() {
        this.baos = new ByteArrayOutputStreamWithPos();
        this.baosWrapper = new DataOutputViewStreamWrapper(this.baos);
    }

    public void close() {
    }

    public void startBundle() {
        Preconditions.checkState(!this.bundleStarted);
        this.bundleStarted = true;
    }

    public void finishBundle() throws Exception {
        Preconditions.checkState(this.bundleStarted);
        this.bundleStarted = false;
        int i = 0;
        for (IN in : this.bufferedElements) {
            i++;
            this.baos.reset();
            getInputTypeSerializer().serialize(in, this.baosWrapper);
            if (i != 6 && i != 8) {
                this.resultReceiver.accept(this.baos.toByteArray());
            }
            this.resultReceiver.accept(new byte[]{0});
        }
        this.bufferedElements.clear();
    }

    public void processElement(IN in) {
        this.bufferedElements.add(copy(in));
    }

    public abstract IN copy(IN in);

    public abstract TypeSerializer<IN> getInputTypeSerializer();
}
