package org.apache.flink.streaming.api.operators;

import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/StreamFlatMapTest.class */
public class StreamFlatMapTest {

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/StreamFlatMapTest$MyFlatMap.class */
    private static final class MyFlatMap implements FlatMapFunction<Integer, Integer> {
        private static final long serialVersionUID = 1;

        private MyFlatMap() {
        }

        public void flatMap(Integer num, Collector<Integer> collector) throws Exception {
            if (num.intValue() % 2 == 0) {
                collector.collect(num);
                collector.collect(Integer.valueOf(num.intValue() * num.intValue()));
            }
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((Integer) obj, (Collector<Integer>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/StreamFlatMapTest$TestOpenCloseFlatMapFunction.class */
    private static class TestOpenCloseFlatMapFunction extends RichFlatMapFunction<String, String> {
        private static final long serialVersionUID = 1;
        public static boolean openCalled = false;
        public static boolean closeCalled = false;

        private TestOpenCloseFlatMapFunction() {
        }

        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            if (closeCalled) {
                Assert.fail("Close called before open.");
            }
            openCalled = true;
        }

        public void close() throws Exception {
            super.close();
            if (!openCalled) {
                Assert.fail("Open was not called before close.");
            }
            closeCalled = true;
        }

        public void flatMap(String str, Collector<String> collector) throws Exception {
            if (!openCalled) {
                Assert.fail("Open was not called before run.");
            }
            collector.collect(str);
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((String) obj, (Collector<String>) collector);
        }
    }

    @Test
    public void testFlatMap() throws Exception {
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(new StreamFlatMap(new MyFlatMap()));
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        oneInputStreamOperatorTestHarness.open();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(1, 0 + 1));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(2, 0 + 2));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(0 + 2));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(3, 0 + 3));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(4, 0 + 4));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(5, 0 + 5));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(6, 0 + 6));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(7, 0 + 7));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(8, 0 + 8));
        concurrentLinkedQueue.add(new StreamRecord(2, 0 + 2));
        concurrentLinkedQueue.add(new StreamRecord(4, 0 + 2));
        concurrentLinkedQueue.add(new Watermark(0 + 2));
        concurrentLinkedQueue.add(new StreamRecord(4, 0 + 4));
        concurrentLinkedQueue.add(new StreamRecord(16, 0 + 4));
        concurrentLinkedQueue.add(new StreamRecord(6, 0 + 6));
        concurrentLinkedQueue.add(new StreamRecord(36, 0 + 6));
        concurrentLinkedQueue.add(new StreamRecord(8, 0 + 8));
        concurrentLinkedQueue.add(new StreamRecord(64, 0 + 8));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", concurrentLinkedQueue, oneInputStreamOperatorTestHarness.getOutput());
    }

    @Test
    public void testOpenClose() throws Exception {
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(new StreamFlatMap(new TestOpenCloseFlatMapFunction()));
        oneInputStreamOperatorTestHarness.open();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord("Hello", 0L));
        oneInputStreamOperatorTestHarness.close();
        Assert.assertTrue("RichFunction methods where not called.", TestOpenCloseFlatMapFunction.closeCalled);
        Assert.assertTrue("Output contains no elements.", oneInputStreamOperatorTestHarness.getOutput().size() > 0);
    }
}
