package org.apache.flink.streaming.runtime.tasks;

import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.co.RichCoMapFunction;
import org.apache.flink.streaming.api.operators.co.CoStreamMap;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.junit.Assert;
import org.junit.Test;

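/**
 * Tests for {@link TwoInputStreamTask}, driven through {@link TwoInputStreamTaskTestHarness} with a
 * {@link CoStreamMap} operator.
 *
 * <p>Loaded from: input_file:org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.class
 */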
public class TwoInputStreamTaskTest {

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest$IdentityMap.class */
    private static class IdentityMap implements CoMapFunction<String, Integer, String> {
        private static final long serialVersionUID = 1;

        private IdentityMap() {
        }

        public String map1(String str) throws Exception {
            return str;
        }

        public String map2(Integer num) throws Exception {
            return num.toString();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest$TestOpenCloseMapFunction.class */
    private static class TestOpenCloseMapFunction extends RichCoMapFunction<String, Integer, String> {
        private static final long serialVersionUID = 1;
        public static boolean openCalled = false;
        public static boolean closeCalled = false;

        private TestOpenCloseMapFunction() {
        }

        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            if (closeCalled) {
                Assert.fail("Close called before open.");
            }
            openCalled = true;
        }

        public void close() throws Exception {
            super.close();
            if (!openCalled) {
                Assert.fail("Open was not called before close.");
            }
            closeCalled = true;
        }

        public String map1(String str) throws Exception {
            if (!openCalled) {
                Assert.fail("Open was not called before run.");
            }
            return str;
        }

        public String map2(Integer num) throws Exception {
            if (!openCalled) {
                Assert.fail("Open was not called before run.");
            }
            return num.toString();
        }
    }

    /**
     * Runs a {@link CoStreamMap} wrapping {@link TestOpenCloseMapFunction} and checks that the
     * lifecycle flags were set and that the emitted records carry the timestamps of the records
     * that were fed in.
     *
     * @throws Exception if the harness or the task under test fails
     */
    @Test
    public void testOpenCloseAndTimestamps() throws Exception {
        // The lifecycle flags are static and survive between runs. Reset them so that running
        // this test again in the same JVM does not fail in open() with "Close called before open.".
        TestOpenCloseMapFunction.openCalled = false;
        TestOpenCloseMapFunction.closeCalled = false;

        final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness =
                new TwoInputStreamTaskTestHarness<String, Integer, String>(
                        new TwoInputStreamTask<String, Integer, String>(),
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.INT_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();
        testHarness.getStreamConfig().setStreamOperator(
                new CoStreamMap<String, Integer, String>(new TestOpenCloseMapFunction()));

        final long initialTime = 0L;
        final ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();

        testHarness.invoke();
        testHarness.waitForTaskRunning();

        testHarness.processElement(new StreamRecord<String>("Hello", initialTime + 1), 0, 0);
        expectedOutput.add(new StreamRecord<String>("Hello", initialTime + 1));

        // Let the first record be consumed before feeding the other input
        // (presumably to keep the order of the output deterministic).
        testHarness.waitForInputProcessing();

        testHarness.processElement(new StreamRecord<Integer>(1337, initialTime + 2), 1, 0);
        expectedOutput.add(new StreamRecord<String>("1337", initialTime + 2));

        testHarness.endInput();
        testHarness.waitForTaskCompletion();

        Assert.assertTrue("RichFunction open() was not called.", TestOpenCloseMapFunction.openCalled);
        Assert.assertTrue("RichFunction methods where not called.", TestOpenCloseMapFunction.closeCalled);
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    }

    /**
     * Feeds watermarks, records and stream-status markers into the four (gate, channel) pairs
     * addressed below and checks, step by step, what the task has emitted so far.
     *
     * <p>The expectations encoded here: a watermark is only emitted once every channel has
     * delivered one, the emitted watermark follows the minimum across the channels, records are
     * forwarded independently of watermarks, and {@link StreamStatus} changes are forwarded once
     * per transition.
     *
     * @throws Exception if the harness or the task under test fails
     */
    @Test
    public void testWatermarkAndStreamStatusForwarding() throws Exception {
        final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness =
                new TwoInputStreamTaskTestHarness<String, Integer, String>(
                        new TwoInputStreamTask<String, Integer, String>(),
                        2,
                        2,
                        new int[] {1, 2},
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.INT_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();
        testHarness.getStreamConfig().setStreamOperator(
                new CoStreamMap<String, Integer, String>(new IdentityMap()));

        final long initialTime = 0L;
        final ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();

        testHarness.invoke();
        testHarness.waitForTaskRunning();

        // Watermarks on three of the four channels: nothing is expected in the output yet.
        testHarness.processElement(new Watermark(initialTime), 0, 0);
        testHarness.processElement(new Watermark(initialTime), 0, 1);
        testHarness.processElement(new Watermark(initialTime), 1, 0);
        testHarness.waitForInputProcessing();
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());

        // The fourth channel delivers its watermark: now it is expected downstream.
        testHarness.processElement(new Watermark(initialTime), 1, 1);
        testHarness.waitForInputProcessing();
        expectedOutput.add(new Watermark(initialTime));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());

        // Records from both inputs are expected to be forwarded right away.
        testHarness.processElement(new StreamRecord<String>("Hello", initialTime), 0, 0);
        testHarness.processElement(new StreamRecord<Integer>(42, initialTime), 1, 1);
        expectedOutput.add(new StreamRecord<String>("Hello", initialTime));
        expectedOutput.add(new StreamRecord<String>("42", initialTime));
        testHarness.waitForInputProcessing();
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());

        // Channels advance to 4, 3, 3 and 2: the minimum (2) is expected, after the two records.
        testHarness.processElement(new Watermark(initialTime + 4), 0, 0);
        testHarness.processElement(new Watermark(initialTime + 3), 0, 1);
        testHarness.processElement(new Watermark(initialTime + 3), 1, 0);
        testHarness.processElement(new Watermark(initialTime + 2), 1, 1);
        expectedOutput.add(new Watermark(initialTime + 2));
        testHarness.waitForInputProcessing();
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());

        // The slowest channel moves to 4, so the minimum rises to 3.
        testHarness.processElement(new Watermark(initialTime + 4), 1, 1);
        testHarness.waitForInputProcessing();
        expectedOutput.add(new Watermark(initialTime + 3));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());

        // The remaining two channels catch up to 4, so the minimum rises to 4.
        testHarness.processElement(new Watermark(initialTime + 4), 0, 1);
        testHarness.processElement(new Watermark(initialTime + 4), 1, 0);
        testHarness.waitForInputProcessing();
        expectedOutput.add(new Watermark(initialTime + 4));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());

        // One channel per gate goes idle; the active ones advance to 6 (gate 0) and 5 (gate 1),
        // then gate 1's last channel goes idle as well. Only Watermark(5) is expected here, not
        // Watermark(6). NOTE(review): presumably an idle input does not let the other input's
        // watermark jump ahead; confirm against the task's watermark handling.
        testHarness.processElement(StreamStatus.IDLE, 0, 1);
        testHarness.processElement(StreamStatus.IDLE, 1, 0);
        testHarness.processElement(new Watermark(initialTime + 6), 0, 0);
        testHarness.processElement(new Watermark(initialTime + 5), 1, 1);
        testHarness.processElement(StreamStatus.IDLE, 1, 1);
        testHarness.waitForInputProcessing();
        expectedOutput.add(new Watermark(initialTime + 5));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());

        // The last active channel goes idle: a single IDLE status is expected downstream.
        testHarness.processElement(StreamStatus.IDLE, 0, 0);
        testHarness.waitForInputProcessing();
        expectedOutput.add(StreamStatus.IDLE);
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());

        // Two channels become active again: ACTIVE is expected exactly once.
        testHarness.processElement(StreamStatus.ACTIVE, 1, 0);
        testHarness.processElement(StreamStatus.ACTIVE, 0, 1);
        testHarness.waitForInputProcessing();
        expectedOutput.add(StreamStatus.ACTIVE);
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());

        testHarness.endInput();
        testHarness.waitForTaskCompletion();

        // Only the two records count as raw elements.
        Assert.assertEquals(2L, TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput()).size());
    }

    /**
     * Sends a checkpoint barrier on one channel first and checks that the record following it on
     * that channel only shows up in the output after the barrier, which itself is expected only
     * once the barrier has been delivered on all four channels. Records on the other channels are
     * expected to pass through in the meantime.
     *
     * @throws Exception if the harness or the task under test fails
     */
    @Test
    public void testCheckpointBarriers() throws Exception {
        final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness =
                new TwoInputStreamTaskTestHarness<String, Integer, String>(
                        new TwoInputStreamTask<String, Integer, String>(),
                        2,
                        2,
                        new int[] {1, 2},
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.INT_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();
        testHarness.getStreamConfig().setStreamOperator(
                new CoStreamMap<String, Integer, String>(new IdentityMap()));

        final long initialTime = 0L;
        final ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();

        testHarness.invoke();
        testHarness.waitForTaskRunning();

        // Barrier on channel (0, 0) only.
        testHarness.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forFullCheckpoint()), 0, 0);

        // Same channel as the barrier: this record is NOT expected in the output yet.
        testHarness.processElement(new StreamRecord<String>("Hello-0-0", initialTime), 0, 0);

        // Different channel of the same gate: expected to go through.
        testHarness.processElement(new StreamRecord<String>("Ciao-0-0", initialTime), 0, 1);
        expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));

        testHarness.waitForInputProcessing();

        // Both records use the same channel of the other gate, so their relative order is fixed.
        testHarness.processElement(new StreamRecord<Integer>(11, initialTime), 1, 1);
        testHarness.processElement(new StreamRecord<Integer>(111, initialTime), 1, 1);
        expectedOutput.add(new StreamRecord<String>("11", initialTime));
        expectedOutput.add(new StreamRecord<String>("111", initialTime));

        testHarness.waitForInputProcessing();

        // Poll for up to 20 x 100 ms until the output holds at least as many entries as expected.
        for (int attempt = 0; attempt < 20; attempt++) {
            if (testHarness.getOutput().size() >= expectedOutput.size()) {
                break;
            }
            Thread.sleep(100L);
        }

        // Neither the barrier nor "Hello-0-0" is expected at this point.
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());

        // Deliver the barrier on the three remaining channels.
        testHarness.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forFullCheckpoint()), 0, 1);
        testHarness.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forFullCheckpoint()), 1, 0);
        testHarness.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forFullCheckpoint()), 1, 1);

        testHarness.waitForInputProcessing();
        testHarness.endInput();
        testHarness.waitForTaskCompletion();

        // Now the barrier is expected, followed by the record that was held back behind it.
        expectedOutput.add(new CheckpointBarrier(0L, 0L, CheckpointOptions.forFullCheckpoint()));
        expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());

        // Four records in total; the barrier does not count as a raw element.
        Assert.assertEquals(4L, TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput()).size());
    }

    /**
     * Starts checkpoint 0 on a single channel, then delivers the barrier of checkpoint 1 on all
     * four channels before checkpoint 0 has completed. The expected output is a
     * {@link CancelCheckpointMarker} for checkpoint 0, the records that were held back, and the
     * barrier of checkpoint 1. Barriers of checkpoint 0 arriving afterwards are expected to
     * produce no further output.
     *
     * @throws Exception if the harness or the task under test fails
     */
    @Test
    public void testOvertakingCheckpointBarriers() throws Exception {
        final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness =
                new TwoInputStreamTaskTestHarness<String, Integer, String>(
                        new TwoInputStreamTask<String, Integer, String>(),
                        2,
                        2,
                        new int[] {1, 2},
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.INT_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();
        testHarness.getStreamConfig().setStreamOperator(
                new CoStreamMap<String, Integer, String>(new IdentityMap()));

        final long initialTime = 0L;
        final ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();

        testHarness.invoke();
        testHarness.waitForTaskRunning();

        // Barrier of checkpoint 0 on channel (0, 0) only.
        testHarness.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forFullCheckpoint()), 0, 0);

        // Records behind that barrier on the same channel: not expected in the output yet.
        testHarness.processElement(new StreamRecord<String>("Hello-0-0", initialTime), 0, 0);
        testHarness.processElement(new StreamRecord<String>("Ciao-0-0", initialTime), 0, 0);

        // Records on a channel without a barrier: expected to go through, in this order.
        testHarness.processElement(new StreamRecord<Integer>(42, initialTime), 1, 1);
        testHarness.processElement(new StreamRecord<Integer>(1337, initialTime), 1, 1);
        expectedOutput.add(new StreamRecord<String>("42", initialTime));
        expectedOutput.add(new StreamRecord<String>("1337", initialTime));

        testHarness.waitForInputProcessing();
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());

        // Barrier of the later checkpoint 1 on every channel.
        testHarness.processEvent(new CheckpointBarrier(1L, 1L, CheckpointOptions.forFullCheckpoint()), 0, 0);
        testHarness.processEvent(new CheckpointBarrier(1L, 1L, CheckpointOptions.forFullCheckpoint()), 0, 1);
        testHarness.processEvent(new CheckpointBarrier(1L, 1L, CheckpointOptions.forFullCheckpoint()), 1, 0);
        testHarness.processEvent(new CheckpointBarrier(1L, 1L, CheckpointOptions.forFullCheckpoint()), 1, 1);

        // Expected: checkpoint 0 is cancelled, the held-back records are released, and the
        // barrier of checkpoint 1 follows them.
        expectedOutput.add(new CancelCheckpointMarker(0L));
        expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
        expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
        expectedOutput.add(new CheckpointBarrier(1L, 1L, CheckpointOptions.forFullCheckpoint()));

        testHarness.waitForInputProcessing();
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());

        // Late barriers of the already cancelled checkpoint 0: no additional output is expected.
        testHarness.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forFullCheckpoint()), 0, 1);
        testHarness.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forFullCheckpoint()), 1, 0);
        testHarness.processEvent(new CheckpointBarrier(0L, 0L, CheckpointOptions.forFullCheckpoint()), 1, 1);

        testHarness.waitForInputProcessing();
        testHarness.endInput();
        testHarness.waitForTaskCompletion();

        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    }
}
