/*
 * Decompiled with CFR 0.152.
 */
package org.apache.arrow.flight.perf;

import com.google.common.collect.ImmutableList;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.arrow.flight.BackpressureStrategy;
import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.FlightEndpoint;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.FlightProducer;
import org.apache.arrow.flight.FlightServer;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.NoOpFlightProducer;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.flight.perf.impl.PerfOuterClass;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;

public class PerformanceTestServer
implements AutoCloseable {
    private final FlightServer flightServer;
    private final BufferAllocator allocator;
    private final PerfProducer producer;
    private final boolean isNonBlocking;

    public PerformanceTestServer(BufferAllocator incomingAllocator, Location location) {
        this(incomingAllocator, location, new BackpressureStrategy(){
            private FlightProducer.ServerStreamListener listener;

            public void register(FlightProducer.ServerStreamListener listener) {
                this.listener = listener;
            }

            public BackpressureStrategy.WaitResult waitForListener(long timeout) {
                while (!this.listener.isReady() && !this.listener.isCancelled()) {
                }
                return BackpressureStrategy.WaitResult.READY;
            }
        }, false);
    }

    public PerformanceTestServer(BufferAllocator incomingAllocator, Location location, BackpressureStrategy bpStrategy, boolean isNonBlocking) {
        this.allocator = incomingAllocator.newChildAllocator("perf-server", 0L, Long.MAX_VALUE);
        this.producer = new PerfProducer(bpStrategy);
        this.flightServer = FlightServer.builder((BufferAllocator)this.allocator, (Location)location, (FlightProducer)this.producer).build();
        this.isNonBlocking = isNonBlocking;
    }

    public Location getLocation() {
        return this.flightServer.getLocation();
    }

    public PerformanceTestServer start() throws IOException {
        this.flightServer.start();
        return this;
    }

    @Override
    public void close() throws Exception {
        AutoCloseables.close((AutoCloseable[])new AutoCloseable[]{this.flightServer, this.allocator});
    }

    private final class PerfProducer
    extends NoOpFlightProducer {
        private final BackpressureStrategy bpStrategy;

        private PerfProducer(BackpressureStrategy bpStrategy) {
            this.bpStrategy = bpStrategy;
        }

        public void getStream(FlightProducer.CallContext context, Ticket ticket, FlightProducer.ServerStreamListener listener) {
            this.bpStrategy.register(listener);
            Runnable loadData = () -> {
                PerfOuterClass.Token token = null;
                try {
                    token = PerfOuterClass.Token.parseFrom(ticket.getBytes());
                }
                catch (InvalidProtocolBufferException e) {
                    throw new RuntimeException(e);
                }
                PerfOuterClass.Perf perf = token.getDefinition();
                Schema schema = Schema.deserializeMessage((ByteBuffer)perf.getSchema().asReadOnlyByteBuffer());
                try (VectorSchemaRoot root = VectorSchemaRoot.create((Schema)schema, (BufferAllocator)PerformanceTestServer.this.allocator);
                     BigIntVector a = (BigIntVector)root.getVector("a");){
                    listener.setUseZeroCopy(true);
                    listener.start(root);
                    root.allocateNew();
                    int current = 0;
                    long i = token.getStart();
                    while (i < token.getEnd()) {
                        if (listener.isCancelled()) {
                            root.clear();
                            return;
                        }
                        ++current;
                        if (++i % (long)perf.getRecordsPerBatch() != 0L) continue;
                        root.setRowCount(current);
                        this.bpStrategy.waitForListener(0L);
                        if (listener.isCancelled()) {
                            root.clear();
                            return;
                        }
                        listener.putNext();
                        current = 0;
                        root.allocateNew();
                    }
                    if (current != 0) {
                        root.setRowCount(current);
                        listener.putNext();
                    }
                    listener.completed();
                    return;
                }
            };
            if (!PerformanceTestServer.this.isNonBlocking) {
                loadData.run();
            } else {
                ExecutorService service = Executors.newSingleThreadExecutor();
                Future<?> unused = service.submit(loadData);
                service.shutdown();
            }
        }

        public FlightInfo getFlightInfo(FlightProducer.CallContext context, FlightDescriptor descriptor) {
            try {
                Preconditions.checkArgument((boolean)descriptor.isCommand());
                PerfOuterClass.Perf exec = PerfOuterClass.Perf.parseFrom(descriptor.getCommand());
                Schema pojoSchema = new Schema((Iterable)ImmutableList.of((Object)Field.nullable((String)"a", (ArrowType)Types.MinorType.BIGINT.getType()), (Object)Field.nullable((String)"b", (ArrowType)Types.MinorType.BIGINT.getType()), (Object)Field.nullable((String)"c", (ArrowType)Types.MinorType.BIGINT.getType()), (Object)Field.nullable((String)"d", (ArrowType)Types.MinorType.BIGINT.getType())));
                PerfOuterClass.Token token = PerfOuterClass.Token.newBuilder().setDefinition(exec).setStart(0L).setEnd(exec.getRecordsPerStream()).build();
                Ticket ticket = new Ticket(token.toByteArray());
                ArrayList<FlightEndpoint> endpoints = new ArrayList<FlightEndpoint>();
                for (int i = 0; i < exec.getStreamCount(); ++i) {
                    endpoints.add(new FlightEndpoint(ticket, new Location[]{PerformanceTestServer.this.getLocation()}));
                }
                return new FlightInfo(pojoSchema, descriptor, endpoints, -1L, exec.getRecordsPerStream() * (long)exec.getStreamCount());
            }
            catch (InvalidProtocolBufferException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

