package org.apache.flink.test.manual;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.Assert;

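/**
 * Manual test program that starts a local Flink mini cluster, runs a keyed streaming job and
 * periodically prints the maximum observed source-to-sink latency to standard out.
 * (Recovered from the compiled class org/apache/flink/test/manual/StreamingScalabilityAndLatency.class.)
 */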
public class StreamingScalabilityAndLatency {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/manual/StreamingScalabilityAndLatency$IdMapper.class */
    /**
     * Identity {@link MapFunction}: every element is forwarded exactly as it was received.
     *
     * @param <T> type of the elements passing through the mapper
     */
    public static class IdMapper<T> implements MapFunction<T, T> {

        private static final long serialVersionUID = -6543809409233225099L;

        /** Instances are only created from within the enclosing test program. */
        private IdMapper() {
        }

        /**
         * Returns the input element unchanged.
         *
         * @param t the incoming element
         * @return the very same reference {@code t}
         */
        public T map(T t) {
            return t;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/manual/StreamingScalabilityAndLatency$TimeStampingSource.class */
    /**
     * Parallel source that emits {@code (sequence number, timestamp)} pairs until cancelled.
     *
     * <p>The sequence number starts at a random value in {@code [0, 4096)} and grows by one per
     * emitted record. Only one record out of every {@value #RECORDS_BETWEEN_STAMPS} + 1 carries a
     * real timestamp ({@link System#currentTimeMillis()}); all others carry {@code 0} in the
     * timestamp field. The source sleeps one millisecond after each record.
     */
    public static class TimeStampingSource implements ParallelSourceFunction<Tuple2<Long, Long>> {
        private static final long serialVersionUID = -151782334777482511L;

        /** Number of records with a zero timestamp emitted between two timestamped records. */
        private static final long RECORDS_BETWEEN_STAMPS = 100L;

        /** Loop flag; volatile because {@link #cancel()} flips it while {@code run} is looping. */
        private volatile boolean running = true;

        /** Instances are only created from within the enclosing test program. */
        private TimeStampingSource() {
        }

        /**
         * Emits records until {@link #cancel()} is called.
         *
         * @param sourceContext context used to emit the generated records
         * @throws Exception if the sleep between two records is interrupted
         */
        public void run(SourceFunction.SourceContext<Tuple2<Long, Long>> sourceContext) throws Exception {
            // Start "saturated" so that the very first record is a timestamped one.
            long sinceLastStamp = RECORDS_BETWEEN_STAMPS;
            long sequence = (long) (Math.random() * 4096.0d);

            while (this.running) {
                final long timestamp;
                if (sinceLastStamp < RECORDS_BETWEEN_STAMPS) {
                    sinceLastStamp++;
                    // 0 marks a record without a timestamp.
                    timestamp = 0L;
                } else {
                    sinceLastStamp = 0;
                    timestamp = System.currentTimeMillis();
                }

                // Emit the record itself (not the context) and advance the sequence number.
                sourceContext.collect(new Tuple2<Long, Long>(sequence, timestamp));
                sequence++;

                Thread.sleep(1L);
            }
        }

        /** Requests termination of the emit loop in {@code run}. */
        public void cancel() {
            this.running = false;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/manual/StreamingScalabilityAndLatency$TimestampingSink.class */
    /**
     * Sink that tracks the largest latency seen on timestamped records and prints it to standard
     * out once per 5000 received records, resetting both the counter and the maximum afterwards.
     */
    public static class TimestampingSink implements SinkFunction<Tuple2<Long, Long>> {
        private static final long serialVersionUID = 1876986644706201196L;

        /** Largest latency in milliseconds observed since the last report. */
        private long maxLatency;

        /** Number of records received since the last report. */
        private long count;

        /** Instances are only created from within the enclosing test program. */
        private TimestampingSink() {
        }

        /**
         * Processes one record; field {@code f1} is read as the emission timestamp in
         * milliseconds, where {@code 0} means the record carries no timestamp.
         *
         * @param tuple2 the received record
         */
        public void invoke(Tuple2<Long, Long> tuple2) {
            final long emittedAt = ((Long) tuple2.f1).longValue();
            if (emittedAt != 0) {
                final long latency = System.currentTimeMillis() - emittedAt;
                if (latency > this.maxLatency) {
                    this.maxLatency = latency;
                }
            }

            this.count += 1;
            if (this.count != 5000) {
                return;
            }

            // Report window is full: print and start a new window.
            System.out.println("Max latency: " + this.maxLatency);
            this.maxLatency = 0L;
            this.count = 0L;
        }
    }

    /**
     * Entry point: starts a mini cluster with one task manager and 80 slots, runs the
     * partitioning job at the matching parallelism, and always tears the cluster down again.
     *
     * @param args command line arguments (unused)
     * @throws Exception if tearing down the cluster fails
     * @throws RuntimeException if the JVM's maximum heap is below 5000 MiB
     */
    public static void main(String[] args) throws Exception {
        // maxMemory() is in bytes; shifting by 20 converts it to MiB.
        if ((Runtime.getRuntime().maxMemory() >>> 20) < 5000) {
            throw new RuntimeException("This test program needs to run with at least 5GB of heap space.");
        }

        final int taskManagers = 1;
        final int slotsPerTaskManager = 80;
        final int parallelism = taskManagers * slotsPerTaskManager;

        MiniClusterWithClientResource cluster = null;
        try {
            Configuration configuration = new Configuration();
            configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "80m");
            configuration.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 20000);
            configuration.setInteger("taskmanager.net.server.numThreads", 1);
            configuration.setInteger("taskmanager.net.client.numThreads", 1);

            cluster = new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setConfiguration(configuration)
                            .setNumberTaskManagers(taskManagers)
                            .setNumberSlotsPerTaskManager(slotsPerTaskManager)
                            .build());
            cluster.before();

            runPartitioningProgram(parallelism);
        } catch (Exception e) {
            e.printStackTrace();
            // Assert.fail always throws, so control leaves through the finally block below.
            Assert.fail(e.getMessage());
        } finally {
            // Single teardown point: runs exactly once on success and on failure.
            if (cluster != null) {
                cluster.after();
            }
        }
    }

    /**
     * Builds and executes the streaming job: timestamping source, identity map, key-by on tuple
     * field 0, latency-reporting sink.
     *
     * <p>The job runs with object reuse enabled, a buffer timeout of 5, and at-least-once
     * checkpointing with an interval argument of 1000.
     *
     * @param parallelism parallelism set on the execution environment
     * @throws Exception if executing the job fails
     */
    private static void runPartitioningProgram(int parallelism) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(parallelism);
        env.getConfig().enableObjectReuse();

        env.setBufferTimeout(5L);
        env.enableCheckpointing(1000L, CheckpointingMode.AT_LEAST_ONCE);

        // The mapper is parameterized explicitly so the element type is kept through the chain
        // instead of degrading to raw types.
        env.addSource(new TimeStampingSource())
                .map(new IdMapper<Tuple2<Long, Long>>())
                .keyBy(0)
                .addSink(new TimestampingSink());

        env.execute("Partitioning Program");
    }
}
