package org.apache.flink.streaming.api.invokable.operator.co;

import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.io.CoReaderIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.class */
public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<IN1, OUT> {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(CoInvokable.class);
    protected CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> recordIterator;
    protected StreamRecord<IN1> reuse1;
    protected StreamRecord<IN2> reuse2;
    protected StreamRecordSerializer<IN1> srSerializer1;
    protected StreamRecordSerializer<IN2> srSerializer2;
    protected TypeSerializer<IN1> serializer1;
    protected TypeSerializer<IN2> serializer2;

    public CoInvokable(Function function) {
        super(function);
    }

    public void initialize(Collector<OUT> collector, CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> coReaderIterator, StreamRecordSerializer<IN1> streamRecordSerializer, StreamRecordSerializer<IN2> streamRecordSerializer2, boolean z) {
        this.collector = collector;
        this.recordIterator = coReaderIterator;
        this.reuse1 = streamRecordSerializer.m11createInstance();
        this.reuse2 = streamRecordSerializer2.m11createInstance();
        this.srSerializer1 = streamRecordSerializer;
        this.srSerializer2 = streamRecordSerializer2;
        this.isMutable = z;
        this.serializer1 = this.srSerializer1.getObjectSerializer();
        this.serializer2 = this.srSerializer2.getObjectSerializer();
    }

    protected void resetReuseAll() {
        this.reuse1 = this.srSerializer1.m11createInstance();
        this.reuse2 = this.srSerializer2.m11createInstance();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void resetReuse1() {
        this.reuse1 = this.srSerializer1.m11createInstance();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void resetReuse2() {
        this.reuse2 = this.srSerializer2.m11createInstance();
    }

    @Override // org.apache.flink.streaming.api.invokable.StreamInvokable
    protected void immutableInvoke() throws Exception {
        while (true) {
            int next = this.recordIterator.next(this.reuse1, this.reuse2);
            if (next == 0) {
                return;
            }
            if (next == 1) {
                initialize1();
                handleStream1();
                resetReuse1();
            } else {
                initialize2();
                handleStream2();
                resetReuse2();
            }
        }
    }

    @Override // org.apache.flink.streaming.api.invokable.StreamInvokable
    protected void mutableInvoke() throws Exception {
        while (true) {
            int next = this.recordIterator.next(this.reuse1, this.reuse2);
            if (next == 0) {
                return;
            }
            if (next == 1) {
                initialize1();
                handleStream1();
            } else {
                initialize2();
                handleStream2();
            }
        }
    }

    protected abstract void handleStream1() throws Exception;

    protected abstract void handleStream2() throws Exception;

    protected abstract void callUserFunction1() throws Exception;

    protected abstract void callUserFunction2() throws Exception;

    protected void initialize1() {
    }

    protected void initialize2() {
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void callUserFunctionAndLogException1() {
        try {
            callUserFunction1();
        } catch (Exception e) {
            if (LOG.isErrorEnabled()) {
                LOG.error("Calling user function failed due to: {}", StringUtils.stringifyException(e));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void callUserFunctionAndLogException2() {
        try {
            callUserFunction2();
        } catch (Exception e) {
            if (LOG.isErrorEnabled()) {
                LOG.error("Calling user function failed due to: {}", StringUtils.stringifyException(e));
            }
        }
    }

    @Override // org.apache.flink.streaming.api.invokable.StreamInvokable
    protected void callUserFunction() throws Exception {
    }
}
