/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
import org.apache.flink.runtime.metrics.util.InterceptingTaskMetricGroup;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.co.RichCoMapFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.InputSelection;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.operators.co.CoStreamMap;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTest;
import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
import org.apache.flink.streaming.runtime.tasks.TestBoundedOneInputStreamOperator;
import org.apache.flink.streaming.runtime.tasks.TwoInputSelectableStreamTask;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class TwoInputStreamTaskTest {
    @Parameterized.Parameter
    public boolean isInputSelectable;

    /**
     * Supplies the values injected into {@link #isInputSelectable}: every test runs once with
     * {@code false} and once with {@code true}.
     */
    @Parameterized.Parameters(name = "isInputSelectable = {0}")
    public static List<Boolean> parameters() {
        return Arrays.asList(false, true);
    }

    /**
     * Runs a co-map operator wrapping {@link TestOpenCloseMapFunction} over one record per input
     * and checks that the function's {@code close()} was called and that both records are
     * emitted with their original timestamps.
     *
     * <p>{@link TestOpenCloseMapFunction} itself fails when {@code open()}, the map methods and
     * {@code close()} are invoked out of order.
     */
    @Test
    public void testOpenCloseAndTimestamps() throws Exception {
        final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness =
                new TwoInputStreamTaskTestHarness<>(
                        isInputSelectable ? TwoInputSelectableStreamTask::new : TwoInputStreamTask::new,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.INT_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();

        final StreamConfig streamConfig = testHarness.getStreamConfig();
        // Declared as the common supertype: only one branch yields an AnyReadingCoStreamMap.
        final CoStreamMap<String, Integer, String> coMapOperator = isInputSelectable
                ? new AnyReadingCoStreamMap<>(new TestOpenCloseMapFunction())
                : new CoStreamMap<>(new TestOpenCloseMapFunction());
        streamConfig.setStreamOperator(coMapOperator);
        streamConfig.setOperatorID(new OperatorID());

        final long initialTime = 0L;
        final ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

        testHarness.invoke();
        testHarness.waitForTaskRunning();

        // A String record on the first input is expected unchanged, same timestamp.
        testHarness.processElement(new StreamRecord<>("Hello", initialTime + 1L), 0, 0);
        expectedOutput.add(new StreamRecord<>("Hello", initialTime + 1L));

        // Wait so that the first record is processed before the second one is fed in;
        // otherwise the output order would not be deterministic.
        testHarness.waitForInputProcessing();

        // An Integer record on the second input is expected as its String form.
        testHarness.processElement(new StreamRecord<>(1337, initialTime + 2L), 1, 0);
        expectedOutput.add(new StreamRecord<>("1337", initialTime + 2L));

        testHarness.waitForInputProcessing();

        testHarness.endInput();
        testHarness.waitForTaskCompletion();

        Assert.assertTrue("RichFunction methods were not called.", TestOpenCloseMapFunction.closeCalled);
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    }

    /**
     * Checks how watermarks and {@link StreamStatus} events arriving on individual channels are
     * combined and forwarded to the task output.
     *
     * <p>The harness is created with {@code (2, 2, {1, 2})}; the two trailing arguments of
     * {@code processElement} are presumably (input gate index, channel index) - confirm against
     * {@code TwoInputStreamTaskTestHarness}. The expectations below encode that a watermark is
     * only emitted once it has advanced on all four channels, and that idle channels are
     * ignored for that purpose.
     */
    @Test
    public void testWatermarkAndStreamStatusForwarding() throws Exception {
        final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness =
                new TwoInputStreamTaskTestHarness<>(
                        isInputSelectable ? TwoInputSelectableStreamTask::new : TwoInputStreamTask::new,
                        2,
                        2,
                        new int[] {1, 2},
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.INT_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();

        final StreamConfig streamConfig = testHarness.getStreamConfig();
        // Declared as the common supertype: only one branch yields an AnyReadingCoStreamMap.
        final CoStreamMap<String, Integer, String> coMapOperator = isInputSelectable
                ? new AnyReadingCoStreamMap<>(new IdentityMap())
                : new CoStreamMap<>(new IdentityMap());
        streamConfig.setStreamOperator(coMapOperator);
        streamConfig.setOperatorID(new OperatorID());

        final ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        final long initialTime = 0L;

        testHarness.invoke();
        testHarness.waitForTaskRunning();

        // Watermarks on three of the four channels: nothing may be emitted yet.
        testHarness.processElement(new Watermark(initialTime), 0, 0);
        testHarness.processElement(new Watermark(initialTime), 0, 1);
        testHarness.processElement(new Watermark(initialTime), 1, 0);
        testHarness.waitForInputProcessing();
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());

        // The fourth channel catches up: the watermark is forwarded.
        testHarness.processElement(new Watermark(initialTime), 1, 1);
        testHarness.waitForInputProcessing();
        expectedOutput.add(new Watermark(initialTime));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());

        // Regular records pass through between watermarks.
        testHarness.processElement(new StreamRecord<>("Hello", initialTime), 0, 0);
        testHarness.processElement(new StreamRecord<>(42, initialTime), 1, 1);
        expectedOutput.add(new StreamRecord<>("Hello", initialTime));
        expectedOutput.add(new StreamRecord<>("42", initialTime));
        testHarness.waitForInputProcessing();
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());

        // Channels advance to 4, 3, 3 and 2: the smallest value (2) is emitted.
        testHarness.processElement(new Watermark(initialTime + 4L), 0, 0);
        testHarness.processElement(new Watermark(initialTime + 3L), 0, 1);
        testHarness.processElement(new Watermark(initialTime + 3L), 1, 0);
        testHarness.processElement(new Watermark(initialTime + 2L), 1, 1);
        expectedOutput.add(new Watermark(initialTime + 2L));
        testHarness.waitForInputProcessing();
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());

        // The slowest channel jumps to 4: the new minimum (3) is emitted.
        testHarness.processElement(new Watermark(initialTime + 4L), 1, 1);
        testHarness.waitForInputProcessing();
        expectedOutput.add(new Watermark(initialTime + 3L));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());

        // The remaining two channels reach 4 as well.
        testHarness.processElement(new Watermark(initialTime + 4L), 0, 1);
        testHarness.processElement(new Watermark(initialTime + 4L), 1, 0);
        testHarness.waitForInputProcessing();
        expectedOutput.add(new Watermark(initialTime + 4L));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());

        // Two channels go idle; the still-active ones advance to 6 and 5, so 5 is expected.
        // Channel (1, 1) then goes idle too, which must not produce any further output yet.
        testHarness.processElement(StreamStatus.IDLE, 0, 1);
        testHarness.processElement(StreamStatus.IDLE, 1, 0);
        testHarness.processElement(new Watermark(initialTime + 6L), 0, 0);
        testHarness.processElement(new Watermark(initialTime + 5L), 1, 1);
        testHarness.processElement(StreamStatus.IDLE, 1, 1);
        testHarness.waitForInputProcessing();
        expectedOutput.add(new Watermark(initialTime + 5L));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());

        // The last active channel goes idle: IDLE is forwarded downstream.
        testHarness.processElement(StreamStatus.IDLE, 0, 0);
        testHarness.waitForInputProcessing();
        expectedOutput.add(StreamStatus.IDLE);
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());

        // Channels becoming active again: exactly one ACTIVE status is expected.
        testHarness.processElement(StreamStatus.ACTIVE, 1, 0);
        testHarness.processElement(StreamStatus.ACTIVE, 0, 1);
        testHarness.waitForInputProcessing();
        expectedOutput.add(StreamStatus.ACTIVE);
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());

        testHarness.endInput();
        testHarness.waitForTaskCompletion();

        // Only the two data records ("Hello" and "42") count as raw elements.
        final List<String> resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
        Assert.assertEquals(2, resultElements.size());
    }

    /**
     * Checks barrier handling for a single checkpoint: after a barrier arrived on one channel,
     * records from that channel are held back until the barrier has arrived on all channels,
     * while records from the other channels are still emitted.
     *
     * <p>Only runs for the non-selectable task; with {@code isInputSelectable} set the method
     * returns immediately and therefore reports success without checking anything.
     */
    @Test
    public void testCheckpointBarriers() throws Exception {
        if (isInputSelectable) {
            return;
        }

        final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness =
                new TwoInputStreamTaskTestHarness<>(
                        TwoInputStreamTask::new,
                        2,
                        2,
                        new int[] {1, 2},
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.INT_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();

        final StreamConfig streamConfig = testHarness.getStreamConfig();
        final CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<>(new IdentityMap());
        streamConfig.setStreamOperator(coMapOperator);
        streamConfig.setOperatorID(new OperatorID());

        final ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        final long initialTime = 0L;

        testHarness.invoke();
        testHarness.waitForTaskRunning();

        // Barrier on channel (0, 0) only.
        testHarness.processEvent(
                new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 0);

        // "Hello-0-0" follows the barrier on the same channel and is not expected yet;
        // "Ciao-0-0" arrives on a channel without barrier and is expected right away.
        testHarness.processElement(new StreamRecord<>("Hello-0-0", initialTime), 0, 0);
        testHarness.processElement(new StreamRecord<>("Ciao-0-0", initialTime), 0, 1);
        expectedOutput.add(new StreamRecord<>("Ciao-0-0", initialTime));
        testHarness.waitForInputProcessing();

        // Records on the second input are expected right away as well.
        testHarness.processElement(new StreamRecord<>(11, initialTime), 1, 1);
        testHarness.processElement(new StreamRecord<>(111, initialTime), 1, 1);
        expectedOutput.add(new StreamRecord<>("11", initialTime));
        expectedOutput.add(new StreamRecord<>("111", initialTime));
        testHarness.waitForInputProcessing();

        // Give the task up to 20 x 100 ms to emit the expected number of elements.
        for (int attempt = 0; attempt < 20 && testHarness.getOutput().size() < expectedOutput.size(); ++attempt) {
            Thread.sleep(100L);
        }
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());

        // Barriers on the remaining three channels complete the checkpoint.
        testHarness.processEvent(
                new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 1);
        testHarness.processEvent(
                new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 0);
        testHarness.processEvent(
                new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 1);
        testHarness.waitForInputProcessing();

        testHarness.endInput();
        testHarness.waitForTaskCompletion();

        // The barrier is forwarded first, then the record that had been held back.
        expectedOutput.add(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()));
        expectedOutput.add(new StreamRecord<>("Hello-0-0", initialTime));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());

        final List<String> resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
        Assert.assertEquals(4, resultElements.size());
    }

    /**
     * Checks what happens when barriers of a newer checkpoint (id 1) arrive on all channels
     * while an older checkpoint (id 0) has only been started on one channel: a
     * {@link CancelCheckpointMarker} for checkpoint 0 is expected, followed by the records that
     * were held back and the barrier of checkpoint 1. Late barriers of checkpoint 0 must not
     * produce any further output.
     *
     * <p>Only runs for the non-selectable task; with {@code isInputSelectable} set the method
     * returns immediately and therefore reports success without checking anything.
     */
    @Test
    public void testOvertakingCheckpointBarriers() throws Exception {
        if (isInputSelectable) {
            return;
        }

        final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness =
                new TwoInputStreamTaskTestHarness<>(
                        TwoInputStreamTask::new,
                        2,
                        2,
                        new int[] {1, 2},
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.INT_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();

        final StreamConfig streamConfig = testHarness.getStreamConfig();
        final CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<>(new IdentityMap());
        streamConfig.setStreamOperator(coMapOperator);
        streamConfig.setOperatorID(new OperatorID());

        final ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        final long initialTime = 0L;

        testHarness.invoke();
        testHarness.waitForTaskRunning();

        // Checkpoint 0 starts on channel (0, 0) only.
        testHarness.processEvent(
                new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 0);

        // Records behind the barrier on (0, 0) are not expected yet ...
        testHarness.processElement(new StreamRecord<>("Hello-0-0", initialTime), 0, 0);
        testHarness.processElement(new StreamRecord<>("Ciao-0-0", initialTime), 0, 0);

        // ... while records on a channel without barrier are expected right away.
        testHarness.processElement(new StreamRecord<>(42, initialTime), 1, 1);
        testHarness.processElement(new StreamRecord<>(1337, initialTime), 1, 1);
        expectedOutput.add(new StreamRecord<>("42", initialTime));
        expectedOutput.add(new StreamRecord<>("1337", initialTime));

        testHarness.waitForInputProcessing();
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());

        // Checkpoint 1 overtakes checkpoint 0 on every channel.
        testHarness.processEvent(
                new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 0);
        testHarness.processEvent(
                new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 1);
        testHarness.processEvent(
                new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 0);
        testHarness.processEvent(
                new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 1);

        expectedOutput.add(new CancelCheckpointMarker(0L));
        expectedOutput.add(new StreamRecord<>("Hello-0-0", initialTime));
        expectedOutput.add(new StreamRecord<>("Ciao-0-0", initialTime));
        expectedOutput.add(new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()));

        testHarness.waitForInputProcessing();
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());

        // Late barriers of the aborted checkpoint 0: the expected output stays unchanged.
        testHarness.processEvent(
                new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 1);
        testHarness.processEvent(
                new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 0);
        testHarness.processEvent(
                new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 1);
        testHarness.waitForInputProcessing();

        testHarness.endInput();
        testHarness.waitForTaskCompletion();

        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    }

    /**
     * Feeds records into a chain of three duplicating operators and checks the task-level
     * {@code numRecordsIn} / {@code numRecordsOut} counters.
     *
     * <p>The head operator ({@link DuplicatingOperator}) emits every record twice. The two
     * chained {@code OneInputStreamTaskTest.DuplicatingOperator}s presumably do the same
     * (defined outside this file - confirm), which gives the expected factor of 2 * 2 * 2
     * between records in and records out.
     */
    @Test
    public void testOperatorMetricReuse() throws Exception {
        final TwoInputStreamTaskTestHarness<String, String, String> testHarness =
                new TwoInputStreamTaskTestHarness<>(
                        isInputSelectable ? TwoInputSelectableStreamTask::new : TwoInputStreamTask::new,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO);

        testHarness
                .setupOperatorChain(new OperatorID(), new DuplicatingOperator())
                .chain(
                        new OperatorID(),
                        new OneInputStreamTaskTest.DuplicatingOperator(),
                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
                .chain(
                        new OperatorID(),
                        new OneInputStreamTaskTest.DuplicatingOperator(),
                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
                .finish();

        // Task metric group that hands out a fresh OperatorMetricGroup, parented to itself,
        // for every operator that asks for one.
        final TaskMetricGroup taskMetricGroup = new UnregisteredMetricGroups.UnregisteredTaskMetricGroup() {
            @Override
            public OperatorMetricGroup getOrAddOperator(OperatorID operatorID, String name) {
                return new OperatorMetricGroup(NoOpMetricRegistry.INSTANCE, this, operatorID, name);
            }
        };

        // Environment that exposes the metric group above to the task.
        final StreamMockEnvironment env = new StreamMockEnvironment(
                testHarness.jobConfig,
                testHarness.taskConfig,
                testHarness.memorySize,
                new MockInputSplitProvider(),
                testHarness.bufferSize,
                new TestTaskStateManager()) {
            @Override
            public TaskMetricGroup getMetricGroup() {
                return taskMetricGroup;
            }
        };

        final Counter numRecordsInCounter = taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter();
        final Counter numRecordsOutCounter = taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();

        testHarness.invoke(env);
        testHarness.waitForTaskRunning();

        final int numRecords1 = 5;
        final int numRecords2 = 3;

        for (int x = 0; x < numRecords1; x++) {
            testHarness.processElement(new StreamRecord<>("hello"), 0, 0);
        }
        for (int x = 0; x < numRecords2; x++) {
            testHarness.processElement(new StreamRecord<>("hello"), 1, 0);
        }
        testHarness.waitForInputProcessing();

        final int totalRecords = numRecords1 + numRecords2;
        Assert.assertEquals(totalRecords, numRecordsInCounter.getCount());
        // Three operators in the chain, each expected to double its input.
        Assert.assertEquals(totalRecords * 2 * 2 * 2, numRecordsOutCounter.getCount());

        testHarness.endInput();
        testHarness.waitForTaskCompletion();
    }

    /**
     * Checks the watermark gauges registered on task level, on the head operator and on a
     * chained operator, before any watermark and after watermarks on each of the two inputs.
     *
     * <p>The expected values encode that the combined input watermark is the minimum of both
     * inputs. The chained {@code OneInputStreamTaskTest.WatermarkMetricOperator} is defined
     * outside this file; judging by the expected values (input 1 gives output 2, input 2 gives
     * output 4) it presumably doubles the watermark - confirm.
     */
    @Test
    @SuppressWarnings("unchecked") // the intercepting metric groups return untyped metrics
    public void testWatermarkMetrics() throws Exception {
        final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness =
                new TwoInputStreamTaskTestHarness<>(
                        isInputSelectable ? TwoInputSelectableStreamTask::new : TwoInputStreamTask::new,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.INT_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO);

        // Declared as the common supertype: only one branch yields an AnyReadingCoStreamMap.
        final CoStreamMap<String, Integer, String> headOperator = isInputSelectable
                ? new AnyReadingCoStreamMap<>(new IdentityMap())
                : new CoStreamMap<>(new IdentityMap());
        final OperatorID headOperatorId = new OperatorID();

        final OneInputStreamTaskTest.WatermarkMetricOperator chainedOperator =
                new OneInputStreamTaskTest.WatermarkMetricOperator();
        final OperatorID chainedOperatorId = new OperatorID();

        testHarness
                .setupOperatorChain(headOperatorId, headOperator)
                .chain(
                        chainedOperatorId,
                        chainedOperator,
                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
                .finish();

        final InterceptingOperatorMetricGroup headOperatorMetricGroup = new InterceptingOperatorMetricGroup();
        final InterceptingOperatorMetricGroup chainedOperatorMetricGroup = new InterceptingOperatorMetricGroup();

        // Route each operator to its own intercepting group so its gauges can be looked up.
        final InterceptingTaskMetricGroup taskMetricGroup = new InterceptingTaskMetricGroup() {
            @Override
            public OperatorMetricGroup getOrAddOperator(OperatorID id, String name) {
                if (id.equals(headOperatorId)) {
                    return headOperatorMetricGroup;
                }
                if (id.equals(chainedOperatorId)) {
                    return chainedOperatorMetricGroup;
                }
                return super.getOrAddOperator(id, name);
            }
        };

        final StreamMockEnvironment env = new StreamMockEnvironment(
                testHarness.jobConfig,
                testHarness.taskConfig,
                testHarness.memorySize,
                new MockInputSplitProvider(),
                testHarness.bufferSize,
                new TestTaskStateManager()) {
            @Override
            public TaskMetricGroup getMetricGroup() {
                return taskMetricGroup;
            }
        };

        testHarness.invoke(env);
        testHarness.waitForTaskRunning();

        final Gauge<Long> taskInputWatermarkGauge =
                (Gauge<Long>) taskMetricGroup.get("currentInputWatermark");
        final Gauge<Long> headInput1WatermarkGauge =
                (Gauge<Long>) headOperatorMetricGroup.get("currentInput1Watermark");
        final Gauge<Long> headInput2WatermarkGauge =
                (Gauge<Long>) headOperatorMetricGroup.get("currentInput2Watermark");
        final Gauge<Long> headInputWatermarkGauge =
                (Gauge<Long>) headOperatorMetricGroup.get("currentInputWatermark");
        final Gauge<Long> headOutputWatermarkGauge =
                (Gauge<Long>) headOperatorMetricGroup.get("currentOutputWatermark");
        final Gauge<Long> chainedInputWatermarkGauge =
                (Gauge<Long>) chainedOperatorMetricGroup.get("currentInputWatermark");
        final Gauge<Long> chainedOutputWatermarkGauge =
                (Gauge<Long>) chainedOperatorMetricGroup.get("currentOutputWatermark");

        // All seven gauges must be distinct objects.
        Assert.assertEquals(
                "A metric was registered multiple times.",
                7,
                new HashSet<>(Arrays.asList(
                        taskInputWatermarkGauge,
                        headInput1WatermarkGauge,
                        headInput2WatermarkGauge,
                        headInputWatermarkGauge,
                        headOutputWatermarkGauge,
                        chainedInputWatermarkGauge,
                        chainedOutputWatermarkGauge)).size());

        // Before any watermark: every gauge reports Long.MIN_VALUE.
        Assert.assertEquals(Long.MIN_VALUE, taskInputWatermarkGauge.getValue().longValue());
        Assert.assertEquals(Long.MIN_VALUE, headInputWatermarkGauge.getValue().longValue());
        Assert.assertEquals(Long.MIN_VALUE, headInput1WatermarkGauge.getValue().longValue());
        Assert.assertEquals(Long.MIN_VALUE, headInput2WatermarkGauge.getValue().longValue());
        Assert.assertEquals(Long.MIN_VALUE, headOutputWatermarkGauge.getValue().longValue());
        Assert.assertEquals(Long.MIN_VALUE, chainedInputWatermarkGauge.getValue().longValue());
        Assert.assertEquals(Long.MIN_VALUE, chainedOutputWatermarkGauge.getValue().longValue());

        // Watermark 1 on the first input only: just the input-1 gauge moves.
        testHarness.processElement(new Watermark(1L), 0, 0);
        testHarness.waitForInputProcessing();
        Assert.assertEquals(Long.MIN_VALUE, taskInputWatermarkGauge.getValue().longValue());
        Assert.assertEquals(Long.MIN_VALUE, headInputWatermarkGauge.getValue().longValue());
        Assert.assertEquals(1L, headInput1WatermarkGauge.getValue().longValue());
        Assert.assertEquals(Long.MIN_VALUE, headInput2WatermarkGauge.getValue().longValue());
        Assert.assertEquals(Long.MIN_VALUE, headOutputWatermarkGauge.getValue().longValue());
        Assert.assertEquals(Long.MIN_VALUE, chainedInputWatermarkGauge.getValue().longValue());
        Assert.assertEquals(Long.MIN_VALUE, chainedOutputWatermarkGauge.getValue().longValue());

        // Watermark 2 on the second input: the combined watermark becomes min(1, 2) = 1.
        testHarness.processElement(new Watermark(2L), 1, 0);
        testHarness.waitForInputProcessing();
        Assert.assertEquals(1L, taskInputWatermarkGauge.getValue().longValue());
        Assert.assertEquals(1L, headInputWatermarkGauge.getValue().longValue());
        Assert.assertEquals(1L, headInput1WatermarkGauge.getValue().longValue());
        Assert.assertEquals(2L, headInput2WatermarkGauge.getValue().longValue());
        Assert.assertEquals(1L, headOutputWatermarkGauge.getValue().longValue());
        Assert.assertEquals(1L, chainedInputWatermarkGauge.getValue().longValue());
        Assert.assertEquals(2L, chainedOutputWatermarkGauge.getValue().longValue());

        // Watermark 3 on the first input: the combined watermark becomes min(3, 2) = 2.
        testHarness.processElement(new Watermark(3L), 0, 0);
        testHarness.waitForInputProcessing();
        Assert.assertEquals(2L, taskInputWatermarkGauge.getValue().longValue());
        Assert.assertEquals(2L, headInputWatermarkGauge.getValue().longValue());
        Assert.assertEquals(3L, headInput1WatermarkGauge.getValue().longValue());
        Assert.assertEquals(2L, headInput2WatermarkGauge.getValue().longValue());
        Assert.assertEquals(2L, headOutputWatermarkGauge.getValue().longValue());
        Assert.assertEquals(2L, chainedInputWatermarkGauge.getValue().longValue());
        Assert.assertEquals(4L, chainedOutputWatermarkGauge.getValue().longValue());

        testHarness.endInput();
        testHarness.waitForTaskCompletion();
    }

    /**
     * Ends the input channels one after another and checks where the "Bye" records emitted by
     * the operators' end-of-input hooks appear in the output.
     *
     * <p>The harness is created with {@code (3, 2, {1, 2, 2})}. Judging by the expected output,
     * the first index passed to {@code processElement} / {@code endInput} addresses an input
     * gate, gate 0 feeds operator input 1 and gates 1 and 2 both feed operator input 2 -
     * confirm against {@code TwoInputStreamTaskTestHarness}.
     */
    @Test
    public void testHandlingEndOfInput() throws Exception {
        final TwoInputStreamTaskTestHarness<String, String, String> testHarness =
                new TwoInputStreamTaskTestHarness<>(
                        isInputSelectable ? TwoInputSelectableStreamTask::new : TwoInputStreamTask::new,
                        3,
                        2,
                        new int[] {1, 2, 2},
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO);

        testHarness
                .setupOperatorChain(
                        new OperatorID(),
                        isInputSelectable
                                ? new TestBoundedAndSelectableTwoInputOperator("Operator0")
                                : new TestBoundedTwoInputOperator("Operator0"))
                .chain(
                        new OperatorID(),
                        new TestBoundedOneInputStreamOperator("Operator1"),
                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
                .finish();

        final ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

        testHarness.invoke();
        testHarness.waitForTaskRunning();

        // Gate 0: one record per channel, each channel ended right after its record.
        testHarness.processElement(new StreamRecord<>("Hello-1"), 0, 0);
        testHarness.endInput(0, 0);
        testHarness.processElement(new StreamRecord<>("Hello-2"), 0, 1);
        testHarness.endInput(0, 1);
        testHarness.waitForInputProcessing();

        // Gate 1: both records first, then both channels are ended.
        testHarness.processElement(new StreamRecord<>("Hello-3"), 1, 0);
        testHarness.processElement(new StreamRecord<>("Hello-4"), 1, 1);
        testHarness.endInput(1, 0);
        testHarness.endInput(1, 1);
        testHarness.waitForInputProcessing();

        // Gate 2: same pattern as gate 1.
        testHarness.processElement(new StreamRecord<>("Hello-5"), 2, 0);
        testHarness.processElement(new StreamRecord<>("Hello-6"), 2, 1);
        testHarness.endInput(2, 0);
        testHarness.endInput(2, 1);
        testHarness.waitForInputProcessing();

        // "Bye" for input 1 follows the gate-0 records; "Bye" for input 2 only follows once
        // gates 1 and 2 are both finished; the chained operator says "Bye" last.
        expectedOutput.add(new StreamRecord<>("[Operator0-1]: Hello-1"));
        expectedOutput.add(new StreamRecord<>("[Operator0-1]: Hello-2"));
        expectedOutput.add(new StreamRecord<>("[Operator0-1]: Bye"));
        expectedOutput.add(new StreamRecord<>("[Operator0-2]: Hello-3"));
        expectedOutput.add(new StreamRecord<>("[Operator0-2]: Hello-4"));
        expectedOutput.add(new StreamRecord<>("[Operator0-2]: Hello-5"));
        expectedOutput.add(new StreamRecord<>("[Operator0-2]: Hello-6"));
        expectedOutput.add(new StreamRecord<>("[Operator0-2]: Bye"));
        expectedOutput.add(new StreamRecord<>("[Operator1]: Bye"));

        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    }

    private static class TestBoundedAndSelectableTwoInputOperator
    extends TestBoundedTwoInputOperator
    implements InputSelectable {
        public TestBoundedAndSelectableTwoInputOperator(String name) {
            super(name);
        }

        public InputSelection nextSelection() {
            return InputSelection.ALL;
        }
    }

    private static class TestBoundedTwoInputOperator
    extends AbstractStreamOperator<String>
    implements TwoInputStreamOperator<String, String, String>,
    BoundedMultiInput {
        private static final long serialVersionUID = 1L;
        private final String name;

        public TestBoundedTwoInputOperator(String name) {
            this.name = name;
        }

        public void processElement1(StreamRecord<String> element) {
            this.output.collect((Object)element.replace((Object)("[" + this.name + "-1]: " + (String)element.getValue())));
        }

        public void processElement2(StreamRecord<String> element) {
            this.output.collect((Object)element.replace((Object)("[" + this.name + "-2]: " + (String)element.getValue())));
        }

        public void endInput(int inputId) {
            this.output.collect((Object)new StreamRecord((Object)("[" + this.name + "-" + inputId + "]: Bye")));
        }
    }

    private static class AnyReadingCoStreamMap<IN1, IN2, OUT>
    extends CoStreamMap<IN1, IN2, OUT>
    implements InputSelectable {
        public AnyReadingCoStreamMap(CoMapFunction<IN1, IN2, OUT> mapper) {
            super(mapper);
        }

        public InputSelection nextSelection() {
            return InputSelection.ALL;
        }
    }

    private static class IdentityMap
    implements CoMapFunction<String, Integer, String> {
        private static final long serialVersionUID = 1L;

        private IdentityMap() {
        }

        public String map1(String value) throws Exception {
            return value;
        }

        public String map2(Integer value) throws Exception {
            return value.toString();
        }
    }

    private static class TestOpenCloseMapFunction
    extends RichCoMapFunction<String, Integer, String> {
        private static final long serialVersionUID = 1L;
        public static boolean openCalled = false;
        public static boolean closeCalled = false;

        TestOpenCloseMapFunction() {
            openCalled = false;
            closeCalled = false;
        }

        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            if (closeCalled) {
                Assert.fail((String)"Close called before open.");
            }
            openCalled = true;
        }

        public void close() throws Exception {
            super.close();
            if (!openCalled) {
                Assert.fail((String)"Open was not called before close.");
            }
            closeCalled = true;
        }

        public String map1(String value) throws Exception {
            if (!openCalled) {
                Assert.fail((String)"Open was not called before run.");
            }
            return value;
        }

        public String map2(Integer value) throws Exception {
            if (!openCalled) {
                Assert.fail((String)"Open was not called before run.");
            }
            return value.toString();
        }
    }

    static class DuplicatingOperator
    extends AbstractStreamOperator<String>
    implements TwoInputStreamOperator<String, String, String>,
    InputSelectable {
        DuplicatingOperator() {
        }

        public void processElement1(StreamRecord<String> element) {
            this.output.collect(element);
            this.output.collect(element);
        }

        public void processElement2(StreamRecord<String> element) {
            this.output.collect(element);
            this.output.collect(element);
        }

        public InputSelection nextSelection() {
            return InputSelection.ALL;
        }
    }
}

