/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.manual;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.junit.Assert;

public class StreamingScalabilityAndLatency {
    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void main(String[] args) throws Exception {
        if (Runtime.getRuntime().maxMemory() >>> 20 < 5000L) {
            throw new RuntimeException("This test program needs to run with at least 5GB of heap space.");
        }
        boolean TASK_MANAGERS = true;
        int SLOTS_PER_TASK_MANAGER = 80;
        int PARALLELISM = 80;
        LocalFlinkMiniCluster cluster = null;
        try {
            Configuration config = new Configuration();
            config.setInteger("local.number-taskmanager", 1);
            config.setInteger("taskmanager.memory.size", 80);
            config.setInteger("taskmanager.numberOfTaskSlots", 80);
            config.setInteger("taskmanager.network.numberOfBuffers", 20000);
            config.setInteger("taskmanager.net.server.numThreads", 1);
            config.setInteger("taskmanager.net.client.numThreads", 1);
            cluster = new LocalFlinkMiniCluster(config, false, StreamingMode.STREAMING);
            cluster.start();
            StreamingScalabilityAndLatency.runPartitioningProgram(cluster.getLeaderRPCPort(), 80);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
        finally {
            if (cluster != null) {
                cluster.shutdown();
            }
        }
    }

    private static void runPartitioningProgram(int jobManagerPort, int parallelism) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment((String)"localhost", (int)jobManagerPort, (String[])new String[0]);
        env.setParallelism(parallelism);
        env.getConfig().enableObjectReuse();
        env.setBufferTimeout(5L);
        env.enableCheckpointing(1000L, CheckpointingMode.AT_LEAST_ONCE);
        env.addSource((SourceFunction)new TimeStampingSource()).map(new IdMapper()).partitionByHash(new int[]{0}).addSink((SinkFunction)new TimestampingSink());
        env.execute("Partitioning Program");
    }

    public static class IdMapper<T>
    implements MapFunction<T, T> {
        private static final long serialVersionUID = -6543809409233225099L;

        public T map(T value) {
            return value;
        }
    }

    public static class TimestampingSink
    implements SinkFunction<Tuple2<Long, Long>> {
        private static final long serialVersionUID = 1876986644706201196L;
        private long maxLatency;
        private long count;

        public void invoke(Tuple2<Long, Long> value) {
            long ts = (Long)value.f1;
            if (ts != 0L) {
                long diff = System.currentTimeMillis() - ts;
                this.maxLatency = Math.max(diff, this.maxLatency);
            }
            ++this.count;
            if (this.count == 5000L) {
                System.out.println("Max latency: " + this.maxLatency);
                this.count = 0L;
                this.maxLatency = 0L;
            }
        }
    }

    public static class TimeStampingSource
    implements ParallelSourceFunction<Tuple2<Long, Long>> {
        private static final long serialVersionUID = -151782334777482511L;
        private volatile boolean running = true;

        public void run(SourceFunction.SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
            long num = 100L;
            long counter = (long)(Math.random() * 4096.0);
            while (this.running) {
                if (num < 100L) {
                    ++num;
                    ctx.collect((Object)new Tuple2((Object)counter++, (Object)0L));
                } else {
                    num = 0L;
                    ctx.collect((Object)new Tuple2((Object)counter++, (Object)System.currentTimeMillis()));
                }
                Thread.sleep(1L);
            }
        }

        public void cancel() {
            this.running = false;
        }
    }
}

