package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.shaded.com.google.common.base.Ascii;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.io.CoReaderIterator;
import org.apache.flink.streaming.runtime.io.CoRecordReader;
import org.apache.flink.streaming.runtime.io.InputGateFactory;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.class */
/**
 * Stream task that consumes two logical inputs and forwards every record to a
 * {@link TwoInputStreamOperator}.
 *
 * <p>Records are pulled through a {@link CoReaderIterator}; its return value selects whether
 * {@code processElement1} or {@code processElement2} of the operator receives the record.
 *
 * @param <IN1> element type of the first input
 * @param <IN2> element type of the second input
 * @param <OUT> element type emitted by the operator
 */
public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputStreamOperator<IN1, IN2, OUT>> {
    private static final Logger LOG = LoggerFactory.getLogger(TwoInputStreamTask.class);

    /** Serializer for records of the first input; assigned in {@link #registerInputOutput()}. */
    protected StreamRecordSerializer<IN1> inputDeserializer1 = null;

    /** Serializer for records of the second input; assigned in {@link #registerInputOutput()}. */
    protected StreamRecordSerializer<IN2> inputDeserializer2 = null;

    /** Reader over both input gate groups; created in {@link #registerInputOutput()}. */
    CoRecordReader<DeserializationDelegate<StreamRecord<IN1>>, DeserializationDelegate<StreamRecord<IN2>>> coReader;

    /** Iterator over {@link #coReader} that fills the reusable record instances. */
    CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> coIter;

    /**
     * Opens the operator, dispatches records from both inputs until the iterator reports 0 or the
     * task stops running, then closes the operator.
     *
     * <p>Whatever the outcome, the task is marked as not running, outputs are flushed and buffers
     * are cleared before this method returns.
     *
     * @throws Exception any failure raised while opening, running or closing the operator; it is
     *     logged and rethrown
     */
    public void invoke() throws Exception {
        this.isRunning = true;
        // Tracks whether closeOperator() is still owed, so the failure path can release it.
        boolean operatorOpen = false;

        if (LOG.isDebugEnabled()) {
            LOG.debug("Task {} invoked", getName());
        }

        try {
            openOperator();
            operatorOpen = true;

            // NOTE(review): m248createInstance looks like a decompiler-renamed createInstance();
            // confirm against StreamRecordSerializer before relying on the name.
            StreamRecord<IN1> reuse1 = this.inputDeserializer1.m248createInstance();
            StreamRecord<IN2> reuse2 = this.inputDeserializer2.m248createInstance();

            while (this.isRunning) {
                int inputIndex = readNextInputIndex(reuse1, reuse2);
                if (inputIndex == 0) {
                    // 0 is the stop signal: nothing more to process.
                    break;
                }
                if (inputIndex == 1) {
                    ((TwoInputStreamOperator) this.streamOperator).processElement1(reuse1.getObject());
                    // A fresh instance is requested after each hand-off to the operator.
                    reuse1 = this.inputDeserializer1.m248createInstance();
                } else {
                    ((TwoInputStreamOperator) this.streamOperator).processElement2(reuse2.getObject());
                    reuse2 = this.inputDeserializer2.m248createInstance();
                }
            }

            closeOperator();
            operatorOpen = false;

            if (LOG.isDebugEnabled()) {
                LOG.debug("Task {} invocation finished", getName());
            }
        } catch (Exception e) {
            LOG.error(getEnvironment().getTaskNameWithSubtasks() + " failed", e);
            if (operatorOpen) {
                // Best-effort close; the original failure is the one that must propagate.
                try {
                    closeOperator();
                } catch (Throwable t) {
                    LOG.warn("Exception while closing operator.", t);
                }
            }
            throw e;
        } finally {
            this.isRunning = false;
            try {
                this.outputHandler.flushOutputs();
            } finally {
                // Buffers must be released even when flushing the outputs fails.
                clearBuffers();
            }
        }
    }

    /**
     * Reads the next record from either input into the matching reusable instance.
     *
     * @param reuse1 target instance for a record of the first input
     * @param reuse2 target instance for a record of the second input
     * @return the value returned by the iterator, or 0 if reading failed after the task had
     *     already stopped running
     * @throws RuntimeException if reading fails while the task is still running
     */
    private int readNextInputIndex(StreamRecord<IN1> reuse1, StreamRecord<IN2> reuse2) {
        try {
            return this.coIter.next(reuse1, reuse2);
        } catch (IOException | IllegalStateException e) {
            if (this.isRunning) {
                throw new RuntimeException("Could not read next record.", e);
            }
            // The task is no longer running, so the read failure is treated as a stop signal.
            return 0;
        }
    }

    /**
     * Resolves both input serializers from the task configuration, groups the physical input
     * gates by the type number of their incoming edge (1 or 2) and builds the co-reader and its
     * iterator on top of the two groups.
     *
     * @throws RuntimeException if an incoming edge carries a type number other than 1 or 2
     */
    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    public void registerInputOutput() {
        super.registerInputOutput();
        this.inputDeserializer1 = this.configuration.getTypeSerializerIn1(this.userClassLoader);
        this.inputDeserializer2 = this.configuration.getTypeSerializerIn2(this.userClassLoader);

        int numberOfInputs = this.configuration.getNumberOfInputs();
        List<InputGate> firstInputGates = new ArrayList<>();
        List<InputGate> secondInputGates = new ArrayList<>();
        List<StreamEdge> inPhysicalEdges = this.configuration.getInPhysicalEdges(this.userClassLoader);

        for (int i = 0; i < numberOfInputs; i++) {
            int typeNumber = inPhysicalEdges.get(i).getTypeNumber();
            InputGate inputGate = getEnvironment().getInputGate(i);
            switch (typeNumber) {
                case 1:
                    firstInputGates.add(inputGate);
                    break;
                case 2:
                    secondInputGates.add(inputGate);
                    break;
                default:
                    throw new RuntimeException("Invalid input type number: " + typeNumber);
            }
        }

        this.coReader = new CoRecordReader<>(
                InputGateFactory.createInputGate(firstInputGates),
                InputGateFactory.createInputGate(secondInputGates));
        this.coIter = new CoReaderIterator<>(this.coReader, this.inputDeserializer1, this.inputDeserializer2);
    }

    /**
     * Clears the buffers handled by the superclass, then clears and cleans up the co-reader.
     *
     * @throws IOException if clearing any of the buffers fails
     */
    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    public void clearBuffers() throws IOException {
        super.clearBuffers();
        this.coReader.clearBuffers();
        this.coReader.cleanup();
    }
}
