/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.co.RichCoMapFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.co.CoStreamMap;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@RunWith(value=PowerMockRunner.class)
@PrepareForTest(value={ResultPartitionWriter.class})
@PowerMockIgnore(value={"javax.management.*", "com.sun.jndi.*"})
public class TwoInputStreamTaskTest {
    @Test
    public void testOpenCloseAndTimestamps() throws Exception {
        TwoInputStreamTask coMapTask = new TwoInputStreamTask();
        TwoInputStreamTaskTestHarness testHarness = new TwoInputStreamTaskTestHarness(coMapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        StreamConfig streamConfig = testHarness.getStreamConfig();
        CoStreamMap coMapOperator = new CoStreamMap((CoMapFunction)new TestOpenCloseMapFunction());
        streamConfig.setStreamOperator((StreamOperator)coMapOperator);
        long initialTime = 0L;
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.invoke();
        testHarness.waitForTaskRunning();
        testHarness.processElement(new StreamRecord((Object)"Hello", initialTime + 1L), 0, 0);
        expectedOutput.add(new StreamRecord((Object)"Hello", initialTime + 1L));
        testHarness.waitForInputProcessing();
        testHarness.processElement(new StreamRecord((Object)1337, initialTime + 2L), 1, 0);
        expectedOutput.add(new StreamRecord((Object)"1337", initialTime + 2L));
        testHarness.endInput();
        testHarness.waitForTaskCompletion();
        Assert.assertTrue((String)"RichFunction methods where not called.", (boolean)TestOpenCloseMapFunction.closeCalled);
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    }

    @Test
    public void testWatermarkForwarding() throws Exception {
        TwoInputStreamTask coMapTask = new TwoInputStreamTask();
        TwoInputStreamTaskTestHarness testHarness = new TwoInputStreamTaskTestHarness(coMapTask, 2, 2, new int[]{1, 2}, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        StreamConfig streamConfig = testHarness.getStreamConfig();
        CoStreamMap coMapOperator = new CoStreamMap((CoMapFunction)new IdentityMap());
        streamConfig.setStreamOperator((StreamOperator)coMapOperator);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        long initialTime = 0L;
        testHarness.invoke();
        testHarness.waitForTaskRunning();
        testHarness.processElement(new Watermark(initialTime), 0, 0);
        testHarness.processElement(new Watermark(initialTime), 0, 1);
        testHarness.processElement(new Watermark(initialTime), 1, 0);
        testHarness.waitForInputProcessing();
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(new Watermark(initialTime), 1, 1);
        testHarness.waitForInputProcessing();
        expectedOutput.add(new Watermark(initialTime));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(new StreamRecord((Object)"Hello", initialTime), 0, 0);
        testHarness.processElement(new StreamRecord((Object)42, initialTime), 1, 1);
        expectedOutput.add(new StreamRecord((Object)"Hello", initialTime));
        expectedOutput.add(new StreamRecord((Object)"42", initialTime));
        testHarness.waitForInputProcessing();
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(new Watermark(initialTime + 4L), 0, 0);
        testHarness.processElement(new Watermark(initialTime + 3L), 0, 1);
        testHarness.processElement(new Watermark(initialTime + 3L), 1, 0);
        testHarness.processElement(new Watermark(initialTime + 2L), 1, 1);
        expectedOutput.add(new Watermark(initialTime + 2L));
        testHarness.waitForInputProcessing();
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(new Watermark(initialTime + 4L), 1, 1);
        testHarness.waitForInputProcessing();
        expectedOutput.add(new Watermark(initialTime + 3L));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processElement(new Watermark(initialTime + 4L), 0, 1);
        testHarness.processElement(new Watermark(initialTime + 4L), 1, 0);
        testHarness.waitForInputProcessing();
        expectedOutput.add(new Watermark(initialTime + 4L));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.endInput();
        testHarness.waitForTaskCompletion();
        List resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
        Assert.assertEquals((long)2L, (long)resultElements.size());
    }

    @Test
    public void testCheckpointBarriers() throws Exception {
        TwoInputStreamTask coMapTask = new TwoInputStreamTask();
        TwoInputStreamTaskTestHarness testHarness = new TwoInputStreamTaskTestHarness(coMapTask, 2, 2, new int[]{1, 2}, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        StreamConfig streamConfig = testHarness.getStreamConfig();
        CoStreamMap coMapOperator = new CoStreamMap((CoMapFunction)new IdentityMap());
        streamConfig.setStreamOperator((StreamOperator)coMapOperator);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        long initialTime = 0L;
        testHarness.invoke();
        testHarness.waitForTaskRunning();
        testHarness.processEvent((AbstractEvent)new CheckpointBarrier(0L, 0L), 0, 0);
        testHarness.processElement(new StreamRecord((Object)"Hello-0-0", initialTime), 0, 0);
        testHarness.processElement(new StreamRecord((Object)"Ciao-0-0", initialTime), 0, 1);
        expectedOutput.add(new StreamRecord((Object)"Ciao-0-0", initialTime));
        testHarness.processElement(new StreamRecord((Object)11, initialTime), 1, 1);
        testHarness.processElement(new StreamRecord((Object)111, initialTime), 1, 1);
        expectedOutput.add(new StreamRecord((Object)"11", initialTime));
        expectedOutput.add(new StreamRecord((Object)"111", initialTime));
        testHarness.waitForInputProcessing();
        TestHarnessUtil.assertOutputEquals("Output was not correct.", testHarness.getOutput(), expectedOutput);
        testHarness.processEvent((AbstractEvent)new CheckpointBarrier(0L, 0L), 0, 1);
        testHarness.processEvent((AbstractEvent)new CheckpointBarrier(0L, 0L), 1, 0);
        testHarness.processEvent((AbstractEvent)new CheckpointBarrier(0L, 0L), 1, 1);
        testHarness.waitForInputProcessing();
        expectedOutput.add(new CheckpointBarrier(0L, 0L));
        expectedOutput.add(new StreamRecord((Object)"Hello-0-0", initialTime));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", testHarness.getOutput(), expectedOutput);
        testHarness.endInput();
        testHarness.waitForTaskCompletion();
        List resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
        Assert.assertEquals((long)4L, (long)resultElements.size());
    }

    @Test
    public void testOvertakingCheckpointBarriers() throws Exception {
        TwoInputStreamTask coMapTask = new TwoInputStreamTask();
        TwoInputStreamTaskTestHarness testHarness = new TwoInputStreamTaskTestHarness(coMapTask, 2, 2, new int[]{1, 2}, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        StreamConfig streamConfig = testHarness.getStreamConfig();
        CoStreamMap coMapOperator = new CoStreamMap((CoMapFunction)new IdentityMap());
        streamConfig.setStreamOperator((StreamOperator)coMapOperator);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        long initialTime = 0L;
        testHarness.invoke();
        testHarness.waitForTaskRunning();
        testHarness.processEvent((AbstractEvent)new CheckpointBarrier(0L, 0L), 0, 0);
        testHarness.processElement(new StreamRecord((Object)"Hello-0-0", initialTime), 0, 0);
        testHarness.processElement(new StreamRecord((Object)"Ciao-0-0", initialTime), 0, 0);
        testHarness.processElement(new StreamRecord((Object)42, initialTime), 1, 1);
        testHarness.processElement(new StreamRecord((Object)1337, initialTime), 1, 1);
        expectedOutput.add(new StreamRecord((Object)"42", initialTime));
        expectedOutput.add(new StreamRecord((Object)"1337", initialTime));
        testHarness.waitForInputProcessing();
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processEvent((AbstractEvent)new CheckpointBarrier(1L, 1L), 0, 0);
        testHarness.processEvent((AbstractEvent)new CheckpointBarrier(1L, 1L), 0, 1);
        testHarness.processEvent((AbstractEvent)new CheckpointBarrier(1L, 1L), 1, 0);
        testHarness.processEvent((AbstractEvent)new CheckpointBarrier(1L, 1L), 1, 1);
        expectedOutput.add(new StreamRecord((Object)"Hello-0-0", initialTime));
        expectedOutput.add(new StreamRecord((Object)"Ciao-0-0", initialTime));
        expectedOutput.add(new CheckpointBarrier(1L, 1L));
        testHarness.waitForInputProcessing();
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.processEvent((AbstractEvent)new CheckpointBarrier(0L, 0L), 0, 1);
        testHarness.processEvent((AbstractEvent)new CheckpointBarrier(0L, 0L), 1, 0);
        testHarness.processEvent((AbstractEvent)new CheckpointBarrier(0L, 0L), 1, 1);
        testHarness.waitForInputProcessing();
        testHarness.endInput();
        testHarness.waitForTaskCompletion();
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
    }

    private static class IdentityMap
    implements CoMapFunction<String, Integer, String> {
        private static final long serialVersionUID = 1L;

        private IdentityMap() {
        }

        public String map1(String value) throws Exception {
            return value;
        }

        public String map2(Integer value) throws Exception {
            return value.toString();
        }
    }

    private static class TestOpenCloseMapFunction
    extends RichCoMapFunction<String, Integer, String> {
        private static final long serialVersionUID = 1L;
        public static boolean openCalled = false;
        public static boolean closeCalled = false;

        private TestOpenCloseMapFunction() {
        }

        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            if (closeCalled) {
                Assert.fail((String)"Close called before open.");
            }
            openCalled = true;
        }

        public void close() throws Exception {
            super.close();
            if (!openCalled) {
                Assert.fail((String)"Open was not called before close.");
            }
            closeCalled = true;
        }

        public String map1(String value) throws Exception {
            if (!openCalled) {
                Assert.fail((String)"Open was not called before run.");
            }
            return value;
        }

        public String map2(Integer value) throws Exception {
            if (!openCalled) {
                Assert.fail((String)"Open was not called before run.");
            }
            return value.toString();
        }
    }
}

