/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
import org.apache.flink.runtime.metrics.util.InterceptingTaskMetricGroup;
import org.apache.flink.streaming.api.operators.AbstractInput;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTest;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.TestBoundedOneInputStreamOperator;
import org.apache.flink.streaming.util.TestBoundedMultipleInputOperator;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsEmptyCollection;
import org.hamcrest.collection.IsMapContaining;
import org.junit.Assert;
import org.junit.Test;

public class MultipleInputStreamTaskTest {
    /**
     * Sends one timestamped record to each of the three inputs (String, Integer, Double) and
     * expects the task to emit the string form of every value with its original timestamp, in
     * the order the records were fed in.
     */
    @Test
    public void testOpenCloseAndTimestamps() throws Exception {
        try (StreamTaskMailboxTestHarness harness =
                new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput((TypeInformation<?>) BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput((TypeInformation<?>) BasicTypeInfo.INT_TYPE_INFO)
                        .addInput((TypeInformation<?>) BasicTypeInfo.DOUBLE_TYPE_INFO)
                        .setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>) new MapToStringMultipleInputOperatorFactory())
                        .build()) {
            final long baseTime = 0L;
            // One value per input gate; the index doubles as the gate number.
            final Object[] valuePerInput = {"Hello", 1337, 42.44};
            ArrayDeque<Object> expected = new ArrayDeque<>();

            for (int input = 0; input < valuePerInput.length; input++) {
                long timestamp = baseTime + input + 1L;
                harness.processElement(new StreamRecord<>(valuePerInput[input], timestamp), input);
                expected.add(new StreamRecord<>(String.valueOf(valuePerInput[input]), timestamp));
            }

            harness.endInput();
            harness.waitForTaskCompletion();

            MatcherAssert.assertThat(harness.getOutput(), (Matcher) Matchers.contains((Object[]) expected.toArray()));
        }
    }

    /**
     * Sends a checkpoint barrier on a single channel, verifies that records arriving on other
     * channels are still emitted, and then expects the barrier to appear in the output once it
     * has been delivered on every channel of every input.
     */
    @Test
    public void testCheckpointBarriers() throws Exception {
        try (StreamTaskMailboxTestHarness harness =
                new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput((TypeInformation<?>) BasicTypeInfo.STRING_TYPE_INFO, 2)
                        .addInput((TypeInformation<?>) BasicTypeInfo.INT_TYPE_INFO, 2)
                        .addInput((TypeInformation<?>) BasicTypeInfo.DOUBLE_TYPE_INFO, 2)
                        .setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>) new MapToStringMultipleInputOperatorFactory())
                        .build()) {
            final long baseTime = 0L;
            ArrayDeque<Object> expected = new ArrayDeque<>();

            // First barrier arrives on input 0, channel 0 only.
            harness.processEvent((AbstractEvent) new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 0);

            // Records on channels that have not yet seen the barrier.
            harness.processElement(new StreamRecord<>("Ciao-0-0", baseTime), 0, 1);
            harness.processElement(new StreamRecord<>(11, baseTime), 1, 1);
            harness.processElement(new StreamRecord<>(1.0, baseTime), 2, 0);

            expected.add(new StreamRecord<>("Ciao-0-0", baseTime));
            expected.add(new StreamRecord<>("11", baseTime));
            expected.add(new StreamRecord<>("1.0", baseTime));
            MatcherAssert.assertThat(harness.getOutput(), (Matcher) Matchers.contains((Object[]) expected.toArray()));

            // Deliver the barrier on every remaining {input, channel} pair.
            final int[][] remainingChannels = {{0, 1}, {1, 0}, {1, 1}, {2, 0}, {2, 1}};
            for (int[] target : remainingChannels) {
                harness.processEvent(
                        (AbstractEvent) new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()),
                        target[0],
                        target[1]);
            }

            expected.add(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()));
            MatcherAssert.assertThat(harness.getOutput(), (Matcher) Matchers.contains((Object[]) expected.toArray()));
        }
    }

    /**
     * Starts checkpoint 0 on one channel, then delivers checkpoint 1 on all channels before
     * checkpoint 0 has completed. Expects a {@link CancelCheckpointMarker} for checkpoint 0
     * followed by the barrier of checkpoint 1, and no additional output when the late barriers
     * of checkpoint 0 arrive afterwards.
     */
    @Test
    public void testOvertakingCheckpointBarriers() throws Exception {
        try (StreamTaskMailboxTestHarness harness =
                new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput((TypeInformation<?>) BasicTypeInfo.STRING_TYPE_INFO, 2)
                        .addInput((TypeInformation<?>) BasicTypeInfo.INT_TYPE_INFO, 2)
                        .addInput((TypeInformation<?>) BasicTypeInfo.DOUBLE_TYPE_INFO, 2)
                        .setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>) new MapToStringMultipleInputOperatorFactory())
                        .build()) {
            final long baseTime = 0L;
            ArrayDeque<Object> expected = new ArrayDeque<>();

            // Checkpoint 0 begins on input 0, channel 0.
            harness.processEvent((AbstractEvent) new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 0);

            harness.processElement(new StreamRecord<>("Witam-0-1", baseTime), 0, 1);
            harness.processElement(new StreamRecord<>(42, baseTime), 1, 1);
            harness.processElement(new StreamRecord<>(1.0, baseTime), 2, 1);

            expected.add(new StreamRecord<>("Witam-0-1", baseTime));
            expected.add(new StreamRecord<>("42", baseTime));
            expected.add(new StreamRecord<>("1.0", baseTime));
            MatcherAssert.assertThat(harness.getOutput(), (Matcher) Matchers.contains((Object[]) expected.toArray()));

            // Checkpoint 1 overtakes checkpoint 0; the channel that started checkpoint 0 goes last.
            final int[][] overtakingOrder = {{0, 1}, {1, 0}, {1, 1}, {2, 0}, {2, 1}, {0, 0}};
            for (int[] target : overtakingOrder) {
                harness.processEvent(
                        (AbstractEvent) new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()),
                        target[0],
                        target[1]);
            }

            expected.add(new CancelCheckpointMarker(0L));
            expected.add(new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()));
            MatcherAssert.assertThat(harness.getOutput(), (Matcher) Matchers.contains((Object[]) expected.toArray()));

            // Late barriers of the cancelled checkpoint 0 must not change the output.
            final int[][] lateChannels = {{0, 1}, {1, 0}, {1, 1}, {2, 0}, {2, 1}};
            for (int[] target : lateChannels) {
                harness.processEvent(
                        (AbstractEvent) new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()),
                        target[0],
                        target[1]);
            }

            harness.waitForTaskCompletion();
            MatcherAssert.assertThat(harness.getOutput(), (Matcher) Matchers.contains((Object[]) expected.toArray()));
        }
    }

    /**
     * Feeds records into a chain of three duplicating operators and checks the task-level I/O
     * counters: the input counter is expected to equal the number of records fed in, and the
     * output counter is expected to be that number times 2 * 2 * 2.
     */
    @Test
    public void testOperatorMetricReuse() throws Exception {
        // Task metric group that hands out a fresh operator group on every request.
        UnregisteredMetricGroups.UnregisteredTaskMetricGroup metricGroup = new UnregisteredMetricGroups.UnregisteredTaskMetricGroup() {
            @Override
            public OperatorMetricGroup getOrAddOperator(OperatorID operatorID, String name) {
                return new OperatorMetricGroup(NoOpMetricRegistry.INSTANCE, (TaskMetricGroup) this, operatorID, name);
            }
        };

        try (StreamTaskMailboxTestHarness harness =
                new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput((TypeInformation<?>) BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput((TypeInformation<?>) BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput((TypeInformation<?>) BasicTypeInfo.STRING_TYPE_INFO)
                        .setupOperatorChain((StreamOperatorFactory<?>) new DuplicatingOperatorFactory())
                        .chain(new OneInputStreamTaskTest.DuplicatingOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
                        .chain(new OneInputStreamTaskTest.DuplicatingOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
                        .finish()
                        .setTaskMetricGroup((TaskMetricGroup) metricGroup)
                        .build()) {
            Counter recordsIn = metricGroup.getIOMetricGroup().getNumRecordsInCounter();
            Counter recordsOut = metricGroup.getIOMetricGroup().getNumRecordsOutCounter();

            // Number of records sent to input 0, 1 and 2 respectively (always on channel 0).
            final int[] recordsPerInput = {5, 3, 2};
            int totalRecords = 0;
            for (int input = 0; input < recordsPerInput.length; input++) {
                for (int sent = 0; sent < recordsPerInput[input]; sent++) {
                    harness.processElement(new StreamRecord<>("hello"), input, 0);
                }
                totalRecords += recordsPerInput[input];
            }

            Assert.assertEquals((long) totalRecords, recordsIn.getCount());
            Assert.assertEquals((long) (totalRecords * 2 * 2 * 2), recordsOut.getCount());
            harness.waitForTaskCompletion();
        }
    }

    /**
     * Drives a chain made of a bounded three-input head operator ("Operator0") and a bounded
     * one-input operator ("Operator1"), ends the inputs one at a time, and checks the exact
     * order of the data, "End of input" and "Bye" records emitted by the chain.
     */
    @Test
    public void testClosingAllOperatorsOnChainProperly() throws Exception {
        // The harness is declared outside the try block on purpose: its output is inspected
        // after it has been closed, so it must stay in scope beyond the try statement.
        StreamTaskMailboxTestHarness<String> testHarness =
                new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput((TypeInformation<?>) BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput((TypeInformation<?>) BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput((TypeInformation<?>) BasicTypeInfo.STRING_TYPE_INFO)
                        .setupOperatorChain((StreamOperatorFactory<?>) new TestBoundedMultipleInputOperatorFactory())
                        .chain(new TestBoundedOneInputStreamOperator("Operator1"), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
                        .finish()
                        .build();

        try {
            testHarness.processElement(new StreamRecord<>("Hello-1"), 0);
            testHarness.endInput(0);
            testHarness.processWhileAvailable();

            testHarness.processElement(new StreamRecord<>("Hello-2"), 1);
            testHarness.processElement(new StreamRecord<>("Hello-3"), 2);
            testHarness.endInput(1);
            testHarness.processWhileAvailable();

            testHarness.endInput(2);
            testHarness.processWhileAvailable();

            // With every input finished, the joint input/output future is expected to be done.
            Assert.assertTrue(
                    testHarness.getStreamTask().getInputOutputJointFuture(InputStatus.NOTHING_AVAILABLE).isDone());
            testHarness.waitForTaskCompletion();
        } finally {
            testHarness.close();
        }

        ArrayDeque<StreamRecord<String>> expectedOutput = new ArrayDeque<>();
        expectedOutput.add(new StreamRecord<>("[Operator0-1]: Hello-1"));
        expectedOutput.add(new StreamRecord<>("[Operator0-1]: End of input"));
        expectedOutput.add(new StreamRecord<>("[Operator0-2]: Hello-2"));
        expectedOutput.add(new StreamRecord<>("[Operator0-3]: Hello-3"));
        expectedOutput.add(new StreamRecord<>("[Operator0-2]: End of input"));
        expectedOutput.add(new StreamRecord<>("[Operator0-3]: End of input"));
        expectedOutput.add(new StreamRecord<>("[Operator0]: Bye"));
        expectedOutput.add(new StreamRecord<>("[Operator1]: End of input"));
        expectedOutput.add(new StreamRecord<>("[Operator1]: Bye"));

        MatcherAssert.assertThat(testHarness.getOutput(), (Matcher) Matchers.contains((Object[]) expectedOutput.toArray()));
    }

    /**
     * With automatic processing switched off, queues four records on input 0 and two records on
     * input 2, then processes everything that is available. The expected output interleaves the
     * two inputs for as long as both have records queued, followed by the rest of input 0.
     */
    @Test
    public void testInputFairness() throws Exception {
        try (StreamTaskMailboxTestHarness harness =
                new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput((TypeInformation<?>) BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput((TypeInformation<?>) BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput((TypeInformation<?>) BasicTypeInfo.STRING_TYPE_INFO)
                        .setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>) new MapToStringMultipleInputOperatorFactory())
                        .build()) {
            // Queue records first; nothing is processed until processWhileAvailable() below.
            harness.setAutoProcess(false);

            for (int value = 0; value < 4; value++) {
                harness.processElement(new StreamRecord<>(Integer.toString(value)), 0);
            }
            for (int value = 0; value < 2; value++) {
                harness.processElement(new StreamRecord<>(Integer.toString(value)), 2);
            }

            harness.processWhileAvailable();

            ArrayDeque<Object> expected = new ArrayDeque<>();
            for (String value : new String[] {"0", "0", "1", "1", "2", "3"}) {
                expected.add(new StreamRecord<>(value));
            }

            MatcherAssert.assertThat(harness.getOutput(), (Matcher) Matchers.contains((Object[]) expected.toArray()));
        }
    }

    /**
     * Checks watermark emission for three inputs with two channels each: no watermark is expected
     * until every channel has delivered one, and afterwards the emitted watermark is expected to
     * advance to 2, 3 and 4 as the channels catch up. Records sent in between are expected to be
     * forwarded as strings, and exactly two raw elements are expected in the output at the end.
     */
    @Test
    public void testWatermark() throws Exception {
        try (StreamTaskMailboxTestHarness harness =
                new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput((TypeInformation<?>) BasicTypeInfo.STRING_TYPE_INFO, 2)
                        .addInput((TypeInformation<?>) BasicTypeInfo.INT_TYPE_INFO, 2)
                        .addInput((TypeInformation<?>) BasicTypeInfo.DOUBLE_TYPE_INFO, 2)
                        .setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>) new MapToStringMultipleInputOperatorFactory())
                        .build()) {
            final long baseTime = 0L;
            ArrayDeque<Object> expected = new ArrayDeque<>();

            // Initial watermark on all channels but the last one: nothing may be emitted yet.
            final int[][] allButLastChannel = {{0, 0}, {0, 1}, {1, 0}, {1, 1}, {2, 0}};
            for (int[] target : allButLastChannel) {
                harness.processElement(new Watermark(baseTime), target[0], target[1]);
            }
            MatcherAssert.assertThat(harness.getOutput(), (Matcher) IsEmptyCollection.empty());

            // The last channel completes the set.
            harness.processElement(new Watermark(baseTime), 2, 1);
            expected.add(new Watermark(baseTime));
            MatcherAssert.assertThat(harness.getOutput(), (Matcher) Matchers.contains((Object[]) expected.toArray()));

            // Regular records are forwarded between watermarks.
            harness.processElement(new StreamRecord<>("Hello", baseTime), 0, 0);
            harness.processElement(new StreamRecord<>(42, baseTime), 1, 1);
            expected.add(new StreamRecord<>("Hello", baseTime));
            expected.add(new StreamRecord<>("42", baseTime));
            MatcherAssert.assertThat(harness.getOutput(), (Matcher) Matchers.contains((Object[]) expected.toArray()));

            // Each row is {watermark offset, input, channel}.
            final int[][] unevenAdvance = {{4, 0, 0}, {3, 0, 1}, {3, 1, 0}, {4, 1, 1}, {3, 2, 0}, {2, 2, 1}};
            for (int[] step : unevenAdvance) {
                harness.processElement(new Watermark(baseTime + step[0]), step[1], step[2]);
            }
            expected.add(new Watermark(baseTime + 2L));
            MatcherAssert.assertThat(harness.getOutput(), (Matcher) Matchers.contains((Object[]) expected.toArray()));

            // Advancing the slowest channel lets the output move on to 3.
            harness.processElement(new Watermark(baseTime + 4L), 2, 1);
            expected.add(new Watermark(baseTime + 3L));
            MatcherAssert.assertThat(harness.getOutput(), (Matcher) Matchers.contains((Object[]) expected.toArray()));

            // Bring the remaining channels up to 4.
            final int[][] catchUpChannels = {{0, 1}, {1, 0}, {2, 0}};
            for (int[] target : catchUpChannels) {
                harness.processElement(new Watermark(baseTime + 4L), target[0], target[1]);
            }
            expected.add(new Watermark(baseTime + 4L));
            MatcherAssert.assertThat(harness.getOutput(), (Matcher) Matchers.contains((Object[]) expected.toArray()));

            List rawElements = TestHarnessUtil.getRawElementsFromOutput(harness.getOutput());
            Assert.assertEquals(2L, (long) rawElements.size());
        }
    }

    /**
     * Exercises the interplay of watermarks and stream status: with some channels idle, a
     * watermark of 5 is expected once the last lagging channel turns idle; {@code IDLE} is
     * expected downstream after all channels have become idle, and {@code ACTIVE} after channels
     * are reactivated.
     */
    @Test
    public void testWatermarkAndStreamStatusForwarding() throws Exception {
        try (StreamTaskMailboxTestHarness harness =
                new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput((TypeInformation<?>) BasicTypeInfo.STRING_TYPE_INFO, 2)
                        .addInput((TypeInformation<?>) BasicTypeInfo.INT_TYPE_INFO, 2)
                        .addInput((TypeInformation<?>) BasicTypeInfo.DOUBLE_TYPE_INFO, 2)
                        .setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>) new MapToStringMultipleInputOperatorFactory())
                        .build()) {
            final long baseTime = 0L;
            ArrayDeque<Object> expected = new ArrayDeque<>();

            // Mark one channel of every input as idle.
            final int[][] initiallyIdle = {{0, 1}, {1, 1}, {2, 0}};
            for (int[] target : initiallyIdle) {
                harness.processElement(StreamStatus.IDLE, target[0], target[1]);
            }

            // Watermarks on the channels that are still active, then idle the lagging one.
            harness.processElement(new Watermark(baseTime + 6L), 0, 0);
            harness.processElement(new Watermark(baseTime + 6L), 1, 0);
            harness.processElement(new Watermark(baseTime + 5L), 2, 1);
            harness.processElement(StreamStatus.IDLE, 2, 1);

            expected.add(new Watermark(baseTime + 5L));
            MatcherAssert.assertThat(harness.getOutput(), (Matcher) Matchers.contains((Object[]) expected.toArray()));

            // Idle the last two active channels.
            harness.processElement(StreamStatus.IDLE, 0, 0);
            harness.processElement(StreamStatus.IDLE, 1, 0);

            expected.add(StreamStatus.IDLE);
            MatcherAssert.assertThat(harness.getOutput(), (Matcher) Matchers.contains((Object[]) expected.toArray()));

            // Reactivate two channels; a single ACTIVE status is expected.
            harness.processElement(StreamStatus.ACTIVE, 1, 0);
            harness.processElement(StreamStatus.ACTIVE, 0, 1);

            expected.add(StreamStatus.ACTIVE);
            MatcherAssert.assertThat(harness.getOutput(), (Matcher) Matchers.contains((Object[]) expected.toArray()));
        }
    }

    /**
     * Verifies the watermark gauges of the task, of the three-input head operator (combined,
     * per-input and output) and of a chained operator while watermarks are fed into the three
     * inputs. All gauges are expected to start at {@link Long#MIN_VALUE}.
     */
    @Test
    public void testWatermarkMetrics() throws Exception {
        final OperatorID headId = new OperatorID();
        final OperatorID chainedId = new OperatorID();
        final InterceptingOperatorMetricGroup headGroup = new InterceptingOperatorMetricGroup();
        final InterceptingOperatorMetricGroup chainedGroup = new InterceptingOperatorMetricGroup();

        // Routes the two known operator ids to the intercepting groups created above.
        InterceptingTaskMetricGroup taskGroup = new InterceptingTaskMetricGroup() {
            @Override
            public OperatorMetricGroup getOrAddOperator(OperatorID id, String name) {
                if (id.equals(headId)) {
                    return headGroup;
                } else if (id.equals(chainedId)) {
                    return chainedGroup;
                } else {
                    return super.getOrAddOperator(id, name);
                }
            }
        };

        try (StreamTaskMailboxTestHarness harness =
                new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput((TypeInformation<?>) BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput((TypeInformation<?>) BasicTypeInfo.INT_TYPE_INFO)
                        .addInput((TypeInformation<?>) BasicTypeInfo.DOUBLE_TYPE_INFO)
                        .setupOperatorChain(headId, (StreamOperatorFactory<?>) new MapToStringMultipleInputOperatorFactory())
                        .chain(chainedId, new OneInputStreamTaskTest.WatermarkMetricOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
                        .finish()
                        .setTaskMetricGroup((TaskMetricGroup) taskGroup)
                        .build()) {
            Gauge taskInput = (Gauge) taskGroup.get("currentInputWatermark");
            Gauge headInput1 = (Gauge) headGroup.get(MetricNames.currentInputWatermarkName(1));
            Gauge headInput2 = (Gauge) headGroup.get(MetricNames.currentInputWatermarkName(2));
            Gauge headInput3 = (Gauge) headGroup.get(MetricNames.currentInputWatermarkName(3));
            Gauge headInput = (Gauge) headGroup.get("currentInputWatermark");
            Gauge headOutput = (Gauge) headGroup.get("currentOutputWatermark");
            Gauge chainedInput = (Gauge) chainedGroup.get("currentInputWatermark");
            Gauge chainedOutput = (Gauge) chainedGroup.get("currentOutputWatermark");

            // Order in which the gauges are checked after every step.
            final Gauge[] gauges = {
                taskInput, headInput, headInput1, headInput2, headInput3, headOutput, chainedInput, chainedOutput
            };
            final long unset = Long.MIN_VALUE;

            assertWatermarkGauges(gauges, unset, unset, unset, unset, unset, unset, unset, unset);

            harness.processElement(new Watermark(1L), 0);
            assertWatermarkGauges(gauges, unset, unset, 1L, unset, unset, unset, unset, unset);

            harness.processElement(new Watermark(2L), 1);
            assertWatermarkGauges(gauges, unset, unset, 1L, 2L, unset, unset, unset, unset);

            harness.processElement(new Watermark(2L), 2);
            assertWatermarkGauges(gauges, 1L, 1L, 1L, 2L, 2L, 1L, 1L, 2L);

            harness.processElement(new Watermark(4L), 0);
            harness.processElement(new Watermark(3L), 1);
            assertWatermarkGauges(gauges, 2L, 2L, 4L, 3L, 2L, 2L, 2L, 4L);

            harness.endInput();
            harness.waitForTaskCompletion();
        }
    }

    /**
     * Asserts, position by position, that each gauge currently reports the expected value.
     *
     * @param gauges gauges whose values are {@link Long} watermarks
     * @param expectedValues expected value for the gauge at the same index
     */
    private static void assertWatermarkGauges(Gauge[] gauges, long... expectedValues) {
        for (int i = 0; i < gauges.length; i++) {
            Assert.assertEquals(expectedValues[i], ((Long) gauges[i].getValue()).longValue());
        }
    }

    /**
     * Builds a three-input task with a metric group that records registered metrics into a map
     * and checks that the checkpoint alignment-time and start-delay metrics are present.
     */
    @Test
    public void testCheckpointBarrierMetrics() throws Exception {
        ConcurrentHashMap<String, Metric> registeredMetrics = new ConcurrentHashMap<>();
        StreamTaskTestHarness.TestTaskMetricGroup metricGroup = new StreamTaskTestHarness.TestTaskMetricGroup(registeredMetrics);

        try (StreamTaskMailboxTestHarness harness =
                new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput((TypeInformation<?>) BasicTypeInfo.STRING_TYPE_INFO, 2)
                        .addInput((TypeInformation<?>) BasicTypeInfo.INT_TYPE_INFO, 2)
                        .addInput((TypeInformation<?>) BasicTypeInfo.DOUBLE_TYPE_INFO, 2)
                        .setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>) new MapToStringMultipleInputOperatorFactory())
                        .setTaskMetricGroup(metricGroup)
                        .build()) {
            final String[] requiredMetricNames = {"checkpointAlignmentTime", "checkpointStartDelayNanos"};
            for (String metricName : requiredMetricNames) {
                MatcherAssert.assertThat(registeredMetrics, (Matcher) IsMapContaining.hasKey((Object) metricName));
            }

            harness.endInput();
            harness.waitForTaskCompletion();
        }
    }

    /**
     * Sends a single {@link LatencyMarker} into the task and expects the very same marker to be
     * the only element in the output.
     */
    @Test
    public void testLatencyMarker() throws Exception {
        ConcurrentHashMap<String, Metric> registeredMetrics = new ConcurrentHashMap<>();
        StreamTaskTestHarness.TestTaskMetricGroup metricGroup = new StreamTaskTestHarness.TestTaskMetricGroup(registeredMetrics);

        try (StreamTaskMailboxTestHarness harness =
                new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput((TypeInformation<?>) BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput((TypeInformation<?>) BasicTypeInfo.INT_TYPE_INFO)
                        .addInput((TypeInformation<?>) BasicTypeInfo.DOUBLE_TYPE_INFO)
                        .setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>) new MapToStringMultipleInputOperatorFactory())
                        .setTaskMetricGroup(metricGroup)
                        .build()) {
            OperatorID originId = new OperatorID();
            LatencyMarker marker = new LatencyMarker(42L, originId, 0);

            harness.processElement(marker);

            Object[] expected = {marker};
            MatcherAssert.assertThat(harness.getOutput(), (Matcher) Matchers.contains(expected));

            harness.endInput();
            harness.waitForTaskCompletion();
        }
    }

    /** Operator factory that hands out {@link MapToStringMultipleInputOperator} instances. */
    private static class MapToStringMultipleInputOperatorFactory
    extends AbstractStreamOperatorFactory<String> {
        private MapToStringMultipleInputOperatorFactory() {
            super();
        }

        /**
         * Creates a new {@link MapToStringMultipleInputOperator} from the given parameters.
         *
         * @param parameters parameters forwarded to the operator's constructor
         * @return the freshly created operator, cast (unchecked) to the requested type
         */
        public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> parameters) {
            Object operator = new MapToStringMultipleInputOperator(parameters);
            return (T)operator;
        }

        /**
         * @param classLoader not used by this factory
         * @return the class of the operators this factory creates
         */
        public Class<? extends StreamOperator<String>> getStreamOperatorClass(ClassLoader classLoader) {
            Class<MapToStringMultipleInputOperator> operatorClass = MapToStringMultipleInputOperator.class;
            return operatorClass;
        }
    }

    /** Operator factory that hands out {@link DuplicatingOperator} instances. */
    private static class DuplicatingOperatorFactory
    extends AbstractStreamOperatorFactory<String> {
        private DuplicatingOperatorFactory() {
            super();
        }

        /**
         * Creates a new {@link DuplicatingOperator} from the given parameters.
         *
         * @param parameters parameters forwarded to the operator's constructor
         * @return the freshly created operator, cast (unchecked) to the requested type
         */
        public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> parameters) {
            Object operator = new DuplicatingOperator(parameters);
            return (T)operator;
        }

        /**
         * @param classLoader not used by this factory
         * @return the class of the operators this factory creates
         */
        public Class<? extends StreamOperator<String>> getStreamOperatorClass(ClassLoader classLoader) {
            Class<DuplicatingOperator> operatorClass = DuplicatingOperator.class;
            return operatorClass;
        }
    }

    /**
     * Operator factory that hands out {@link TestBoundedMultipleInputOperator} instances, always
     * constructed with the name {@code "Operator0"}.
     */
    private static class TestBoundedMultipleInputOperatorFactory
    extends AbstractStreamOperatorFactory<String> {
        private TestBoundedMultipleInputOperatorFactory() {
            super();
        }

        /**
         * Creates a new {@link TestBoundedMultipleInputOperator} from the given parameters.
         *
         * @param parameters parameters forwarded to the operator's constructor
         * @return the freshly created operator, cast (unchecked) to the requested type
         */
        public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> parameters) {
            Object operator = new TestBoundedMultipleInputOperator("Operator0", parameters);
            return (T)operator;
        }

        /**
         * @param classLoader not used by this factory
         * @return the class of the operators this factory creates
         */
        public Class<? extends StreamOperator<String>> getStreamOperatorClass(ClassLoader classLoader) {
            Class<TestBoundedMultipleInputOperator> operatorClass = TestBoundedMultipleInputOperator.class;
            return operatorClass;
        }
    }

    /**
     * Test operator exposing three inputs. Each input emits the {@code toString()} form of every
     * incoming value, and the operator fails the test when open/close/process calls are observed
     * in an unexpected order.
     */
    private static class MapToStringMultipleInputOperator
    extends AbstractStreamOperatorV2<String>
    implements MultipleInputStreamOperator<String> {
        private static final long serialVersionUID = 1L;
        /** Flipped to {@code true} at the end of {@link #open()}. */
        private boolean openCalled;
        /** Flipped to {@code true} at the end of {@link #close()}. */
        private boolean closeCalled;

        public MapToStringMultipleInputOperator(StreamOperatorParameters<String> parameters) {
            super(parameters, 3);
        }

        /** Delegates to the superclass, then fails the test if a close was already recorded. */
        public void open() throws Exception {
            super.open();
            if (closeCalled) {
                Assert.fail((String)"Close called before open.");
            }
            openCalled = true;
        }

        /** Delegates to the superclass, then fails the test if no open was recorded yet. */
        public void close() throws Exception {
            super.close();
            if (!openCalled) {
                Assert.fail((String)"Open was not called before close.");
            }
            closeCalled = true;
        }

        /**
         * @return a fixed-size list of three {@link MapToStringInput}s with input ids 1, 2 and 3
         */
        public List<Input> getInputs() {
            Input[] inputs = new Input[3];
            for (int inputId = 1; inputId <= inputs.length; inputId++) {
                inputs[inputId - 1] = new MapToStringInput(this, inputId);
            }
            return Arrays.asList(inputs);
        }

        /** @return whether {@link #close()} has completed on this operator */
        public boolean wasCloseCalled() {
            return closeCalled;
        }

        /**
         * Input that converts each element's value to a string and forwards it, keeping the
         * element's timestamp when it has one.
         */
        public class MapToStringInput<T>
        extends AbstractInput<T, String> {
            public MapToStringInput(AbstractStreamOperatorV2<String> owner, int inputId) {
                super(owner, inputId);
            }

            /**
             * Fails the test if the enclosing operator was not opened; otherwise emits one record
             * carrying the stringified value.
             *
             * @param element the incoming record
             */
            public void processElement(StreamRecord<T> element) throws Exception {
                if (!MapToStringMultipleInputOperator.this.openCalled) {
                    Assert.fail((String)"Open was not called before run.");
                }
                StreamRecord mapped;
                if (!element.hasTimestamp()) {
                    mapped = new StreamRecord((Object)element.getValue().toString());
                } else {
                    mapped = new StreamRecord((Object)element.getValue().toString(), element.getTimestamp());
                }
                this.output.collect((Object)mapped);
            }
        }
    }

    /** Test operator exposing three String inputs, each of which emits every record twice. */
    static class DuplicatingOperator
    extends AbstractStreamOperatorV2<String>
    implements MultipleInputStreamOperator<String> {
        public DuplicatingOperator(StreamOperatorParameters<String> parameters) {
            super(parameters, 3);
        }

        /**
         * @return a fixed-size list of three {@link DuplicatingInput}s with input ids 1, 2 and 3
         */
        public List<Input> getInputs() {
            Input[] inputs = new Input[]{
                new DuplicatingInput(this, 1),
                new DuplicatingInput(this, 2),
                new DuplicatingInput(this, 3)
            };
            return Arrays.asList(inputs);
        }

        /** Input that forwards each incoming record to the output two times. */
        class DuplicatingInput
        extends AbstractInput<String, String> {
            public DuplicatingInput(AbstractStreamOperatorV2<String> owner, int inputId) {
                super(owner, inputId);
            }

            /**
             * Emits the very same record instance twice.
             *
             * @param element the incoming record
             */
            public void processElement(StreamRecord<String> element) throws Exception {
                for (int emitted = 0; emitted < 2; emitted++) {
                    this.output.collect(element);
                }
            }
        }
    }
}

