/*
 * Decompiled with CFR 0.152.
 */
package org.apache.arrow.flight;

import com.google.common.collect.ImmutableList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
import org.apache.arrow.flight.Action;
import org.apache.arrow.flight.ActionType;
import org.apache.arrow.flight.AsyncPutListener;
import org.apache.arrow.flight.CallOption;
import org.apache.arrow.flight.Criteria;
import org.apache.arrow.flight.FlightClient;
import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.FlightProducer;
import org.apache.arrow.flight.FlightServer;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.PutResult;
import org.apache.arrow.flight.Result;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Sends one large record batch through a Flight server started on localhost, once in each
 * direction (server to client via {@code getStream}, client to server via {@code startPut}).
 *
 * <p>The batch holds 10 nullable 32-bit integer columns of {@link #ROW_COUNT} rows each, i.e.
 * 10 * 131072 * 4 bytes = 5 MiB of values.
 * NOTE(review): presumably sized to exceed gRPC's default maximum message size - confirm.
 */
public class TestLargeMessage {
    /** Number of rows written to every column of the generated batch. */
    private static final int ROW_COUNT = 131072;

    /** Names of the integer columns in the generated batch. */
    private static final List<String> COLUMN_NAMES =
            Arrays.asList("c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10");

    /**
     * Fetches the generated batch from the server and checks every value of every column.
     *
     * <p>Each column is expected to hold {@code 0, 1, 2, ...} in row order, and the stream as a
     * whole must deliver exactly {@link #ROW_COUNT} rows, so an empty stream fails the test
     * instead of passing vacuously.
     *
     * @throws Exception if any resource fails to start or close
     */
    @Test
    public void getLargeMessage() throws Exception {
        try (BufferAllocator a = new RootAllocator(Long.MAX_VALUE);
             Producer producer = new Producer(a);
             // Port 0 is passed to the server; the client connects to whatever s.getLocation() reports.
             FlightServer s = FlightServer.builder(a, Location.forGrpcInsecure("localhost", 0), producer).build().start();
             FlightClient client = FlightClient.builder(a, s.getLocation()).build();
             FlightStream stream = client.getStream(new Ticket(new byte[0]));
             VectorSchemaRoot root = stream.getRoot()) {
            int rowsReceived = 0;
            while (stream.next()) {
                int rowCount = root.getRowCount();
                rowsReceived += rowCount;
                for (Field field : root.getSchema().getFields()) {
                    IntVector iv = (IntVector) root.getVector(field.getName());
                    for (int i = 0; i < rowCount; ++i) {
                        // generateData stores the row index as the value.
                        Assertions.assertEquals(i, iv.get(i));
                    }
                }
            }
            Assertions.assertEquals(ROW_COUNT, rowsReceived);
        }
    }

    /**
     * Uploads the generated batch to the server and waits for the put call to finish.
     *
     * <p>The batch is allocated from a child allocator named {@code "testcase"}; both the batch
     * and that child allocator are closed by the try-with-resources before the client and server.
     *
     * @throws Exception if any resource fails to start or close
     */
    @Test
    public void putLargeMessage() throws Exception {
        try (BufferAllocator a = new RootAllocator(Long.MAX_VALUE);
             Producer producer = new Producer(a);
             FlightServer s = FlightServer.builder(a, Location.forGrpcInsecure("localhost", 0), producer).build().start();
             FlightClient client = FlightClient.builder(a, s.getLocation()).build();
             BufferAllocator testAllocator = a.newChildAllocator("testcase", 0L, Long.MAX_VALUE);
             VectorSchemaRoot root = TestLargeMessage.generateData(testAllocator)) {
            FlightClient.ClientStreamListener listener =
                    client.startPut(FlightDescriptor.path("hello"), root, new AsyncPutListener());
            listener.putNext();
            listener.completed();
            listener.getResult();
        }
    }

    /**
     * Builds a batch with one nullable signed 32-bit integer column per entry of
     * {@link #COLUMN_NAMES}, each holding the values {@code 0 .. ROW_COUNT - 1} in order.
     *
     * @param allocator allocator that backs the returned root's vectors
     * @return a populated root with its row count set to {@link #ROW_COUNT}; the caller closes it
     */
    private static VectorSchemaRoot generateData(BufferAllocator allocator) {
        ImmutableList<Field> fields = COLUMN_NAMES.stream()
                .map(fieldName -> new Field(fieldName, FieldType.nullable(new ArrowType.Int(32, true)), null))
                .collect(ImmutableList.toImmutableList());
        Schema schema = new Schema(fields, null);
        VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
        root.allocateNew();
        for (String fieldName : COLUMN_NAMES) {
            IntVector iv = (IntVector) root.getVector(fieldName);
            // Keep setValueCount ahead of the writes, as the original did.
            // NOTE(review): presumably this grows the buffer so the unchecked set() below stays in bounds - confirm.
            iv.setValueCount(ROW_COUNT);
            for (int i = 0; i < ROW_COUNT; ++i) {
                iv.set(i, i);
            }
        }
        root.setRowCount(ROW_COUNT);
        return root;
    }

    /**
     * Minimal {@link FlightProducer} for these tests: serves the generated batch on
     * {@code getStream} and drains whatever is uploaded on {@code acceptPut}.
     */
    private static final class Producer
    implements FlightProducer,
    AutoCloseable {
        private final BufferAllocator allocator;

        /**
         * @param allocator allocator used to build the batch served by {@code getStream}
         */
        Producer(BufferAllocator allocator) {
            this.allocator = allocator;
        }

        /** Sends a single freshly generated batch and completes the stream; the ticket is ignored. */
        @Override
        public void getStream(FlightProducer.CallContext context, Ticket ticket, FlightProducer.ServerStreamListener listener) {
            try (VectorSchemaRoot root = TestLargeMessage.generateData(this.allocator)) {
                listener.start(root);
                listener.putNext();
                listener.completed();
            }
        }

        /** No-op; the tests in this class never list flights. */
        @Override
        public void listFlights(FlightProducer.CallContext context, Criteria criteria, FlightProducer.StreamListener<FlightInfo> listener) {
        }

        /** Always returns {@code null}; the tests in this class never request flight info. */
        @Override
        public FlightInfo getFlightInfo(FlightProducer.CallContext context, FlightDescriptor descriptor) {
            return null;
        }

        /**
         * Returns a task that reads the uploaded stream to its end, discarding the data, and then
         * closes the stream's root. Nothing is written to {@code ackStream}.
         */
        @Override
        public Runnable acceptPut(FlightProducer.CallContext context, FlightStream flightStream, FlightProducer.StreamListener<PutResult> ackStream) {
            return () -> {
                try (VectorSchemaRoot ignored = flightStream.getRoot()) {
                    while (flightStream.next()) {
                        // Drain only; the payload is not inspected.
                    }
                }
            };
        }

        /** Completes immediately without emitting any result. */
        @Override
        public void doAction(FlightProducer.CallContext context, Action action, FlightProducer.StreamListener<Result> listener) {
            listener.onCompleted();
        }

        /** No-op; the tests in this class never list actions. */
        @Override
        public void listActions(FlightProducer.CallContext context, FlightProducer.StreamListener<ActionType> listener) {
        }

        /**
         * Closes the allocator handed to the constructor.
         *
         * <p>NOTE(review): both tests also close that same allocator through their own
         * try-with-resources, so it is closed twice; this relies on the allocator tolerating a
         * repeated close - confirm.
         */
        @Override
        public void close() throws Exception {
            this.allocator.close();
        }
    }
}

