package org.apache.flink.streaming.api.operators.co;

import java.io.Serializable;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.co.RichCoMapFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/co/CoStreamMapTest.class */
public class CoStreamMapTest implements Serializable {
    private static final long serialVersionUID = 1;

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/co/CoStreamMapTest$MyCoMap.class */
    private static final class MyCoMap implements CoMapFunction<Double, Integer, String> {
        private static final long serialVersionUID = 1;

        private MyCoMap() {
        }

        public String map1(Double d) {
            return d.toString();
        }

        public String map2(Integer num) {
            return num.toString();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/co/CoStreamMapTest$TestOpenCloseCoMapFunction.class */
    private static class TestOpenCloseCoMapFunction extends RichCoMapFunction<Double, Integer, String> {
        private static final long serialVersionUID = 1;
        public static boolean openCalled = false;
        public static boolean closeCalled = false;

        private TestOpenCloseCoMapFunction() {
        }

        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            if (closeCalled) {
                Assert.fail("Close called before open.");
            }
            openCalled = true;
        }

        public void close() throws Exception {
            super.close();
            if (!openCalled) {
                Assert.fail("Open was not called before close.");
            }
            closeCalled = true;
        }

        public String map1(Double d) throws Exception {
            if (!openCalled) {
                Assert.fail("Open was not called before run.");
            }
            return d.toString();
        }

        public String map2(Integer num) throws Exception {
            if (!openCalled) {
                Assert.fail("Open was not called before run.");
            }
            return num.toString();
        }
    }

    @Test
    public void testCoMap() throws Exception {
        TwoInputStreamOperatorTestHarness twoInputStreamOperatorTestHarness = new TwoInputStreamOperatorTestHarness(new CoStreamMap(new MyCoMap()));
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        twoInputStreamOperatorTestHarness.open();
        twoInputStreamOperatorTestHarness.processElement1(new StreamRecord(Double.valueOf(1.1d), 0 + 1));
        twoInputStreamOperatorTestHarness.processElement1(new StreamRecord(Double.valueOf(1.2d), 0 + 2));
        twoInputStreamOperatorTestHarness.processElement1(new StreamRecord(Double.valueOf(1.3d), 0 + 3));
        twoInputStreamOperatorTestHarness.processWatermark1(new Watermark(0 + 3));
        twoInputStreamOperatorTestHarness.processElement1(new StreamRecord(Double.valueOf(1.4d), 0 + 4));
        twoInputStreamOperatorTestHarness.processElement1(new StreamRecord(Double.valueOf(1.5d), 0 + 5));
        twoInputStreamOperatorTestHarness.processElement2(new StreamRecord(1, 0 + 1));
        twoInputStreamOperatorTestHarness.processElement2(new StreamRecord(2, 0 + 2));
        twoInputStreamOperatorTestHarness.processWatermark2(new Watermark(0 + 2));
        twoInputStreamOperatorTestHarness.processElement2(new StreamRecord(3, 0 + 3));
        twoInputStreamOperatorTestHarness.processElement2(new StreamRecord(4, 0 + 4));
        twoInputStreamOperatorTestHarness.processElement2(new StreamRecord(5, 0 + 5));
        concurrentLinkedQueue.add(new StreamRecord("1.1", 0 + 1));
        concurrentLinkedQueue.add(new StreamRecord("1.2", 0 + 2));
        concurrentLinkedQueue.add(new StreamRecord("1.3", 0 + 3));
        concurrentLinkedQueue.add(new StreamRecord("1.4", 0 + 4));
        concurrentLinkedQueue.add(new StreamRecord("1.5", 0 + 5));
        concurrentLinkedQueue.add(new StreamRecord("1", 0 + 1));
        concurrentLinkedQueue.add(new StreamRecord("2", 0 + 2));
        concurrentLinkedQueue.add(new Watermark(0 + 2));
        concurrentLinkedQueue.add(new StreamRecord("3", 0 + 3));
        concurrentLinkedQueue.add(new StreamRecord("4", 0 + 4));
        concurrentLinkedQueue.add(new StreamRecord("5", 0 + 5));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", concurrentLinkedQueue, twoInputStreamOperatorTestHarness.getOutput());
    }

    @Test
    public void testOpenClose() throws Exception {
        TwoInputStreamOperatorTestHarness twoInputStreamOperatorTestHarness = new TwoInputStreamOperatorTestHarness(new CoStreamMap(new TestOpenCloseCoMapFunction()));
        twoInputStreamOperatorTestHarness.open();
        twoInputStreamOperatorTestHarness.processElement1(new StreamRecord(Double.valueOf(74.0d), 0L));
        twoInputStreamOperatorTestHarness.processElement2(new StreamRecord(42, 0L));
        twoInputStreamOperatorTestHarness.close();
        Assert.assertTrue("RichFunction methods where not called.", TestOpenCloseCoMapFunction.closeCalled);
        Assert.assertTrue("Output contains no elements.", twoInputStreamOperatorTestHarness.getOutput().size() > 0);
    }
}
