package org.apache.flink.test.streaming.api;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MathUtils;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/test/streaming/api/StreamingOperatorsITCase.class */
public class StreamingOperatorsITCase extends AbstractTestBase {

    /* loaded from: input_file:org/apache/flink/test/streaming/api/StreamingOperatorsITCase$MemorySinkFunction.class */
    private static class MemorySinkFunction implements SinkFunction<Integer> {
        private static Map<Integer, Collection<Integer>> collections = new ConcurrentHashMap();
        private static final long serialVersionUID = -8815570195074103860L;
        private final int key;

        public MemorySinkFunction(int i) {
            this.key = i;
        }

        public void invoke(Integer num) throws Exception {
            Collection<Integer> collection = collections.get(Integer.valueOf(this.key));
            synchronized (collection) {
                collection.add(num);
            }
        }

        public static void registerCollection(int i, Collection<Integer> collection) {
            collections.put(Integer.valueOf(i), collection);
        }

        public static void clear() {
            collections.clear();
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/api/StreamingOperatorsITCase$NonSerializable.class */
    private static class NonSerializable {
        private final Object obj = new Object();
        private final int value;

        public NonSerializable(int i) {
            this.value = i;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/api/StreamingOperatorsITCase$NonSerializableTupleSource.class */
    private static class NonSerializableTupleSource implements SourceFunction<Tuple2<Integer, NonSerializable>> {
        private static final long serialVersionUID = 3949171986015451520L;
        private final int numElements;

        public NonSerializableTupleSource(int i) {
            this.numElements = i;
        }

        public void run(SourceFunction.SourceContext<Tuple2<Integer, NonSerializable>> sourceContext) throws Exception {
            for (int i = 0; i < this.numElements; i++) {
                sourceContext.collect(new Tuple2(Integer.valueOf(i), new NonSerializable(i)));
            }
        }

        public void cancel() {
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/api/StreamingOperatorsITCase$TupleSource.class */
    private static class TupleSource implements SourceFunction<Tuple2<Integer, Integer>> {
        private static final long serialVersionUID = -8110466235852024821L;
        private final int numElements;
        private final int numKeys;

        public TupleSource(int i, int i2) {
            this.numElements = i;
            this.numKeys = i2;
        }

        public void run(SourceFunction.SourceContext<Tuple2<Integer, Integer>> sourceContext) throws Exception {
            for (int i = 0; i < this.numElements; i++) {
                sourceContext.collect(new Tuple2(Integer.valueOf(1 + (MathUtils.murmurHash(i) % this.numKeys)), Integer.valueOf(i)));
            }
        }

        public void cancel() {
        }
    }

    @Test
    public void testAsyncWaitOperator() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource addSource = executionEnvironment.addSource(new NonSerializableTupleSource(5));
        RichAsyncFunction<Tuple2<Integer, NonSerializable>, Integer> richAsyncFunction = new RichAsyncFunction<Tuple2<Integer, NonSerializable>, Integer>() { // from class: org.apache.flink.test.streaming.api.StreamingOperatorsITCase.1
            private static final long serialVersionUID = 7000343199829487985L;
            transient ExecutorService executorService;

            public void open(OpenContext openContext) throws Exception {
                super.open(openContext);
                this.executorService = Executors.newFixedThreadPool(5);
            }

            public void close() throws Exception {
                super.close();
                this.executorService.shutdownNow();
            }

            public void asyncInvoke(final Tuple2<Integer, NonSerializable> tuple2, final ResultFuture<Integer> resultFuture) throws Exception {
                this.executorService.submit(new Runnable() { // from class: org.apache.flink.test.streaming.api.StreamingOperatorsITCase.1.1
                    @Override // java.lang.Runnable
                    public void run() {
                        resultFuture.complete(Collections.singletonList(Integer.valueOf(((Integer) tuple2.f0).intValue() + ((Integer) tuple2.f0).intValue())));
                    }
                });
            }

            public /* bridge */ /* synthetic */ void asyncInvoke(Object obj, ResultFuture resultFuture) throws Exception {
                asyncInvoke((Tuple2<Integer, NonSerializable>) obj, (ResultFuture<Integer>) resultFuture);
            }
        };
        SingleOutputStreamOperator parallelism = AsyncDataStream.orderedWait(addSource, richAsyncFunction, 1000L, TimeUnit.MILLISECONDS, 2).setParallelism(1);
        MemorySinkFunction memorySinkFunction = new MemorySinkFunction(0);
        ArrayList arrayList = new ArrayList(5);
        MemorySinkFunction.registerCollection(0, arrayList);
        parallelism.addSink(memorySinkFunction).setParallelism(1);
        SingleOutputStreamOperator unorderedWait = AsyncDataStream.unorderedWait(addSource, richAsyncFunction, 1000L, TimeUnit.MILLISECONDS, 2);
        MemorySinkFunction memorySinkFunction2 = new MemorySinkFunction(1);
        ArrayList arrayList2 = new ArrayList(5);
        MemorySinkFunction.registerCollection(1, arrayList2);
        unorderedWait.addSink(memorySinkFunction2);
        ArrayList arrayList3 = new ArrayList(10);
        for (int i = 0; i < 5; i++) {
            arrayList3.add(Integer.valueOf(i + i));
        }
        executionEnvironment.execute();
        Assert.assertEquals(arrayList3, arrayList);
        Collections.sort(arrayList2);
        Assert.assertEquals(arrayList3, arrayList2);
        MemorySinkFunction.clear();
    }

    @Test
    public void testOperatorChainWithObjectReuseAndNoOutputOperators() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.getConfig().enableObjectReuse();
        executionEnvironment.fromData(new Integer[]{1, 2, 3}).flatMap(new FlatMapFunction<Integer, Integer>() { // from class: org.apache.flink.test.streaming.api.StreamingOperatorsITCase.2
            public void flatMap(Integer num, Collector<Integer> collector) throws Exception {
                collector.collect(Integer.valueOf(num.intValue() << 1));
            }

            public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
                flatMap((Integer) obj, (Collector<Integer>) collector);
            }
        });
        executionEnvironment.execute();
    }
}
