/*
 * Decompiled with CFR 0.152.
 */
package org.apache.arrow.flight.perf;

import com.google.common.base.MoreObjects;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.arrow.flight.CallOption;
import org.apache.arrow.flight.FlightClient;
import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.flight.perf.PerformanceTestServer;
import org.apache.arrow.flight.perf.impl.PerfOuterClass;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

@Disabled
public class TestPerf {
    public static final boolean VALIDATE = false;

    public static FlightDescriptor getPerfFlightDescriptor(long recordCount, int recordsPerBatch, int streamCount) {
        Schema pojoSchema = new Schema((Iterable)ImmutableList.of((Object)Field.nullable((String)"a", (ArrowType)Types.MinorType.BIGINT.getType()), (Object)Field.nullable((String)"b", (ArrowType)Types.MinorType.BIGINT.getType()), (Object)Field.nullable((String)"c", (ArrowType)Types.MinorType.BIGINT.getType()), (Object)Field.nullable((String)"d", (ArrowType)Types.MinorType.BIGINT.getType())));
        byte[] bytes = pojoSchema.serializeAsMessage();
        ByteString serializedSchema = ByteString.copyFrom((byte[])bytes);
        return FlightDescriptor.command((byte[])PerfOuterClass.Perf.newBuilder().setRecordsPerStream(recordCount).setRecordsPerBatch(recordsPerBatch).setSchema(serializedSchema).setStreamCount(streamCount).build().toByteArray());
    }

    public static void main(String[] args) throws Exception {
        new TestPerf().throughput();
    }

    @Test
    public void throughput() throws Exception {
        int numRuns = 10;
        ListeningExecutorService pool = MoreExecutors.listeningDecorator((ExecutorService)Executors.newFixedThreadPool(4));
        double[] throughPuts = new double[10];
        for (int i = 0; i < 10; ++i) {
            try (RootAllocator a = new RootAllocator(Long.MAX_VALUE);
                 PerformanceTestServer server = new PerformanceTestServer((BufferAllocator)a, Location.forGrpcInsecure((String)"localhost", (int)0)).start();
                 FlightClient client = FlightClient.builder((BufferAllocator)a, (Location)server.getLocation()).build();){
                FlightInfo info = client.getInfo(TestPerf.getPerfFlightDescriptor(50000000L, 4095, 2), new CallOption[0]);
                List results = info.getEndpoints().stream().map(t -> new Consumer(client, t.getTicket())).map(t -> pool.submit((Callable)t)).collect(Collectors.toList());
                Result r = (Result)Futures.whenAllSucceed(results).call(() -> {
                    Result res = new Result();
                    for (ListenableFuture f : results) {
                        res.add((Result)f.get());
                    }
                    return res;
                }, (Executor)pool).get();
                double seconds = (double)r.nanos * 1.0 / 1000.0 / 1000.0 / 1000.0;
                throughPuts[i] = (double)r.bytes * 1.0 / 1024.0 / 1024.0 / seconds;
                System.out.printf("Transferred %d records totaling %s bytes at %f MiB/s. %f record/s. %f batch/s.%n", r.rows, r.bytes, throughPuts[i], (double)r.rows * 1.0 / seconds, (double)r.batches * 1.0 / seconds);
                continue;
            }
        }
        pool.shutdown();
        System.out.println("Summary: ");
        double average = Arrays.stream(throughPuts).sum() / 10.0;
        double sqrSum = Arrays.stream(throughPuts).map(val -> val - average).map(val -> val * val).sum();
        double stddev = Math.sqrt(sqrSum / 10.0);
        System.out.printf("Average throughput: %f MiB/s, standard deviation: %f MiB/s%n", average, stddev);
    }

    private static final class Result {
        private long rows;
        private long aSum;
        private long bytes;
        private long nanos;
        private long batches;

        private Result() {
        }

        public void add(Result r) {
            this.rows += r.rows;
            this.aSum += r.aSum;
            this.bytes += r.bytes;
            this.batches += r.batches;
            this.nanos = Math.max(this.nanos, r.nanos);
        }

        public String toString() {
            return MoreObjects.toStringHelper((Object)this).add("rows", this.rows).add("aSum", this.aSum).add("batches", this.batches).add("bytes", this.bytes).add("nanos", this.nanos).toString();
        }
    }

    private static final class Consumer
    implements Callable<Result> {
        private final FlightClient client;
        private final Ticket ticket;

        public Consumer(FlightClient client, Ticket ticket) {
            this.client = client;
            this.ticket = ticket;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public Result call() throws Exception {
            Result r = new Result();
            Stopwatch watch = Stopwatch.createStarted();
            try (FlightStream stream = this.client.getStream(this.ticket, new CallOption[0]);){
                Result result;
                VectorSchemaRoot root = stream.getRoot();
                try {
                    BigIntVector a = (BigIntVector)root.getVector("a");
                    while (stream.next()) {
                        int rows = root.getRowCount();
                        long aSum = r.aSum;
                        for (int i = 0; i < rows; ++i) {
                        }
                        r.bytes += (long)rows * 32L;
                        r.rows += (long)rows;
                        r.aSum = aSum;
                        ++r.batches;
                    }
                    r.nanos = watch.elapsed(TimeUnit.NANOSECONDS);
                    result = r;
                }
                catch (Throwable throwable) {
                    root.clear();
                    throw throwable;
                }
                root.clear();
                return result;
            }
        }
    }
}

