/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.runtime;

import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.reader.RecordReader;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NetworkStackThroughputITCase
extends TestLogger {
    /** Logger used by the test driver and by the nested task classes. */
    private static final Logger LOG = LoggerFactory.getLogger(NetworkStackThroughputITCase.class);
    /** Task-configuration key under which the configured data volume (in GB) is handed to the producer. */
    private static final String DATA_VOLUME_GB_CONFIG_KEY = "data.volume.gb";
    /** Task-configuration key of the boolean flag that makes the producer throttle itself. */
    private static final String IS_SLOW_SENDER_CONFIG_KEY = "is.slow.sender";
    /** Task-configuration key of the boolean flag that makes the consumer throttle itself. */
    private static final String IS_SLOW_RECEIVER_CONFIG_KEY = "is.slow.receiver";
    /** Length, in milliseconds, of each pause taken by a throttled sender or receiver. */
    private static final int IS_SLOW_SLEEP_MS = 10;
    /** A throttled sender or receiver pauses once per this many records. */
    private static final int IS_SLOW_EVERY_NUM_RECORDS = 512;

    /**
     * Runs the throughput job once for every row of a fixed parameter table. Each run gets its own
     * {@link MiniClusterWithClientResource}, bracketed by {@code before()} and {@code after()}, with
     * {@code after()} being invoked even when the run fails.
     *
     * @throws Exception if a row is inconsistent, or if the cluster or the job fails
     */
    @Test
    public void testThroughput() throws Exception {
        // Columns: dataVolumeGb, useForwarder, isSlowSender, isSlowReceiver, parallelism,
        // numSlotsPerTaskManager.
        final Object[][] scenarios = {
            {1, false, false, false, 4, 2},
            {1, true, false, false, 4, 2},
            {1, true, true, false, 4, 2},
            {1, true, false, true, 4, 2},
            {2, true, false, false, 4, 2},
            {4, true, false, false, 4, 2},
            {4, true, false, false, 8, 4}
        };

        for (int row = 0; row < scenarios.length; row++) {
            final Object[] scenario = scenarios[row];
            final int dataVolumeGb = (Integer) scenario[0];
            final boolean useForwarder = (Boolean) scenario[1];
            final boolean isSlowSender = (Boolean) scenario[2];
            final boolean isSlowReceiver = (Boolean) scenario[3];
            final int parallelism = (Integer) scenario[4];
            final int slotsPerTaskManager = (Integer) scenario[5];

            // The number of task managers is derived by division, so the slots must divide evenly.
            final boolean evenlyDivisible = parallelism % slotsPerTaskManager == 0;
            if (!evenlyDivisible) {
                throw new RuntimeException("The test case defines a parallelism that is not a multiple of the slots per task manager.");
            }

            final MiniClusterResourceConfiguration clusterConfig =
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(parallelism / slotsPerTaskManager)
                            .setNumberSlotsPerTaskManager(slotsPerTaskManager)
                            .build();
            final MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(clusterConfig);

            cluster.before();
            try {
                final String banner = String.format(
                        "Running test with parameters: dataVolumeGB=%s, useForwarder=%s, isSlowSender=%s, isSlowReceiver=%s, parallelism=%s, numSlotsPerTM=%s",
                        dataVolumeGb, useForwarder, isSlowSender, isSlowReceiver, parallelism, slotsPerTaskManager);
                System.out.println(banner);
                testProgram(cluster, dataVolumeGb, useForwarder, isSlowSender, isSlowReceiver, parallelism);
            } finally {
                cluster.after();
            }
        }
    }

    /**
     * Builds the throughput job, submits it to the given cluster, waits for its result, asserts
     * that the job did not fail and logs the measured throughput.
     *
     * @param cluster the cluster whose client is used to submit the job
     * @param dataVolumeGb configured data volume in GB, used for the job and for the rate report
     * @param useForwarder whether a forwarding vertex is placed between producer and consumer
     * @param isSlowSender whether the producer throttles itself
     * @param isSlowReceiver whether the consumer throttles itself
     * @param parallelism parallelism of every vertex of the job
     * @throws Exception if submitting the job or waiting for its result fails
     */
    private void testProgram(MiniClusterWithClientResource cluster, int dataVolumeGb, boolean useForwarder, boolean isSlowSender, boolean isSlowReceiver, int parallelism) throws Exception {
        ClusterClient<?> client = cluster.getClusterClient();
        JobGraph jobGraph = createJobGraph(dataVolumeGb, useForwarder, isSlowSender, isSlowReceiver, parallelism);
        JobResult jobResult = client.submitJob(jobGraph).thenCompose(client::requestJobResult).get();
        Assert.assertFalse(
                "The job failed: " + jobResult.getSerializedThrowable(),
                jobResult.getSerializedThrowable().isPresent());

        // Multiply in long arithmetic; the int product could overflow before being widened.
        // NOTE(review): SpeedTestProducer derives its per-subtask volume from dataVolumeGb * 10 MB,
        // so the volume assumed here (dataVolumeGb GB) looks larger than what is actually sent -
        // confirm which of the two is intended.
        long dataVolumeMbit = dataVolumeGb * 8192L;
        long runtimeMillis = jobResult.getNetRuntime();
        long runtimeSecs = TimeUnit.SECONDS.convert(runtimeMillis, TimeUnit.MILLISECONDS);
        // Derive the rate from the millisecond runtime: truncating to whole seconds first turned
        // every sub-second run into a division by zero (reported as Integer.MAX_VALUE).
        long mbitPerSecond = Math.round(dataVolumeMbit * 1000.0 / Math.max(1L, runtimeMillis));
        LOG.info(String.format("Test finished with throughput of %d MBit/s (runtime [secs]: %d, data volume [gb/mbits]: %d/%d)", mbitPerSecond, runtimeSecs, dataVolumeGb, dataVolumeMbit));
    }

    /**
     * Assembles the job graph of the throughput test: a producer, an optional forwarder and a
     * consumer. All vertices share one slot sharing group and the same parallelism, and they are
     * chained together with all-to-all, pipelined connections.
     *
     * @param dataVolumeGb data volume (GB) written into the producer's configuration
     * @param useForwarder whether to insert the forwarder between producer and consumer
     * @param isSlowSender slow-sender flag written into the producer's configuration
     * @param isSlowReceiver slow-receiver flag written into the consumer's configuration
     * @param numSubtasks parallelism applied to every vertex
     * @return the streaming job graph containing the created vertices
     */
    private JobGraph createJobGraph(int dataVolumeGb, boolean useForwarder, boolean isSlowSender, boolean isSlowReceiver, int numSubtasks) {
        final SlotSharingGroup sharedSlots = new SlotSharingGroup();
        final ArrayList<JobVertex> vertices = new ArrayList<>();

        final JobVertex producer = newVertex("Speed Test Producer", SpeedTestProducer.class, sharedSlots, numSubtasks);
        producer.getConfiguration().set(ConfigurationUtils.getIntConfigOption(DATA_VOLUME_GB_CONFIG_KEY), dataVolumeGb);
        producer.getConfiguration().set(ConfigurationUtils.getBooleanConfigOption(IS_SLOW_SENDER_CONFIG_KEY), isSlowSender);
        vertices.add(producer);

        final JobVertex forwarder = useForwarder
                ? newVertex("Speed Test Forwarder", SpeedTestForwarder.class, sharedSlots, numSubtasks)
                : null;
        if (forwarder != null) {
            vertices.add(forwarder);
        }

        final JobVertex consumer = newVertex("Speed Test Consumer", SpeedTestConsumer.class, sharedSlots, numSubtasks);
        consumer.getConfiguration().set(ConfigurationUtils.getBooleanConfigOption(IS_SLOW_RECEIVER_CONFIG_KEY), isSlowReceiver);
        vertices.add(consumer);

        // The consumer reads from the forwarder when there is one, otherwise from the producer.
        JobVertex consumerInput = producer;
        if (forwarder != null) {
            forwarder.connectNewDataSetAsInput(producer, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
            consumerInput = forwarder;
        }
        consumer.connectNewDataSetAsInput(consumerInput, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

        return JobGraphTestUtils.streamingJobGraph(vertices.toArray(new JobVertex[0]));
    }

    /** Creates a vertex with the given name, slot sharing group, invokable class and parallelism. */
    private static JobVertex newVertex(String name, Class<? extends AbstractInvokable> invokableClass, SlotSharingGroup sharedSlots, int parallelism) {
        final JobVertex vertex = new JobVertex(name);
        vertex.setSlotSharingGroup(sharedSlots);
        vertex.setInvokableClass(invokableClass);
        vertex.setParallelism(parallelism);
        return vertex;
    }

    /**
     * Stand-alone entry point: runs {@link #testThroughput()} on a fresh instance and prints a
     * completion marker afterwards.
     *
     * @param args not read
     * @throws Exception whatever the test run propagates
     */
    public static void main(String[] args) throws Exception {
        final NetworkStackThroughputITCase throughputTest = new NetworkStackThroughputITCase();
        throughputTest.testThroughput();
        System.out.println("Done.");
    }

    /**
     * Fixed-size payload record exchanged by the speed test tasks. It wraps a 128-byte buffer that
     * is written and read as a whole.
     */
    public static class SpeedTestRecord implements IOReadableWritable {
        private static final int RECORD_SIZE = 128;
        private final byte[] buf = new byte[RECORD_SIZE];

        /** Pre-fills the buffer so that every byte holds its own index. */
        public SpeedTestRecord() {
            int index = 0;
            while (index < RECORD_SIZE) {
                buf[index] = (byte) (index % RECORD_SIZE);
                index++;
            }
        }

        /** Writes the complete buffer to {@code out}. */
        public void write(DataOutputView out) throws IOException {
            out.write(buf);
        }

        /** Overwrites the complete buffer with bytes read from {@code in}. */
        public void read(DataInputView in) throws IOException {
            in.readFully(buf);
        }
    }

    /**
     * Receiving task of the speed test: reads {@link SpeedTestRecord}s from input gate 0 and
     * discards them, optionally throttling itself.
     */
    public static class SpeedTestConsumer extends AbstractInvokable {
        public SpeedTestConsumer(Environment environment) {
            super(environment);
        }

        /**
         * Reads records until the reader returns {@code null}. When the slow-receiver flag is set
         * in the task configuration (default {@code false}), the task sleeps for 10 ms on the
         * first and then on every 512th record. The reader's buffers are cleared on exit,
         * whether or not reading succeeded.
         */
        public void invoke() throws Exception {
            final RecordReader<SpeedTestRecord> input = new RecordReader<>(
                    getEnvironment().getInputGate(0),
                    SpeedTestRecord.class,
                    getEnvironment().getTaskManagerInfo().getTmpDirectories());
            try {
                final boolean throttled = getTaskConfiguration().get(
                        ConfigurationUtils.getBooleanConfigOption(NetworkStackThroughputITCase.IS_SLOW_RECEIVER_CONFIG_KEY),
                        false);
                int received = 0;
                while (input.next() != null) {
                    // Short-circuit: the counter only advances while throttling is enabled.
                    if (throttled && received++ % NetworkStackThroughputITCase.IS_SLOW_EVERY_NUM_RECORDS == 0) {
                        Thread.sleep(NetworkStackThroughputITCase.IS_SLOW_SLEEP_MS);
                    }
                }
            } finally {
                input.clearBuffers();
            }
        }
    }

    /**
     * Intermediate task of the speed test: re-emits every {@link SpeedTestRecord} read from input
     * gate 0 through a writer built on output 0.
     */
    public static class SpeedTestForwarder extends AbstractInvokable {
        public SpeedTestForwarder(Environment environment) {
            super(environment);
        }

        /**
         * Forwards records until the reader returns {@code null}, then releases the reader and the
         * writer. Each cleanup step runs in its own {@code finally} so that a failure in an earlier
         * step (for example while clearing the reader's buffers) can no longer prevent the writer
         * from being closed and flushed.
         */
        public void invoke() throws Exception {
            RecordReader<SpeedTestRecord> reader = new RecordReader<>(
                    getEnvironment().getInputGate(0),
                    SpeedTestRecord.class,
                    getEnvironment().getTaskManagerInfo().getTmpDirectories());
            RecordWriter<SpeedTestRecord> writer =
                    new RecordWriterBuilder<SpeedTestRecord>().build(getEnvironment().getWriter(0));
            try {
                SpeedTestRecord record;
                while ((record = reader.next()) != null) {
                    writer.emit(record);
                }
            } finally {
                try {
                    reader.clearBuffers();
                } finally {
                    // Same order as before (close, then flushAll), but flushAll is now guaranteed.
                    try {
                        writer.close();
                    } finally {
                        writer.flushAll();
                    }
                }
            }
        }
    }

    /**
     * Sending task of the speed test: emits a configured number of identical
     * {@link SpeedTestRecord}s through a writer built on output 0, optionally throttling itself.
     */
    public static class SpeedTestProducer extends AbstractInvokable {
        public SpeedTestProducer(Environment environment) {
            super(environment);
        }

        /**
         * Computes this subtask's share as {@code dataVolumeGb * 10 / numberOfSubtasks} megabytes
         * (integer division; the volume defaults to 1 when not configured), converts it into a
         * number of 128-byte records, logs the plan and emits the same record instance that many
         * times. When the slow-sender flag is set (default {@code false}), the task sleeps for
         * 10 ms before the first and then before every 512th record. The writer is closed and
         * flushed on exit.
         */
        public void invoke() throws Exception {
            final RecordWriter<SpeedTestRecord> output =
                    new RecordWriterBuilder<SpeedTestRecord>().build(getEnvironment().getWriter(0));
            try {
                final int dataVolumeGb = getTaskConfiguration().get(
                        ConfigurationUtils.getIntConfigOption(NetworkStackThroughputITCase.DATA_VOLUME_GB_CONFIG_KEY),
                        1);
                final int subtaskCount = getCurrentNumberOfSubtasks();
                final long dataMbPerSubtask = dataVolumeGb * 10 / subtaskCount;
                final long numRecordsToEmit = dataMbPerSubtask * 1024L * 1024L / SpeedTestRecord.RECORD_SIZE;
                LOG.info(String.format(
                        "%d/%d: Producing %d records (each record: %d bytes, total: %.2f GB)",
                        getIndexInSubtaskGroup() + 1,
                        subtaskCount,
                        numRecordsToEmit,
                        SpeedTestRecord.RECORD_SIZE,
                        dataMbPerSubtask / 1024.0));

                final boolean throttled = getTaskConfiguration().get(
                        ConfigurationUtils.getBooleanConfigOption(NetworkStackThroughputITCase.IS_SLOW_SENDER_CONFIG_KEY),
                        false);
                final SpeedTestRecord payload = new SpeedTestRecord();
                long emitted = 0L;
                while (emitted < numRecordsToEmit) {
                    // The number of records emitted so far doubles as the throttling counter.
                    if (throttled && emitted % NetworkStackThroughputITCase.IS_SLOW_EVERY_NUM_RECORDS == 0) {
                        Thread.sleep(NetworkStackThroughputITCase.IS_SLOW_SLEEP_MS);
                    }
                    output.emit(payload);
                    emitted++;
                }
            } finally {
                output.close();
                output.flushAll();
            }
        }
    }
}

