package org.apache.flink.streaming.runtime.tasks;

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.InputSelection;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.TestAnyModeMultipleInputStreamOperator;
import org.apache.flink.streaming.util.TestSequentialMultipleInputStreamOperator;
import org.apache.flink.util.ExceptionUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskMultipleInputSelectiveReadingTest.class */
public class StreamTaskMultipleInputSelectiveReadingTest {
    /** Records that {@code testInputSelection} feeds into the harness input with index 0. */
    // Arrays of a parameterized type cannot be created with a bare initializer, so a raw
    // array is created and converted; every element is a StreamRecord<String>.
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static final StreamRecord<String>[] INPUT1 =
            new StreamRecord[] {
                new StreamRecord<>("Hello-1"),
                new StreamRecord<>("Hello-2"),
                new StreamRecord<>("Hello-3")
            };

    /** Records that {@code testInputSelection} feeds into the harness input with index 1. */
    // Same raw-array workaround as above; every element is a StreamRecord<Integer>.
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static final StreamRecord<Integer>[] INPUT2 =
            new StreamRecord[] {
                new StreamRecord<>(1),
                new StreamRecord<>(2),
                new StreamRecord<>(3),
                new StreamRecord<>(4)
            };

    /**
     * Operator with two inputs that reads them in bursts.
     *
     * <p>Selection starts on the first input. Each processed record increments the per-input
     * counter and a shared "continuous" counter. Once the continuous counter reaches {@code
     * maxContinuousReadingRecords} it is reset, and the selection moves to the other input
     * provided that input has so far delivered fewer records than its configured total;
     * otherwise the current input stays selected.
     */
    private static class SpecialRuleReadingStreamOperator extends AbstractStreamOperatorV2<String>
            implements MultipleInputStreamOperator<String>, InputSelectable, BoundedMultiInput {

        /** Total number of records expected on input 1. */
        private final int input1Records;

        /** Total number of records expected on input 2. */
        private final int input2Records;

        /** Burst length after which a switch to the other input is considered. */
        private final int maxContinuousReadingRecords;

        /** Records processed from input 1 so far. */
        private int input1ReadingRecords;

        /** Records processed from input 2 so far. */
        private int input2ReadingRecords;

        /** Records processed since the continuous counter was last reset. */
        private int continuousReadingRecords;

        /** Selection returned by {@link #nextSelection()}. */
        private InputSelection inputSelection = InputSelection.FIRST;

        /**
         * @param parameters operator parameters forwarded to the base class
         * @param input1Records total number of records expected on input 1
         * @param input2Records total number of records expected on input 2
         * @param maxContinuousReadingRecords burst length before a switch is considered
         */
        SpecialRuleReadingStreamOperator(
                StreamOperatorParameters<String> parameters,
                int input1Records,
                int input2Records,
                int maxContinuousReadingRecords) {
            // 2 matches the number of inputs returned by getInputs().
            super(parameters, 2);
            this.input1Records = input1Records;
            this.input2Records = input2Records;
            this.maxContinuousReadingRecords = maxContinuousReadingRecords;
        }

        /** Returns the selection chosen by the most recent record or end-of-input event. */
        @Override
        public InputSelection nextSelection() {
            return inputSelection;
        }

        /**
         * Moves the selection off an input that has ended: input 1 ending selects the second
         * input, any other input id selects the first input.
         */
        @Override
        public void endInput(int inputId) {
            inputSelection = inputId == 1 ? InputSelection.SECOND : InputSelection.FIRST;
        }

        /**
         * Creates the two inputs. Each one delegates to {@code ToStringInput#processElement} and
         * then updates the counters and the selection according to the burst rule.
         */
        @Override
        @SuppressWarnings("rawtypes") // raw types mirror the declarations being overridden
        public List<Input> getInputs() {
            return Arrays.asList(
                    new TestAnyModeMultipleInputStreamOperator.ToStringInput(this, 1) {
                        @Override
                        public void processElement(StreamRecord element) {
                            super.processElement(element);
                            input1ReadingRecords++;
                            continuousReadingRecords++;
                            if (continuousReadingRecords == maxContinuousReadingRecords) {
                                continuousReadingRecords = 0;
                                // Only hand over if input 2 still has records outstanding.
                                if (input2ReadingRecords < input2Records) {
                                    inputSelection = InputSelection.SECOND;
                                    return;
                                }
                            }
                            inputSelection = InputSelection.FIRST;
                        }
                    },
                    new TestAnyModeMultipleInputStreamOperator.ToStringInput(this, 2) {
                        @Override
                        public void processElement(StreamRecord element) {
                            super.processElement(element);
                            input2ReadingRecords++;
                            continuousReadingRecords++;
                            if (continuousReadingRecords == maxContinuousReadingRecords) {
                                continuousReadingRecords = 0;
                                // Only hand over if input 1 still has records outstanding.
                                if (input1ReadingRecords < input1Records) {
                                    inputSelection = InputSelection.FIRST;
                                    return;
                                }
                            }
                            inputSelection = InputSelection.SECOND;
                        }
                    });
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskMultipleInputSelectiveReadingTest$SpecialRuleReadingStreamOperatorFactory.class */
    private static class SpecialRuleReadingStreamOperatorFactory extends AbstractStreamOperatorFactory<String> {
        private final int input1Records;
        private final int input2Records;
        private final int maxContinuousReadingRecords;

        public SpecialRuleReadingStreamOperatorFactory(int i, int i2, int i3) {
            this.input1Records = i;
            this.input2Records = i2;
            this.maxContinuousReadingRecords = i3;
        }

        public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> streamOperatorParameters) {
            return new SpecialRuleReadingStreamOperator(streamOperatorParameters, this.input1Records, this.input2Records, this.maxContinuousReadingRecords);
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return SpecialRuleReadingStreamOperator.class;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskMultipleInputSelectiveReadingTest$TestInputStarvationMultipleInputOperator.class */
    private static class TestInputStarvationMultipleInputOperator extends AbstractStreamOperatorV2<String> implements MultipleInputStreamOperator<String>, InputSelectable {
        public TestInputStarvationMultipleInputOperator(StreamOperatorParameters<String> streamOperatorParameters) {
            super(streamOperatorParameters, 3);
        }

        public InputSelection nextSelection() {
            return new InputSelection.Builder().select(2).select(3).build();
        }

        public List<Input> getInputs() {
            return Arrays.asList(new TestAnyModeMultipleInputStreamOperator.ToStringInput(this, 1), new TestAnyModeMultipleInputStreamOperator.ToStringInput(this, 2), new TestAnyModeMultipleInputStreamOperator.ToStringInput(this, 3));
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskMultipleInputSelectiveReadingTest$TestInputStarvationMultipleInputOperatorFactory.class */
    private static class TestInputStarvationMultipleInputOperatorFactory extends AbstractStreamOperatorFactory<String> {
        private TestInputStarvationMultipleInputOperatorFactory() {
        }

        public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> streamOperatorParameters) {
            return new TestInputStarvationMultipleInputOperator(streamOperatorParameters);
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return TestInputStarvationMultipleInputOperator.class;
        }
    }

    /**
     * Variant of {@link TestAnyModeMultipleInputStreamOperator} whose selection is fixed to the
     * first input, whatever has been read so far.
     */
    private static class TestReadFinishedInputStreamOperator
            extends TestAnyModeMultipleInputStreamOperator {

        TestReadFinishedInputStreamOperator(StreamOperatorParameters<String> parameters) {
            super(parameters);
        }

        /** Always answers with {@link InputSelection#FIRST}. */
        @Override
        public InputSelection nextSelection() {
            return InputSelection.FIRST;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskMultipleInputSelectiveReadingTest$TestReadFinishedInputStreamOperatorFactory.class */
    private static class TestReadFinishedInputStreamOperatorFactory extends AbstractStreamOperatorFactory<String> {
        private TestReadFinishedInputStreamOperatorFactory() {
        }

        public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> streamOperatorParameters) {
            return new TestReadFinishedInputStreamOperator(streamOperatorParameters);
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return TestReadFinishedInputStreamOperator.class;
        }
    }

    /**
     * Runs the any-mode operator with harness auto-processing switched off, so all records are
     * queued before anything is processed, and expects the output to alternate between the two
     * inputs in exactly the listed order.
     */
    @Test
    public void testAnyOrderedReading() throws Exception {
        String[] expectedValues = {
            "[1]: Hello-1", "[2]: 1", "[1]: Hello-2", "[2]: 2", "[1]: Hello-3", "[2]: 3", "[2]: 4"
        };
        ArrayDeque<Object> expectedOutput = new ArrayDeque<>();
        for (String value : expectedValues) {
            expectedOutput.add(new StreamRecord<>(value));
        }

        // Arguments: auto-process = false, strict order check = true.
        testInputSelection(
                new TestAnyModeMultipleInputStreamOperator.Factory(), false, expectedOutput, true);
    }

    /**
     * Runs the any-mode operator with harness auto-processing switched on and expects all seven
     * records to appear in the output, without constraining their relative order.
     */
    @Test
    public void testAnyUnorderedReading() throws Exception {
        String[] expectedValues = {
            "[1]: Hello-1", "[2]: 1", "[1]: Hello-2", "[2]: 2", "[1]: Hello-3", "[2]: 3", "[2]: 4"
        };
        ArrayDeque<Object> expectedOutput = new ArrayDeque<>();
        for (String value : expectedValues) {
            expectedOutput.add(new StreamRecord<>(value));
        }

        // Arguments: auto-process = true, strict order check = false.
        testInputSelection(
                new TestAnyModeMultipleInputStreamOperator.Factory(), true, expectedOutput, false);
    }

    /**
     * Runs the sequential operator and expects every record of the first input to be emitted
     * before any record of the second input, in exactly the listed order.
     */
    @Test
    public void testSequentialReading() throws Exception {
        String[] expectedValues = {
            "[1]: Hello-1", "[1]: Hello-2", "[1]: Hello-3", "[2]: 1", "[2]: 2", "[2]: 3", "[2]: 4"
        };
        ArrayDeque<Object> expectedOutput = new ArrayDeque<>();
        for (String value : expectedValues) {
            expectedOutput.add(new StreamRecord<>(value));
        }

        // Arguments: auto-process = true, strict order check = true.
        testInputSelection(
                new TestSequentialMultipleInputStreamOperator.Factory(), true, expectedOutput, true);
    }

    /**
     * Runs the burst-reading operator configured for 3 records on input 1, 4 records on input 2
     * and a burst length of 2, and expects the output in exactly the listed order: two records
     * from input 1, two from input 2, the last record of input 1, then the rest of input 2.
     */
    @Test
    public void testSpecialRuleReading() throws Exception {
        String[] expectedValues = {
            "[1]: Hello-1", "[1]: Hello-2", "[2]: 1", "[2]: 2", "[1]: Hello-3", "[2]: 3", "[2]: 4"
        };
        ArrayDeque<Object> expectedOutput = new ArrayDeque<>();
        for (String value : expectedValues) {
            expectedOutput.add(new StreamRecord<>(value));
        }

        // Arguments: auto-process = true, strict order check = true.
        testInputSelection(
                new SpecialRuleReadingStreamOperatorFactory(3, 4, 2), true, expectedOutput, true);
    }

    /**
     * Runs an operator that keeps selecting the first input and expects the run to fail with an
     * exception whose cause chain carries the "all selected inputs are already finished" message.
     * Any other exception is rethrown unchanged.
     */
    @Test
    public void testReadFinishedInput() throws Exception {
        final String expectedMessage =
                "Can not make a progress: all selected inputs are already finished";
        try {
            testInputSelection(
                    new TestReadFinishedInputStreamOperatorFactory(),
                    true,
                    new ArrayDeque<>(),
                    true);
            Assert.fail("should throw an IOException");
        } catch (Exception failure) {
            boolean isExpectedFailure =
                    ExceptionUtils.findThrowableWithMessage(failure, expectedMessage).isPresent();
            if (isExpectedFailure) {
                return;
            }
            throw failure;
        }
    }

    /**
     * Feeds {@code INPUT1} into harness input 0 and {@code INPUT2} into harness input 1 of a
     * {@code MultipleInputStreamTask} running the given operator, ends the input, waits for the
     * task to complete and compares the produced output with the expectation.
     *
     * @param streamOperatorFactory factory of the operator under test
     * @param autoProcess value passed to {@code setAutoProcess}; when {@code false} the harness
     *     is drained with an explicit {@code process()} call after the input has ended
     * @param expectedOutput records the task is expected to emit
     * @param orderedCheck {@code true} to require the exact order of {@code expectedOutput},
     *     {@code false} to accept any order
     * @throws Exception if the harness or the task fails
     */
    private void testInputSelection(
            StreamOperatorFactory<String> streamOperatorFactory,
            boolean autoProcess,
            ArrayDeque<Object> expectedOutput,
            boolean orderedCheck)
            throws Exception {
        // try-with-resources closes the harness on every path and keeps a close() failure as a
        // suppressed exception instead of letting it replace the primary one.
        try (StreamTaskMailboxTestHarness<String> testHarness =
                new StreamTaskMailboxTestHarnessBuilder<>(
                                MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput(BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput(BasicTypeInfo.INT_TYPE_INFO)
                        .setupOutputForSingletonOperatorChain(streamOperatorFactory)
                        .build()) {

            testHarness.setAutoProcess(autoProcess);
            for (StreamRecord<String> record : INPUT1) {
                testHarness.processElement(record, 0);
            }
            for (StreamRecord<Integer> record : INPUT2) {
                testHarness.processElement(record, 1);
            }
            testHarness.endInput();

            if (!autoProcess) {
                testHarness.process();
            }
            testHarness.waitForTaskCompletion();

            Object[] expected = expectedOutput.toArray();
            if (orderedCheck) {
                MatcherAssert.assertThat(testHarness.getOutput(), Matchers.contains(expected));
            } else {
                MatcherAssert.assertThat(
                        testHarness.getOutput(), Matchers.containsInAnyOrder(expected));
            }
        }
    }

    /**
     * Drives a three-input task, whose operator selects only inputs 2 and 3, one step at a time.
     *
     * <p>Checks that nothing is emitted before any input arrives, that the record on the
     * unselected input (harness index 0) does not show up in the output, and that once a record
     * arrives on harness index 2 the next two steps emit one record from each selected input
     * even though harness index 1 still has records pending.
     */
    @Test
    public void testInputStarvation() throws Exception {
        // try-with-resources closes the harness on every path and keeps a close() failure as a
        // suppressed exception instead of letting it replace the primary one.
        try (StreamTaskMailboxTestHarness<String> testHarness =
                new StreamTaskMailboxTestHarnessBuilder<>(
                                MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput(BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput(BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput(BasicTypeInfo.STRING_TYPE_INFO)
                        .setupOutputForSingletonOperatorChain(
                                new TestInputStarvationMultipleInputOperatorFactory())
                        .build()) {

            ArrayDeque<Object> expectedOutput = new ArrayDeque<>();
            testHarness.setAutoProcess(false);

            // A step without any queued input must not produce output.
            testHarness.processSingleStep();
            Assert.assertTrue(testHarness.getOutput().isEmpty());

            testHarness.processElement(new StreamRecord<>("NOT_SELECTED"), 0);
            testHarness.processElement(new StreamRecord<>("1"), 1);
            testHarness.processElement(new StreamRecord<>("2"), 1);
            testHarness.processElement(new StreamRecord<>("3"), 1);
            testHarness.processElement(new StreamRecord<>("4"), 1);

            // Two steps: two records from the only selected input that has data.
            testHarness.processSingleStep();
            expectedOutput.add(new StreamRecord<>("[2]: 1"));
            testHarness.processSingleStep();
            expectedOutput.add(new StreamRecord<>("[2]: 2"));
            MatcherAssert.assertThat(
                    testHarness.getOutput(), Matchers.contains(expectedOutput.toArray()));

            // With data on both selected inputs, two steps must serve each of them once; the
            // relative order of those two records is not asserted.
            testHarness.processElement(new StreamRecord<>("1"), 2);
            testHarness.processSingleStep();
            testHarness.processSingleStep();
            expectedOutput.add(new StreamRecord<>("[3]: 1"));
            expectedOutput.add(new StreamRecord<>("[2]: 3"));
            MatcherAssert.assertThat(
                    testHarness.getOutput(),
                    Matchers.containsInAnyOrder(expectedOutput.toArray()));
        }
    }
}
