package org.apache.flink.streaming.runtime.tasks;

import java.util.Arrays;
import java.util.HashSet;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
import org.apache.flink.runtime.metrics.util.InterceptingTaskMetricGroup;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.co.RichCoMapFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.operators.co.CoStreamMap;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTest;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.class */
public class TwoInputStreamTaskTest {

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest$DuplicatingOperator.class */
    static class DuplicatingOperator extends AbstractStreamOperator<String> implements TwoInputStreamOperator<String, String, String> {
        DuplicatingOperator() {
        }

        public void processElement1(StreamRecord<String> streamRecord) {
            this.output.collect(streamRecord);
            this.output.collect(streamRecord);
        }

        public void processElement2(StreamRecord<String> streamRecord) {
            this.output.collect(streamRecord);
            this.output.collect(streamRecord);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest$IdentityMap.class */
    private static class IdentityMap implements CoMapFunction<String, Integer, String> {
        private static final long serialVersionUID = 1;

        private IdentityMap() {
        }

        public String map1(String str) throws Exception {
            return str;
        }

        public String map2(Integer num) throws Exception {
            return num.toString();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest$TestOpenCloseMapFunction.class */
    private static class TestOpenCloseMapFunction extends RichCoMapFunction<String, Integer, String> {
        private static final long serialVersionUID = 1;
        public static boolean openCalled = false;
        public static boolean closeCalled = false;

        private TestOpenCloseMapFunction() {
        }

        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            if (closeCalled) {
                Assert.fail("Close called before open.");
            }
            openCalled = true;
        }

        public void close() throws Exception {
            super.close();
            if (!openCalled) {
                Assert.fail("Open was not called before close.");
            }
            closeCalled = true;
        }

        public String map1(String str) throws Exception {
            if (!openCalled) {
                Assert.fail("Open was not called before run.");
            }
            return str;
        }

        public String map2(Integer num) throws Exception {
            if (!openCalled) {
                Assert.fail("Open was not called before run.");
            }
            return num.toString();
        }
    }

    @Test
    public void testOpenCloseAndTimestamps() throws Exception {
        TwoInputStreamTaskTestHarness twoInputStreamTaskTestHarness = new TwoInputStreamTaskTestHarness(TwoInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        twoInputStreamTaskTestHarness.setupOutputForSingletonOperatorChain();
        StreamConfig streamConfig = twoInputStreamTaskTestHarness.getStreamConfig();
        streamConfig.setStreamOperator(new CoStreamMap(new TestOpenCloseMapFunction()));
        streamConfig.setOperatorID(new OperatorID());
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        twoInputStreamTaskTestHarness.invoke();
        twoInputStreamTaskTestHarness.waitForTaskRunning();
        twoInputStreamTaskTestHarness.processElement(new StreamRecord("Hello", 0 + 1), 0, 0);
        concurrentLinkedQueue.add(new StreamRecord("Hello", 0 + 1));
        twoInputStreamTaskTestHarness.waitForInputProcessing();
        twoInputStreamTaskTestHarness.processElement(new StreamRecord(1337, 0 + 2), 1, 0);
        concurrentLinkedQueue.add(new StreamRecord("1337", 0 + 2));
        twoInputStreamTaskTestHarness.endInput();
        twoInputStreamTaskTestHarness.waitForTaskCompletion();
        Assert.assertTrue("RichFunction methods where not called.", TestOpenCloseMapFunction.closeCalled);
        TestHarnessUtil.assertOutputEquals("Output was not correct.", concurrentLinkedQueue, twoInputStreamTaskTestHarness.getOutput());
    }

    @Test
    public void testWatermarkAndStreamStatusForwarding() throws Exception {
        TwoInputStreamTaskTestHarness twoInputStreamTaskTestHarness = new TwoInputStreamTaskTestHarness(TwoInputStreamTask::new, 2, 2, new int[]{1, 2}, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        twoInputStreamTaskTestHarness.setupOutputForSingletonOperatorChain();
        StreamConfig streamConfig = twoInputStreamTaskTestHarness.getStreamConfig();
        streamConfig.setStreamOperator(new CoStreamMap(new IdentityMap()));
        streamConfig.setOperatorID(new OperatorID());
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        twoInputStreamTaskTestHarness.invoke();
        twoInputStreamTaskTestHarness.waitForTaskRunning();
        twoInputStreamTaskTestHarness.processElement(new Watermark(0L), 0, 0);
        twoInputStreamTaskTestHarness.processElement(new Watermark(0L), 0, 1);
        twoInputStreamTaskTestHarness.processElement(new Watermark(0L), 1, 0);
        twoInputStreamTaskTestHarness.waitForInputProcessing();
        TestHarnessUtil.assertOutputEquals("Output was not correct.", concurrentLinkedQueue, twoInputStreamTaskTestHarness.getOutput());
        twoInputStreamTaskTestHarness.processElement(new Watermark(0L), 1, 1);
        twoInputStreamTaskTestHarness.waitForInputProcessing();
        concurrentLinkedQueue.add(new Watermark(0L));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", concurrentLinkedQueue, twoInputStreamTaskTestHarness.getOutput());
        twoInputStreamTaskTestHarness.processElement(new StreamRecord("Hello", 0L), 0, 0);
        twoInputStreamTaskTestHarness.processElement(new StreamRecord(42, 0L), 1, 1);
        concurrentLinkedQueue.add(new StreamRecord("Hello", 0L));
        concurrentLinkedQueue.add(new StreamRecord("42", 0L));
        twoInputStreamTaskTestHarness.waitForInputProcessing();
        TestHarnessUtil.assertOutputEquals("Output was not correct.", concurrentLinkedQueue, twoInputStreamTaskTestHarness.getOutput());
        twoInputStreamTaskTestHarness.processElement(new Watermark(0 + 4), 0, 0);
        twoInputStreamTaskTestHarness.processElement(new Watermark(0 + 3), 0, 1);
        twoInputStreamTaskTestHarness.processElement(new Watermark(0 + 3), 1, 0);
        twoInputStreamTaskTestHarness.processElement(new Watermark(0 + 2), 1, 1);
        concurrentLinkedQueue.add(new Watermark(0 + 2));
        twoInputStreamTaskTestHarness.waitForInputProcessing();
        TestHarnessUtil.assertOutputEquals("Output was not correct.", concurrentLinkedQueue, twoInputStreamTaskTestHarness.getOutput());
        twoInputStreamTaskTestHarness.processElement(new Watermark(0 + 4), 1, 1);
        twoInputStreamTaskTestHarness.waitForInputProcessing();
        concurrentLinkedQueue.add(new Watermark(0 + 3));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", concurrentLinkedQueue, twoInputStreamTaskTestHarness.getOutput());
        twoInputStreamTaskTestHarness.processElement(new Watermark(0 + 4), 0, 1);
        twoInputStreamTaskTestHarness.processElement(new Watermark(0 + 4), 1, 0);
        twoInputStreamTaskTestHarness.waitForInputProcessing();
        concurrentLinkedQueue.add(new Watermark(0 + 4));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", concurrentLinkedQueue, twoInputStreamTaskTestHarness.getOutput());
        twoInputStreamTaskTestHarness.processElement(StreamStatus.IDLE, 0, 1);
        twoInputStreamTaskTestHarness.processElement(StreamStatus.IDLE, 1, 0);
        twoInputStreamTaskTestHarness.processElement(new Watermark(0 + 6), 0, 0);
        twoInputStreamTaskTestHarness.processElement(new Watermark(0 + 5), 1, 1);
        twoInputStreamTaskTestHarness.processElement(StreamStatus.IDLE, 1, 1);
        twoInputStreamTaskTestHarness.waitForInputProcessing();
        concurrentLinkedQueue.add(new Watermark(0 + 5));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", concurrentLinkedQueue, twoInputStreamTaskTestHarness.getOutput());
        twoInputStreamTaskTestHarness.processElement(StreamStatus.IDLE, 0, 0);
        twoInputStreamTaskTestHarness.waitForInputProcessing();
        concurrentLinkedQueue.add(StreamStatus.IDLE);
        TestHarnessUtil.assertOutputEquals("Output was not correct.", concurrentLinkedQueue, twoInputStreamTaskTestHarness.getOutput());
        twoInputStreamTaskTestHarness.processElement(StreamStatus.ACTIVE, 1, 0);
        twoInputStreamTaskTestHarness.processElement(StreamStatus.ACTIVE, 0, 1);
        twoInputStreamTaskTestHarness.waitForInputProcessing();
        concurrentLinkedQueue.add(StreamStatus.ACTIVE);
        TestHarnessUtil.assertOutputEquals("Output was not correct.", concurrentLinkedQueue, twoInputStreamTaskTestHarness.getOutput());
        twoInputStreamTaskTestHarness.endInput();
        twoInputStreamTaskTestHarness.waitForTaskCompletion();
        Assert.assertEquals(2L, TestHarnessUtil.getRawElementsFromOutput(twoInputStreamTaskTestHarness.getOutput()).size());
    }

    @Test
    public void testCheckpointBarriers() throws Exception {
        TwoInputStreamTaskTestHarness twoInputStreamTaskTestHarness = new TwoInputStreamTaskTestHarness(TwoInputStreamTask::new, 2, 2, new int[]{1, 2}, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        twoInputStreamTaskTestHarness.setupOutputForSingletonOperatorChain();
        StreamConfig streamConfig = twoInputStreamTaskTestHarness.getStreamConfig();
        streamConfig.setStreamOperator(new CoStreamMap(new IdentityMap()));
        streamConfig.setOperatorID(new OperatorID());
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        twoInputStreamTaskTestHarness.invoke();
        twoInputStreamTaskTestHarness.waitForTaskRunning();
        twoInputStreamTaskTestHarness.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 0);
        twoInputStreamTaskTestHarness.processElement(new StreamRecord("Hello-0-0", 0L), 0, 0);
        twoInputStreamTaskTestHarness.processElement(new StreamRecord("Ciao-0-0", 0L), 0, 1);
        concurrentLinkedQueue.add(new StreamRecord("Ciao-0-0", 0L));
        twoInputStreamTaskTestHarness.waitForInputProcessing();
        twoInputStreamTaskTestHarness.processElement(new StreamRecord(11, 0L), 1, 1);
        twoInputStreamTaskTestHarness.processElement(new StreamRecord(111, 0L), 1, 1);
        concurrentLinkedQueue.add(new StreamRecord("11", 0L));
        concurrentLinkedQueue.add(new StreamRecord("111", 0L));
        twoInputStreamTaskTestHarness.waitForInputProcessing();
        for (int i = 0; i < 20 && twoInputStreamTaskTestHarness.getOutput().size() < concurrentLinkedQueue.size(); i++) {
            Thread.sleep(100L);
        }
        TestHarnessUtil.assertOutputEquals("Output was not correct.", concurrentLinkedQueue, twoInputStreamTaskTestHarness.getOutput());
        twoInputStreamTaskTestHarness.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 1);
        twoInputStreamTaskTestHarness.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 0);
        twoInputStreamTaskTestHarness.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 1);
        twoInputStreamTaskTestHarness.waitForInputProcessing();
        twoInputStreamTaskTestHarness.endInput();
        twoInputStreamTaskTestHarness.waitForTaskCompletion();
        concurrentLinkedQueue.add(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()));
        concurrentLinkedQueue.add(new StreamRecord("Hello-0-0", 0L));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", concurrentLinkedQueue, twoInputStreamTaskTestHarness.getOutput());
        Assert.assertEquals(4L, TestHarnessUtil.getRawElementsFromOutput(twoInputStreamTaskTestHarness.getOutput()).size());
    }

    @Test
    public void testOvertakingCheckpointBarriers() throws Exception {
        TwoInputStreamTaskTestHarness twoInputStreamTaskTestHarness = new TwoInputStreamTaskTestHarness(TwoInputStreamTask::new, 2, 2, new int[]{1, 2}, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        twoInputStreamTaskTestHarness.setupOutputForSingletonOperatorChain();
        StreamConfig streamConfig = twoInputStreamTaskTestHarness.getStreamConfig();
        streamConfig.setStreamOperator(new CoStreamMap(new IdentityMap()));
        streamConfig.setOperatorID(new OperatorID());
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        twoInputStreamTaskTestHarness.invoke();
        twoInputStreamTaskTestHarness.waitForTaskRunning();
        twoInputStreamTaskTestHarness.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 0);
        twoInputStreamTaskTestHarness.processElement(new StreamRecord("Hello-0-0", 0L), 0, 0);
        twoInputStreamTaskTestHarness.processElement(new StreamRecord("Ciao-0-0", 0L), 0, 0);
        twoInputStreamTaskTestHarness.processElement(new StreamRecord(42, 0L), 1, 1);
        twoInputStreamTaskTestHarness.processElement(new StreamRecord(1337, 0L), 1, 1);
        concurrentLinkedQueue.add(new StreamRecord("42", 0L));
        concurrentLinkedQueue.add(new StreamRecord("1337", 0L));
        twoInputStreamTaskTestHarness.waitForInputProcessing();
        TestHarnessUtil.assertOutputEquals("Output was not correct.", concurrentLinkedQueue, twoInputStreamTaskTestHarness.getOutput());
        twoInputStreamTaskTestHarness.processEvent(new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 0);
        twoInputStreamTaskTestHarness.processEvent(new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 1);
        twoInputStreamTaskTestHarness.processEvent(new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 0);
        twoInputStreamTaskTestHarness.processEvent(new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 1);
        concurrentLinkedQueue.add(new CancelCheckpointMarker(0L));
        concurrentLinkedQueue.add(new StreamRecord("Hello-0-0", 0L));
        concurrentLinkedQueue.add(new StreamRecord("Ciao-0-0", 0L));
        concurrentLinkedQueue.add(new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()));
        twoInputStreamTaskTestHarness.waitForInputProcessing();
        TestHarnessUtil.assertOutputEquals("Output was not correct.", concurrentLinkedQueue, twoInputStreamTaskTestHarness.getOutput());
        twoInputStreamTaskTestHarness.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 1);
        twoInputStreamTaskTestHarness.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 0);
        twoInputStreamTaskTestHarness.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 1);
        twoInputStreamTaskTestHarness.waitForInputProcessing();
        twoInputStreamTaskTestHarness.endInput();
        twoInputStreamTaskTestHarness.waitForTaskCompletion();
        TestHarnessUtil.assertOutputEquals("Output was not correct.", concurrentLinkedQueue, twoInputStreamTaskTestHarness.getOutput());
    }

    @Test
    public void testOperatorMetricReuse() throws Exception {
        TwoInputStreamTaskTestHarness twoInputStreamTaskTestHarness = new TwoInputStreamTaskTestHarness(TwoInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        twoInputStreamTaskTestHarness.setupOperatorChain(new OperatorID(), new DuplicatingOperator()).chain(new OperatorID(), new OneInputStreamTaskTest.DuplicatingOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig())).chain(new OperatorID(), new OneInputStreamTaskTest.DuplicatingOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig())).finish();
        final UnregisteredMetricGroups.UnregisteredTaskMetricGroup unregisteredTaskMetricGroup = new UnregisteredMetricGroups.UnregisteredTaskMetricGroup() { // from class: org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTest.1
            public OperatorMetricGroup addOperator(OperatorID operatorID, String str) {
                return new OperatorMetricGroup(NoOpMetricRegistry.INSTANCE, this, operatorID, str);
            }
        };
        StreamMockEnvironment streamMockEnvironment = new StreamMockEnvironment(twoInputStreamTaskTestHarness.jobConfig, twoInputStreamTaskTestHarness.taskConfig, twoInputStreamTaskTestHarness.memorySize, new MockInputSplitProvider(), twoInputStreamTaskTestHarness.bufferSize, new TestTaskStateManager()) { // from class: org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTest.2
            @Override // org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment
            public TaskMetricGroup getMetricGroup() {
                return unregisteredTaskMetricGroup;
            }
        };
        Counter numRecordsInCounter = unregisteredTaskMetricGroup.getIOMetricGroup().getNumRecordsInCounter();
        Counter numRecordsOutCounter = unregisteredTaskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();
        twoInputStreamTaskTestHarness.invoke(streamMockEnvironment);
        twoInputStreamTaskTestHarness.waitForTaskRunning();
        for (int i = 0; i < 5; i++) {
            twoInputStreamTaskTestHarness.processElement(new StreamRecord("hello"), 0, 0);
        }
        for (int i2 = 0; i2 < 3; i2++) {
            twoInputStreamTaskTestHarness.processElement(new StreamRecord("hello"), 1, 0);
        }
        twoInputStreamTaskTestHarness.waitForInputProcessing();
        Assert.assertEquals(8L, numRecordsInCounter.getCount());
        Assert.assertEquals(64L, numRecordsOutCounter.getCount());
    }

    @Test
    public void testWatermarkMetrics() throws Exception {
        TwoInputStreamTaskTestHarness twoInputStreamTaskTestHarness = new TwoInputStreamTaskTestHarness(TwoInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        CoStreamMap coStreamMap = new CoStreamMap(new IdentityMap());
        final OperatorID operatorID = new OperatorID();
        OneInputStreamTaskTest.WatermarkMetricOperator watermarkMetricOperator = new OneInputStreamTaskTest.WatermarkMetricOperator();
        final OperatorID operatorID2 = new OperatorID();
        twoInputStreamTaskTestHarness.setupOperatorChain(operatorID, (TwoInputStreamOperator<?, ?, ?>) coStreamMap).chain(operatorID2, watermarkMetricOperator, BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig())).finish();
        final InterceptingOperatorMetricGroup interceptingOperatorMetricGroup = new InterceptingOperatorMetricGroup();
        final InterceptingOperatorMetricGroup interceptingOperatorMetricGroup2 = new InterceptingOperatorMetricGroup();
        final InterceptingTaskMetricGroup interceptingTaskMetricGroup = new InterceptingTaskMetricGroup() { // from class: org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTest.3
            public OperatorMetricGroup addOperator(OperatorID operatorID3, String str) {
                return operatorID3.equals(operatorID) ? interceptingOperatorMetricGroup : operatorID3.equals(operatorID2) ? interceptingOperatorMetricGroup2 : super.addOperator(operatorID3, str);
            }
        };
        twoInputStreamTaskTestHarness.invoke(new StreamMockEnvironment(twoInputStreamTaskTestHarness.jobConfig, twoInputStreamTaskTestHarness.taskConfig, twoInputStreamTaskTestHarness.memorySize, new MockInputSplitProvider(), twoInputStreamTaskTestHarness.bufferSize, new TestTaskStateManager()) { // from class: org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTest.4
            @Override // org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment
            public TaskMetricGroup getMetricGroup() {
                return interceptingTaskMetricGroup;
            }
        });
        twoInputStreamTaskTestHarness.waitForTaskRunning();
        Gauge gauge = interceptingTaskMetricGroup.get("currentInputWatermark");
        Gauge gauge2 = interceptingOperatorMetricGroup.get("currentInput1Watermark");
        Gauge gauge3 = interceptingOperatorMetricGroup.get("currentInput2Watermark");
        Gauge gauge4 = interceptingOperatorMetricGroup.get("currentInputWatermark");
        Gauge gauge5 = interceptingOperatorMetricGroup.get("currentOutputWatermark");
        Gauge gauge6 = interceptingOperatorMetricGroup2.get("currentInputWatermark");
        Gauge gauge7 = interceptingOperatorMetricGroup2.get("currentOutputWatermark");
        Assert.assertEquals("A metric was registered multiple times.", 7L, new HashSet(Arrays.asList(gauge, gauge2, gauge3, gauge4, gauge5, gauge6, gauge7)).size());
        Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge.getValue()).longValue());
        Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge4.getValue()).longValue());
        Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge2.getValue()).longValue());
        Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge3.getValue()).longValue());
        Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge5.getValue()).longValue());
        Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge6.getValue()).longValue());
        Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge7.getValue()).longValue());
        twoInputStreamTaskTestHarness.processElement(new Watermark(1L), 0, 0);
        twoInputStreamTaskTestHarness.waitForInputProcessing();
        Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge.getValue()).longValue());
        Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge4.getValue()).longValue());
        Assert.assertEquals(1L, ((Long) gauge2.getValue()).longValue());
        Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge3.getValue()).longValue());
        Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge5.getValue()).longValue());
        Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge6.getValue()).longValue());
        Assert.assertEquals(Long.MIN_VALUE, ((Long) gauge7.getValue()).longValue());
        twoInputStreamTaskTestHarness.processElement(new Watermark(2L), 1, 0);
        twoInputStreamTaskTestHarness.waitForInputProcessing();
        Assert.assertEquals(1L, ((Long) gauge.getValue()).longValue());
        Assert.assertEquals(1L, ((Long) gauge4.getValue()).longValue());
        Assert.assertEquals(1L, ((Long) gauge2.getValue()).longValue());
        Assert.assertEquals(2L, ((Long) gauge3.getValue()).longValue());
        Assert.assertEquals(1L, ((Long) gauge5.getValue()).longValue());
        Assert.assertEquals(1L, ((Long) gauge6.getValue()).longValue());
        Assert.assertEquals(2L, ((Long) gauge7.getValue()).longValue());
        twoInputStreamTaskTestHarness.processElement(new Watermark(3L), 0, 0);
        twoInputStreamTaskTestHarness.waitForInputProcessing();
        Assert.assertEquals(2L, ((Long) gauge.getValue()).longValue());
        Assert.assertEquals(2L, ((Long) gauge4.getValue()).longValue());
        Assert.assertEquals(3L, ((Long) gauge2.getValue()).longValue());
        Assert.assertEquals(2L, ((Long) gauge3.getValue()).longValue());
        Assert.assertEquals(2L, ((Long) gauge5.getValue()).longValue());
        Assert.assertEquals(2L, ((Long) gauge6.getValue()).longValue());
        Assert.assertEquals(4L, ((Long) gauge7.getValue()).longValue());
        twoInputStreamTaskTestHarness.endInput();
        twoInputStreamTaskTestHarness.waitForTaskCompletion();
    }
}
