/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.manual;

import java.io.Serializable;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.streaming.api.functions.source.legacy.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.Assert;

public class StreamingScalabilityAndLatency {
    /**
     * Entry point of this manual test: starts a mini cluster with a single
     * TaskManager and runs the partitioning program on it with one parallel
     * subtask per slot.
     *
     * <p>The JVM must report at least 5000 MiB of maximum heap, otherwise the
     * program aborts before any cluster is created. Any exception raised while
     * setting up the cluster or running the job is printed and then reported
     * through {@code Assert.fail}. The cluster is always torn down afterwards
     * if it was created.
     *
     * @param args ignored
     * @throws Exception declared by the signature; failures inside the
     *     try block are converted into an assertion failure instead
     */
    public static void main(String[] args) throws Exception {
        // maxMemory() is in bytes; shifting right by 20 bits converts it to MiB.
        if (Runtime.getRuntime().maxMemory() >>> 20 < 5000L) {
            throw new RuntimeException("This test program needs to run with at least 5GB of heap space.");
        }

        final int taskManagers = 1;
        final int slotsPerTaskManager = 80;
        // Use every slot of every TaskManager.
        final int parallelism = taskManagers * slotsPerTaskManager;

        MiniClusterWithClientResource cluster = null;
        try {
            Configuration config = new Configuration();
            config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("80m"));
            config.set(ConfigurationUtils.getIntConfigOption("taskmanager.net.server.numThreads"), 1);
            config.set(ConfigurationUtils.getIntConfigOption("taskmanager.net.client.numThreads"), 1);

            cluster = new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setConfiguration(config)
                            .setNumberTaskManagers(taskManagers)
                            .setNumberSlotsPerTaskManager(slotsPerTaskManager)
                            .build());
            cluster.before();

            StreamingScalabilityAndLatency.runPartitioningProgram(parallelism);
        } catch (Exception e) {
            // Keep the full stack trace visible, since fail() only carries the message.
            e.printStackTrace();
            Assert.fail(e.getMessage());
        } finally {
            if (cluster != null) {
                cluster.after();
            }
        }
    }

    /**
     * Builds and executes the streaming job
     * {@code source -> identity map -> keyBy(f0) -> sink}.
     *
     * <p>The environment is configured with object reuse enabled, a buffer
     * timeout of 5, and at-least-once checkpointing with an interval of 1000
     * (both values are passed to the Flink API, which interprets them as
     * milliseconds).
     *
     * @param parallelism parallelism applied to the whole environment
     * @throws Exception if the job execution fails
     */
    private static void runPartitioningProgram(int parallelism) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(parallelism);
        env.getConfig().enableObjectReuse();
        env.setBufferTimeout(5L);
        env.enableCheckpointing(1000L, CheckpointingMode.AT_LEAST_ONCE);

        // Fully typed pipeline: no raw casts, so the key selector's access to
        // the tuple's first field is checked by the compiler.
        env.addSource(new TimeStampingSource())
                .map(new IdMapper<Tuple2<Long, Long>>())
                .keyBy(record -> record.f0)
                .addSink(new TimestampingSink());

        env.execute("Partitioning Program");
    }

    /**
     * Identity mapper: forwards every record unchanged.
     *
     * @param <T> type of the records passing through
     */
    private static class IdMapper<T> implements MapFunction<T, T> {

        private static final long serialVersionUID = -6543809409233225099L;

        /**
         * Returns the given record as-is.
         *
         * @param value the incoming record
         * @return the very same instance that was passed in
         */
        public T map(T value) {
            return value;
        }
    }

    /**
     * Sink that tracks the largest latency seen among timestamped records and
     * prints it to standard out once every {@code REPORT_INTERVAL} records,
     * after which both the record counter and the maximum are reset.
     *
     * <p>The second tuple field ({@code f1}) is read as a timestamp taken from
     * {@link System#currentTimeMillis()}; a value of {@code 0} marks a record
     * that carries no timestamp and does not contribute to the latency.
     */
    private static class TimestampingSink implements SinkFunction<Tuple2<Long, Long>> {

        private static final long serialVersionUID = 1876986644706201196L;

        /** Number of received records between two latency reports. */
        private static final long REPORT_INTERVAL = 5000L;

        private long maxLatency;
        private long count;

        /**
         * Updates the latency statistics with one record and emits a report
         * when the reporting interval has been reached.
         *
         * @param value the received record; {@code f1} holds the timestamp or 0
         */
        public void invoke(Tuple2<Long, Long> value) {
            final long emittedAt = value.f1;
            if (emittedAt != 0L) {
                final long latency = System.currentTimeMillis() - emittedAt;
                this.maxLatency = Math.max(this.maxLatency, latency);
            }

            this.count++;
            if (this.count != REPORT_INTERVAL) {
                return;
            }

            // Interval complete: report and start a fresh measurement window.
            System.out.println("Max latency: " + this.maxLatency);
            this.count = 0L;
            this.maxLatency = 0L;
        }
    }

    /**
     * Parallel source that emits {@code (counter, timestamp)} tuples until it
     * is cancelled, sleeping one millisecond after each record.
     *
     * <p>The counter starts at a random value in {@code [0, 4096)} and grows
     * by one per record. The very first record, and afterwards every record
     * that follows {@code UNSTAMPED_RECORDS_BETWEEN_SAMPLES} un-timestamped
     * ones, carries {@link System#currentTimeMillis()} as its second field;
     * all other records carry {@code 0} there.
     */
    private static class TimeStampingSource implements ParallelSourceFunction<Tuple2<Long, Long>> {

        private static final long serialVersionUID = -151782334777482511L;

        /** Number of records with timestamp 0 emitted between two timestamped records. */
        private static final long UNSTAMPED_RECORDS_BETWEEN_SAMPLES = 100L;

        /** Cleared by {@link #cancel()}, possibly from another thread, to stop the emit loop. */
        private volatile boolean running = true;

        /**
         * Emits records until {@link #cancel()} is called.
         *
         * @param ctx context used to emit the records
         * @throws Exception if the thread is interrupted while sleeping or
         *     emitting a record fails
         */
        public void run(SourceFunction.SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
            // Start "full" so that the very first record is timestamped.
            long unstampedSinceSample = UNSTAMPED_RECORDS_BETWEEN_SAMPLES;
            long counter = (long) (Math.random() * 4096.0);

            while (this.running) {
                final long timestamp;
                if (unstampedSinceSample < UNSTAMPED_RECORDS_BETWEEN_SAMPLES) {
                    unstampedSinceSample++;
                    timestamp = 0L;
                } else {
                    unstampedSinceSample = 0L;
                    timestamp = System.currentTimeMillis();
                }
                ctx.collect(new Tuple2<Long, Long>(counter++, timestamp));

                Thread.sleep(1L);
            }
        }

        /** Requests the emit loop in {@code run} to terminate. */
        public void cancel() {
            this.running = false;
        }
    }
}

