package org.apache.flink.streaming.runtime.tasks;

import java.util.Arrays;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.InputSelection;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.streaming.util.TestAnyModeReadingStreamOperator;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.streaming.util.TestSequentialReadingStreamOperator;
import org.apache.flink.util.ExceptionUtils;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests of {@link TwoInputStreamTask} subclasses running two-input operators that decide which
 * input to read next (see {@link InputSelectable}).
 */
public class StreamTaskSelectiveReadingTest {

    /**
     * Test operator that tags every record with its name and input index and switches between
     * its two inputs according to a fixed rule: once {@code maxContinuousReadingRecords} records
     * have been processed since the last reset, the counter is reset and the other input is
     * selected, but only if that other input has so far delivered fewer records than its
     * expected total. Otherwise the current input stays selected.
     */
    private static class SpecialRuleReadingStreamOperator extends AbstractStreamOperator<String> implements TwoInputStreamOperator<String, Integer, String>, InputSelectable, BoundedMultiInput {
        private final String name;
        /** Expected total number of records on the first input. */
        private final int input1Records;
        /** Expected total number of records on the second input. */
        private final int input2Records;
        /** Number of records after which a switch to the other input is considered. */
        private final int maxContinuousReadingRecords;
        private int input1ReadingRecords;
        private int input2ReadingRecords;
        private int continuousReadingRecords;
        private InputSelection inputSelection = InputSelection.FIRST;

        SpecialRuleReadingStreamOperator(String operatorName, int expectedInput1, int expectedInput2, int maxContinuous) {
            this.name = operatorName;
            this.input1Records = expectedInput1;
            this.input2Records = expectedInput2;
            this.maxContinuousReadingRecords = maxContinuous;
        }

        public InputSelection nextSelection() {
            return inputSelection;
        }

        public void processElement1(StreamRecord<String> record) {
            String tagged = "[" + name + "-1]: " + record.getValue();
            output.collect(record.replace(tagged));
            input1ReadingRecords++;
            // The counter must be advanced on every record, so it is evaluated first.
            boolean switchInput = burstCompleted() && input2ReadingRecords < input2Records;
            inputSelection = switchInput ? InputSelection.SECOND : InputSelection.FIRST;
        }

        public void processElement2(StreamRecord<Integer> record) {
            String tagged = "[" + name + "-2]: " + record.getValue();
            output.collect(record.replace(tagged));
            input2ReadingRecords++;
            // The counter must be advanced on every record, so it is evaluated first.
            boolean switchInput = burstCompleted() && input1ReadingRecords < input1Records;
            inputSelection = switchInput ? InputSelection.FIRST : InputSelection.SECOND;
        }

        /** After input {@code inputId} ends, selects the other input (1 -> SECOND, otherwise FIRST). */
        public void endInput(int inputId) {
            if (inputId == 1) {
                inputSelection = InputSelection.SECOND;
            } else {
                inputSelection = InputSelection.FIRST;
            }
        }

        /**
         * Counts one more processed record and reports whether the configured maximum has just
         * been reached; in that case the counter is reset to zero.
         */
        private boolean burstCompleted() {
            continuousReadingRecords++;
            if (continuousReadingRecords != maxContinuousReadingRecords) {
                return false;
            }
            continuousReadingRecords = 0;
            return true;
        }
    }

    /**
     * Test operator that discards every record and always selects the first input, regardless
     * of what has been read so far.
     */
    private static class TestReadFinishedInputStreamOperator extends AbstractStreamOperator<String> implements TwoInputStreamOperator<String, Integer, String>, InputSelectable {
        /** Never reassigned: the operator keeps asking for the first input. */
        private InputSelection inputSelection = InputSelection.FIRST;

        TestReadFinishedInputStreamOperator() {
        }

        public InputSelection nextSelection() {
            return inputSelection;
        }

        public void processElement1(StreamRecord<String> record) {
            // Records from the first input are intentionally ignored.
        }

        public void processElement2(StreamRecord<Integer> record) {
            // Records from the second input are intentionally ignored.
        }
    }

    /**
     * A {@link TwoInputStreamTask} whose input processing is held back until
     * {@link #startProcessing()} has been called. This lets a test queue up input before the
     * task consumes any of it.
     */
    public static class TestSelectiveReadingTask<IN1, IN2, OUT> extends TwoInputStreamTask<IN1, IN2, OUT> {
        /**
         * Set once by {@link #startProcessing()}. Written only while holding the monitor of
         * {@code this}; volatile so the unsynchronized fast-path read in
         * {@link #processInput} sees the update.
         */
        private volatile boolean started;

        TestSelectiveReadingTask(Environment environment) throws Exception {
            super(environment);
            this.started = false;
        }

        /**
         * Blocks until {@link #startProcessing()} has been called, then delegates to the
         * superclass implementation.
         *
         * @param controller passed through unchanged to {@code super.processInput}
         * @throws Exception whatever the superclass throws, or {@link InterruptedException}
         *     if the thread is interrupted while waiting
         */
        protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
            if (!this.started) {
                synchronized (this) {
                    // Re-check under the monitor and loop: this closes the window in which
                    // startProcessing() could notify between the check and wait(), and it
                    // also tolerates spurious wakeups.
                    while (!this.started) {
                        wait();
                    }
                }
            }
            super.processInput(controller);
        }

        /** Releases {@link #processInput} so that the task starts consuming its inputs. */
        public void startProcessing() {
            synchronized (this) {
                // The flag is flipped inside the monitor so a waiter cannot miss the signal.
                this.started = true;
                notifyAll();
            }
        }
    }

    /**
     * Renders an output element as a string: for a {@link StreamRecord} the string form of its
     * value, for anything else the element's own {@code toString()}.
     */
    private static String elementToString(Object obj) {
        if (obj instanceof StreamRecord) {
            Object value = ((StreamRecord<?>) obj).getValue();
            return value.toString();
        }
        return obj.toString();
    }

    /**
     * Runs {@code TestAnyModeReadingStreamOperator} with all input queued before processing is
     * released, and expects the output in exactly the listed order.
     */
    @Test
    public void testAnyOrderedReading() throws Exception {
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        for (String line : Arrays.asList(
                "[Operator0-1]: Hello-1",
                "[Operator0-2]: 1",
                "[Operator0-1]: Hello-2",
                "[Operator0-2]: 2",
                "[Operator0-1]: Hello-3",
                "[Operator0-2]: 3",
                "[Operator0-2]: 4")) {
            expectedOutput.add(new StreamRecord<>(line));
        }
        testBase(new TestAnyModeReadingStreamOperator("Operator0"), true, expectedOutput, true);
    }

    /**
     * Runs {@code TestAnyModeReadingStreamOperator} with processing released before the input
     * is fed, and only checks that the same set of records is produced (order is ignored).
     */
    @Test
    public void testAnyUnorderedReading() throws Exception {
        String[] expectedLines = {
            "[Operator0-1]: Hello-1",
            "[Operator0-2]: 1",
            "[Operator0-1]: Hello-2",
            "[Operator0-2]: 2",
            "[Operator0-1]: Hello-3",
            "[Operator0-2]: 3",
            "[Operator0-2]: 4"
        };
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        for (String line : expectedLines) {
            expectedOutput.add(new StreamRecord<>(line));
        }
        testBase(new TestAnyModeReadingStreamOperator("Operator0"), false, expectedOutput, false);
    }

    /**
     * Runs {@code TestSequentialReadingStreamOperator} and expects every record of the first
     * input to be emitted before any record of the second input.
     */
    @Test
    public void testSequentialReading() throws Exception {
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        for (String line : Arrays.asList(
                "[Operator0-1]: Hello-1",
                "[Operator0-1]: Hello-2",
                "[Operator0-1]: Hello-3",
                "[Operator0-2]: 1",
                "[Operator0-2]: 2",
                "[Operator0-2]: 3",
                "[Operator0-2]: 4")) {
            expectedOutput.add(new StreamRecord<>(line));
        }
        testBase(new TestSequentialReadingStreamOperator("Operator0"), false, expectedOutput, true);
    }

    /**
     * Runs {@link SpecialRuleReadingStreamOperator} configured for 3 records on input 1,
     * 4 records on input 2 and a switch after every 2 records, and expects the output in
     * exactly the listed order.
     */
    @Test
    public void testSpecialRuleReading() throws Exception {
        String[] expectedLines = {
            "[Operator0-1]: Hello-1",
            "[Operator0-1]: Hello-2",
            "[Operator0-2]: 1",
            "[Operator0-2]: 2",
            "[Operator0-1]: Hello-3",
            "[Operator0-2]: 3",
            "[Operator0-2]: 4"
        };
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        for (String line : expectedLines) {
            expectedOutput.add(new StreamRecord<>(line));
        }
        testBase(new SpecialRuleReadingStreamOperator("Operator0", 3, 4, 2), false, expectedOutput, true);
    }

    /**
     * Runs an operator that keeps selecting the first input and expects the run to fail with
     * an exception whose chain carries the message
     * "all selected inputs are already finished". Any other exception is rethrown.
     */
    @Test
    public void testReadFinishedInput() throws Exception {
        try {
            testBase(new TestReadFinishedInputStreamOperator(), false, new ConcurrentLinkedQueue<>(), true);
        } catch (Exception e) {
            boolean expectedFailure = ExceptionUtils.findThrowableWithMessage(e, "all selected inputs are already finished").isPresent();
            if (expectedFailure) {
                return;
            }
            throw e;
        }
        // Reached only when testBase completed without throwing.
        Assert.fail("should throw an IOException");
    }

    /**
     * Runs {@code twoInputStreamOperator} inside a {@link TestSelectiveReadingTask}, feeds the
     * strings "Hello-1" to "Hello-3" to the first input and the integers 1 to 4 to the second
     * input, ends the input, waits up to 10 seconds for the task to complete, and compares the
     * task output with the expected records.
     *
     * @param twoInputStreamOperator the operator under test
     * @param z if {@code true}, all records are handed to the harness before the task is
     *     allowed to process input; if {@code false}, processing is released first and
     *     {@code waitForInputProcessing()} is called after the first record
     * @param concurrentLinkedQueue the expected output records
     * @param z2 if {@code true}, the output must match the expected records in order;
     *     if {@code false}, both sides are converted to strings and compared after sorting
     * @throws Exception if the task fails or does not complete in time
     */
    private void testBase(TwoInputStreamOperator<String, Integer, String> twoInputStreamOperator, boolean z, ConcurrentLinkedQueue<Object> concurrentLinkedQueue, boolean z2) throws Exception {
        // The harness must be parameterized so that the constructor reference can be typed
        // against its task-factory parameter.
        TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<>(TestSelectiveReadingTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();
        StreamConfig streamConfig = testHarness.getStreamConfig();
        streamConfig.setStreamOperator(twoInputStreamOperator);
        streamConfig.setOperatorID(new OperatorID());
        testHarness.invoke();
        testHarness.waitForTaskRunning();

        // startProcessing() exists only on the test task, so the harness task is cast once here.
        TestSelectiveReadingTask<?, ?, ?> task = (TestSelectiveReadingTask<?, ?, ?>) testHarness.getTask();

        if (!z) {
            task.startProcessing();
        }
        testHarness.processElement(new StreamRecord<>("Hello-1"), 0, 0);
        if (!z) {
            testHarness.waitForInputProcessing();
        }
        testHarness.processElement(new StreamRecord<>("Hello-2"), 0, 0);
        testHarness.processElement(new StreamRecord<>("Hello-3"), 0, 0);
        testHarness.processElement(new StreamRecord<>(1), 1, 0);
        testHarness.processElement(new StreamRecord<>(2), 1, 0);
        testHarness.processElement(new StreamRecord<>(3), 1, 0);
        testHarness.processElement(new StreamRecord<>(4), 1, 0);
        testHarness.endInput();
        if (z) {
            // Data was prepared up front; only now is the task allowed to consume it.
            task.startProcessing();
        }
        testHarness.waitForTaskCompletion(10000L);

        LinkedBlockingQueue<Object> output = testHarness.getOutput();
        if (z2) {
            TestHarnessUtil.assertOutputEquals("Output was not correct.", concurrentLinkedQueue, output);
            return;
        }
        // Order-insensitive comparison: compare the sorted string forms of both sides.
        String[] expectedSorted = concurrentLinkedQueue.stream().map(StreamTaskSelectiveReadingTest::elementToString).toArray(String[]::new);
        Arrays.sort(expectedSorted);
        String[] actualSorted = output.stream().map(StreamTaskSelectiveReadingTest::elementToString).toArray(String[]::new);
        Arrays.sort(actualSorted);
        Assert.assertArrayEquals("Output was not correct.", expectedSorted, actualSorted);
    }
}
