/*
 * Decompiled with CFR 0.152.
 */
package org.apache.arrow.flight;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.apache.arrow.flight.AsyncPutListener;
import org.apache.arrow.flight.CallOption;
import org.apache.arrow.flight.CallStatus;
import org.apache.arrow.flight.FlightClient;
import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.FlightProducer;
import org.apache.arrow.flight.FlightServer;
import org.apache.arrow.flight.FlightStatusCode;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.FlightTestUtil;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.NoOpFlightProducer;
import org.apache.arrow.flight.PutResult;
import org.apache.arrow.flight.SyncPutListener;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

public class TestApplicationMetadata {
    private static final byte[] COMMAND_ARROW_6136 = "ARROW-6136".getBytes(StandardCharsets.UTF_8);
    private static final String MESSAGE_ARROW_6136 = "The stream should not be double-closed.";

    @Test
    @Disabled
    public void retrieveMetadata() {
        this.test((allocator, client) -> {
            try (FlightStream stream = client.getStream(new Ticket(new byte[0]), new CallOption[0]);){
                byte i = 0;
                while (stream.next()) {
                    IntVector vector = (IntVector)stream.getRoot().getVector("a");
                    Assertions.assertEquals((int)1, (int)vector.getValueCount());
                    Assertions.assertEquals((int)10, (int)vector.get(0));
                    Assertions.assertEquals((byte)i, (byte)stream.getLatestMetadata().getByte(0L));
                    i = (byte)(i + 1);
                }
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    @Test
    public void arrow6136() {
        Schema schema = new Schema(Collections.emptyList());
        this.test((allocator, client) -> {
            try (VectorSchemaRoot root = VectorSchemaRoot.create((Schema)schema, (BufferAllocator)allocator);){
                FlightDescriptor descriptor = FlightDescriptor.command((byte[])COMMAND_ARROW_6136);
                SyncPutListener listener = new SyncPutListener();
                FlightClient.ClientStreamListener writer = client.startPut(descriptor, root, (FlightClient.PutListener)listener, new CallOption[0]);
                CallStatus status = FlightTestUtil.assertCode(FlightStatusCode.INTERNAL, () -> ((FlightClient.ClientStreamListener)writer).getResult());
                Assertions.assertEquals((Object)MESSAGE_ARROW_6136, (Object)status.description());
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    @Test
    @Disabled
    public void uploadMetadataAsync() {
        Schema schema = new Schema(Collections.singletonList(Field.nullable((String)"a", (ArrowType)new ArrowType.Int(32, true))));
        this.test((allocator, client) -> {
            try (VectorSchemaRoot root = VectorSchemaRoot.create((Schema)schema, (BufferAllocator)allocator);){
                FlightDescriptor descriptor = FlightDescriptor.path((String[])new String[]{"test"});
                AsyncPutListener listener = new AsyncPutListener(){
                    int counter = 0;

                    public void onNext(PutResult val) {
                        Assertions.assertNotNull((Object)val);
                        Assertions.assertEquals((int)this.counter, (int)val.getApplicationMetadata().getByte(0L));
                        ++this.counter;
                    }
                };
                FlightClient.ClientStreamListener writer = client.startPut(descriptor, root, (FlightClient.PutListener)listener, new CallOption[0]);
                root.allocateNew();
                for (byte i = 0; i < 10; i = (byte)(i + 1)) {
                    IntVector vector = (IntVector)root.getVector("a");
                    ArrowBuf metadata = allocator.buffer(1L);
                    metadata.writeByte(i);
                    vector.set(0, 10);
                    vector.setValueCount(1);
                    root.setRowCount(1);
                    writer.putNext(metadata);
                }
                writer.completed();
                writer.getResult();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    @Test
    @Disabled
    public void uploadMetadataSync() {
        Schema schema = new Schema(Collections.singletonList(Field.nullable((String)"a", (ArrowType)new ArrowType.Int(32, true))));
        this.test((allocator, client) -> {
            try (VectorSchemaRoot root = VectorSchemaRoot.create((Schema)schema, (BufferAllocator)allocator);
                 SyncPutListener listener = new SyncPutListener();){
                FlightDescriptor descriptor = FlightDescriptor.path((String[])new String[]{"test"});
                FlightClient.ClientStreamListener writer = client.startPut(descriptor, root, (FlightClient.PutListener)listener, new CallOption[0]);
                root.allocateNew();
                for (byte i = 0; i < 10; i = (byte)(i + 1)) {
                    IntVector vector = (IntVector)root.getVector("a");
                    ArrowBuf metadata = allocator.buffer(1L);
                    metadata.writeByte(i);
                    vector.set(0, 10);
                    vector.setValueCount(1);
                    root.setRowCount(1);
                    writer.putNext(metadata);
                    try (PutResult message = listener.poll(5000L, TimeUnit.SECONDS);){
                        Assertions.assertNotNull((Object)message);
                        Assertions.assertEquals((byte)i, (byte)message.getApplicationMetadata().getByte(0L));
                        continue;
                    }
                    catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                }
                writer.completed();
                writer.getResult();
            }
        });
    }

    @Test
    @Disabled
    public void syncMemoryReclaimed() {
        Schema schema = new Schema(Collections.singletonList(Field.nullable((String)"a", (ArrowType)new ArrowType.Int(32, true))));
        this.test((allocator, client) -> {
            try (VectorSchemaRoot root = VectorSchemaRoot.create((Schema)schema, (BufferAllocator)allocator);
                 SyncPutListener listener = new SyncPutListener();){
                FlightDescriptor descriptor = FlightDescriptor.path((String[])new String[]{"test"});
                FlightClient.ClientStreamListener writer = client.startPut(descriptor, root, (FlightClient.PutListener)listener, new CallOption[0]);
                root.allocateNew();
                for (byte i = 0; i < 10; i = (byte)(i + 1)) {
                    IntVector vector = (IntVector)root.getVector("a");
                    ArrowBuf metadata = allocator.buffer(1L);
                    metadata.writeByte(i);
                    vector.set(0, 10);
                    vector.setValueCount(1);
                    root.setRowCount(1);
                    writer.putNext(metadata);
                }
                writer.completed();
                writer.getResult();
            }
        });
    }

    @Test
    public void testMetadataEndianness() throws Exception {
        try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
             BufferAllocator serverAllocator = allocator.newChildAllocator("flight-server", 0L, Long.MAX_VALUE);
             FlightServer server = FlightServer.builder((BufferAllocator)serverAllocator, (Location)Location.forGrpcInsecure((String)"localhost", (int)0), (FlightProducer)new EndianFlightProducer(serverAllocator)).build().start();
             FlightClient client = FlightClient.builder((BufferAllocator)allocator, (Location)server.getLocation()).build();){
            Schema schema = new Schema(Collections.emptyList());
            FlightDescriptor descriptor = FlightDescriptor.command((byte[])new byte[0]);
            try (SyncPutListener reader = new SyncPutListener();
                 VectorSchemaRoot root = VectorSchemaRoot.create((Schema)schema, (BufferAllocator)allocator);){
                FlightClient.ClientStreamListener writer = client.startPut(descriptor, root, (FlightClient.PutListener)reader, new CallOption[0]);
                writer.completed();
                try (PutResult metadata = reader.read();){
                    Assertions.assertEquals((long)16L, (long)metadata.getApplicationMetadata().readableBytes());
                    byte[] bytes = new byte[16];
                    metadata.getApplicationMetadata().readBytes(bytes);
                    Assertions.assertArrayEquals((byte[])EndianFlightProducer.EXPECTED_BYTES, (byte[])bytes);
                }
                writer.getResult();
            }
        }
    }

    private void test(BiConsumer<BufferAllocator, FlightClient> fun) {
        try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
             FlightServer s = FlightServer.builder((BufferAllocator)allocator, (Location)Location.forGrpcInsecure((String)"localhost", (int)0), (FlightProducer)new MetadataFlightProducer((BufferAllocator)allocator)).build().start();
             FlightClient client = FlightClient.builder((BufferAllocator)allocator, (Location)s.getLocation()).build();){
            fun.accept((BufferAllocator)allocator, client);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private static class EndianFlightProducer
    extends NoOpFlightProducer {
        static final byte[] EXPECTED_BYTES = new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
        private final BufferAllocator allocator;

        private EndianFlightProducer(BufferAllocator allocator) {
            this.allocator = allocator;
        }

        public Runnable acceptPut(FlightProducer.CallContext context, FlightStream flightStream, FlightProducer.StreamListener<PutResult> ackStream) {
            return () -> {
                while (flightStream.next()) {
                }
                try (ArrowBuf buf = this.allocator.buffer(16L);){
                    buf.writeBytes(EXPECTED_BYTES);
                    ackStream.onNext((Object)PutResult.metadata((ArrowBuf)buf));
                }
                ackStream.onCompleted();
            };
        }
    }

    private static class MetadataFlightProducer
    extends NoOpFlightProducer {
        private final BufferAllocator allocator;

        public MetadataFlightProducer(BufferAllocator allocator) {
            this.allocator = allocator;
        }

        public void getStream(FlightProducer.CallContext context, Ticket ticket, FlightProducer.ServerStreamListener listener) {
            Schema schema = new Schema(Collections.singletonList(Field.nullable((String)"a", (ArrowType)new ArrowType.Int(32, true))));
            try (VectorSchemaRoot root = VectorSchemaRoot.create((Schema)schema, (BufferAllocator)this.allocator);){
                root.allocateNew();
                listener.start(root);
                for (byte i = 0; i < 10; i = (byte)(i + 1)) {
                    IntVector vector = (IntVector)root.getVector("a");
                    vector.set(0, 10);
                    vector.setValueCount(1);
                    root.setRowCount(1);
                    ArrowBuf metadata = this.allocator.buffer(1L);
                    metadata.writeByte(i);
                    listener.putNext(metadata);
                }
                listener.completed();
            }
        }

        public Runnable acceptPut(FlightProducer.CallContext context, FlightStream stream, FlightProducer.StreamListener<PutResult> ackStream) {
            return () -> {
                stream.getRoot();
                if (stream.getDescriptor().isCommand() && Arrays.equals(stream.getDescriptor().getCommand(), COMMAND_ARROW_6136)) {
                    ackStream.onError((Throwable)CallStatus.INTERNAL.withDescription(TestApplicationMetadata.MESSAGE_ARROW_6136).toRuntimeException());
                    return;
                }
                try {
                    byte current = 0;
                    while (stream.next()) {
                        ArrowBuf metadata = stream.getLatestMetadata();
                        if (current != metadata.getByte(0L)) {
                            ackStream.onError((Throwable)CallStatus.INVALID_ARGUMENT.withDescription(String.format("Metadata does not match expected value; got %d but expected %d.", metadata.getByte(0L), current)).toRuntimeException());
                            return;
                        }
                        ackStream.onNext((Object)PutResult.metadata((ArrowBuf)metadata));
                        current = (byte)(current + 1);
                    }
                    if (current != 10) {
                        throw CallStatus.INVALID_ARGUMENT.withDescription("Wrong number of messages sent.").toRuntimeException();
                    }
                }
                catch (Exception e) {
                    throw CallStatus.INTERNAL.withCause((Throwable)e).withDescription(e.toString()).toRuntimeException();
                }
            };
        }
    }
}

