package org.apache.flink.streaming.api.streamvertex;

import java.util.ArrayList;
import org.apache.flink.runtime.io.network.api.MutableRecordReader;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.io.CoReaderIterator;
import org.apache.flink.streaming.io.CoRecordReader;
import org.apache.flink.util.MutableObjectIterator;

/* loaded from: input_file:org/apache/flink/streaming/api/streamvertex/CoStreamVertex.class */
public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> {
    private OutputHandler<OUT> outputHandler;
    MutableObjectIterator<StreamRecord<IN1>> inputIter1;
    MutableObjectIterator<StreamRecord<IN2>> inputIter2;
    CoRecordReader<DeserializationDelegate<StreamRecord<IN1>>, DeserializationDelegate<StreamRecord<IN2>>> coReader;
    CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> coIter;
    private static int numTasks;
    protected StreamRecordSerializer<IN1> inputDeserializer1 = null;
    protected StreamRecordSerializer<IN2> inputDeserializer2 = null;
    private CoInvokable<IN1, IN2, OUT> userInvokable = null;

    public CoStreamVertex() {
        numTasks = newVertex();
        this.instanceID = numTasks;
    }

    private void setDeserializers() {
        this.inputDeserializer1 = new StreamRecordSerializer<>(this.configuration.getTypeInfoIn1());
        this.inputDeserializer2 = new StreamRecordSerializer<>(this.configuration.getTypeInfoIn2());
    }

    @Override // org.apache.flink.streaming.api.streamvertex.StreamVertex
    public void setInputsOutputs() {
        this.outputHandler = new OutputHandler<>(this);
        setConfigInputs();
        this.coIter = new CoReaderIterator<>(this.coReader, this.inputDeserializer1, this.inputDeserializer2);
    }

    @Override // org.apache.flink.streaming.api.streamvertex.StreamVertex
    protected void setInvokable() {
        this.userInvokable = (CoInvokable) this.configuration.getUserInvokable();
        this.userInvokable.initialize(this.outputHandler.getCollector(), this.coIter, this.inputDeserializer1, this.inputDeserializer2, this.isMutable);
    }

    protected void setConfigInputs() throws StreamVertexException {
        setDeserializers();
        int numberOfInputs = this.configuration.getNumberOfInputs();
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (int i = 0; i < numberOfInputs; i++) {
            int inputType = this.configuration.getInputType(i);
            switch (inputType) {
                case 1:
                    arrayList.add(new MutableRecordReader(this));
                    break;
                case 2:
                    arrayList2.add(new MutableRecordReader(this));
                    break;
                default:
                    throw new RuntimeException("Invalid input type number: " + inputType);
            }
        }
        this.coReader = new CoRecordReader<>(arrayList, arrayList2);
    }

    @Override // org.apache.flink.streaming.api.streamvertex.StreamVertex
    public void invoke() throws Exception {
        this.outputHandler.invokeUserFunction("CO-TASK", this.userInvokable);
    }
}
