/*
 * Decompiled with CFR 0.152.
 */
package org.apache.arrow.flight;

import com.google.common.collect.ImmutableList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.arrow.flight.BackpressureStrategy;
import org.apache.arrow.flight.CallOption;
import org.apache.arrow.flight.FlightClient;
import org.apache.arrow.flight.FlightEndpoint;
import org.apache.arrow.flight.FlightProducer;
import org.apache.arrow.flight.FlightServer;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.NoOpFlightProducer;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.flight.perf.PerformanceTestServer;
import org.apache.arrow.flight.perf.TestPerf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

/**
 * Tests for backpressure handling on Arrow Flight streams.
 *
 * <p>Two scenarios are covered, each in a default and a callback-based variant: reading two
 * streams from one client at different rates, and checking that a producer spends time waiting
 * on its backpressure strategy while the client is not reading. Every test method in this class
 * is currently annotated {@code @Disabled}.
 */
public class TestBackPressure {
    // NOTE(review): this constant is not referenced by name anywhere in the file as shown; the
    // literal 4095 passed to getPerfFlightDescriptor and setRowCount appears to be this value
    // inlined at compile time - confirm before removing or changing it.
    private static final int BATCH_SIZE = 4095;

    /**
     * Runs the independent-streams scenario against a {@code PerformanceTestServer} created
     * through its (allocator, location) constructor.
     *
     * @throws Exception propagated from the shared scenario helper
     */
    @Disabled
    @Test
    public void ensureIndependentSteams() throws Exception {
        TestBackPressure.ensureIndependentSteams(
            (BufferAllocator allocator) -> (Location location) -> new PerformanceTestServer(allocator, location));
    }

    /**
     * Runs the independent-streams scenario against a {@code PerformanceTestServer} configured
     * with a fresh {@code CallbackBackpressureStrategy} per server instance.
     *
     * @throws Exception propagated from the shared scenario helper
     */
    @Disabled
    @Test
    public void ensureIndependentSteamsWithCallbacks() throws Exception {
        TestBackPressure.ensureIndependentSteams(
            (BufferAllocator allocator) -> (Location location) -> {
                BackpressureStrategy callbackStrategy = new BackpressureStrategy.CallbackBackpressureStrategy();
                // NOTE(review): the trailing `true` is presumably the server's non-blocking
                // flag - confirm against the PerformanceTestServer constructor.
                return new PerformanceTestServer(allocator, location, callbackStrategy, true);
            });
    }

    /**
     * Runs the wait-until-proceed scenario with the polling strategy, producing batches inline
     * (the {@code isNonBlocking} argument is {@code false}).
     *
     * @throws Exception propagated from the shared scenario helper
     */
    @Disabled
    @Test
    public void ensureWaitUntilProceed() throws Exception {
        final PollingBackpressureStrategy pollingStrategy = new PollingBackpressureStrategy();
        TestBackPressure.ensureWaitUntilProceed(pollingStrategy, false);
    }

    /**
     * Runs the wait-until-proceed scenario with the time-recording callback strategy, producing
     * batches on a separate executor (the {@code isNonBlocking} argument is {@code true}).
     *
     * @throws Exception propagated from the shared scenario helper
     */
    @Disabled
    @Test
    public void ensureWaitUntilProceedWithCallbacks() throws Exception {
        final RecordingCallbackBackpressureStrategy callbackStrategy = new RecordingCallbackBackpressureStrategy();
        TestBackPressure.ensureWaitUntilProceed(callbackStrategy, true);
    }

    /**
     * Reads two streams from the same client at different paces.
     *
     * <p>The first stream is opened and 10 batches are read from it before the second stream is
     * opened. Reads then alternate between the two streams in chunks of up to 100 batches, and
     * finally both are drained. There are no explicit assertions: the scenario succeeds when
     * every read returns and all resources close without throwing.
     *
     * @param serverConstructor curried factory that builds the server under test from an
     *     allocator and then a location
     * @throws Exception propagated from server/client setup, streaming, or resource close
     */
    private static void ensureIndependentSteams(Function<BufferAllocator, Function<Location, PerformanceTestServer>> serverConstructor) throws Exception {
        final int rowsPerBatch = 4095;
        final int streamCount = 1;
        final long firstStreamRows = 110L * rowsPerBatch;   // 450450
        final long secondStreamRows = 200L * rowsPerBatch;  // 819000

        try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
             PerformanceTestServer server = serverConstructor
                     .apply(allocator)
                     .apply(Location.forGrpcInsecure("localhost", 0))
                     .start();
             FlightClient client = FlightClient.builder(allocator, server.getLocation()).build();
             FlightStream first = client.getStream(
                     client.getInfo(TestPerf.getPerfFlightDescriptor(firstStreamRows, rowsPerBatch, streamCount))
                             .getEndpoints()
                             .get(0)
                             .getTicket())) {
            // Partially read the first stream before the second one even exists.
            consume(first, 10);

            try (FlightStream second = client.getStream(
                    client.getInfo(TestPerf.getPerfFlightDescriptor(secondStreamRows, rowsPerBatch, streamCount))
                            .getEndpoints()
                            .get(0)
                            .getTicket())) {
                // Alternate between the streams, then drain whatever is left of each.
                consume(second, 100);
                consume(first, 100);
                consume(second, 100);
                consume(first);
                consume(second);
            }
        }
    }

    /**
     * Verifies that a producer using {@code bpStrategy} spends time waiting while the client is
     * not reading.
     *
     * <p>A Flight server is started whose {@code getStream} registers the listener with
     * {@code bpStrategy} and then emits 101 batches of a single nullable BIGINT column "a",
     * calling {@code waitForListener} before each batch and once more before completing. The
     * client opens the stream, idles for {@code wait} milliseconds, then drains it. Afterwards
     * the strategy must report more than {@code wait - epsilon} milliseconds of waiting.
     *
     * @param bpStrategy strategy under test; also reports how long it has waited
     * @param isNonBlocking when {@code true} the batches are produced on a dedicated
     *     single-thread executor; when {@code false} they are produced inline on the thread that
     *     invoked {@code getStream}
     * @throws Exception propagated from server/client setup, streaming, the sleep, or resource
     *     close
     */
    private static void ensureWaitUntilProceed(final SleepTimeRecordingBackpressureStrategy bpStrategy, boolean isNonBlocking) throws Exception {
        // How long the client stalls before reading, and the tolerance allowed when checking
        // how long the producer reports having waited.
        final long wait = 3000L;
        final long epsilon = 1000L;

        try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
            // Plain anonymous subclass: allocator, bpStrategy and isNonBlocking are captured
            // from the enclosing scope rather than passed through a constructor.
            final FlightProducer producer = new NoOpFlightProducer() {
                @Override
                public void getStream(FlightProducer.CallContext context, Ticket ticket, FlightProducer.ServerStreamListener listener) {
                    bpStrategy.register(listener);
                    final Runnable loadData = () -> {
                        int batches = 0;
                        final Schema pojoSchema = new Schema(
                                ImmutableList.of(Field.nullable("a", Types.MinorType.BIGINT.getType())));
                        try (VectorSchemaRoot root = VectorSchemaRoot.create(pojoSchema, allocator)) {
                            listener.start(root);
                            while (true) {
                                // Block here until the strategy reports the listener can proceed.
                                bpStrategy.waitForListener(0L);
                                if (batches > 100) {
                                    root.clear();
                                    listener.completed();
                                    return;
                                }
                                root.allocateNew();
                                // Same value as the class constant BATCH_SIZE.
                                root.setRowCount(4095);
                                listener.putNext();
                                ++batches;
                            }
                        }
                    };

                    if (!isNonBlocking) {
                        loadData.run();
                    } else {
                        final ExecutorService service = Executors.newSingleThreadExecutor();
                        // The result is deliberately not inspected.
                        Future<?> unused = service.submit(loadData);
                        // Already-submitted work still runs; the executor just accepts no more.
                        service.shutdown();
                    }
                }
            };

            try (BufferAllocator serverAllocator = allocator.newChildAllocator("server", 0L, Long.MAX_VALUE);
                 FlightServer server = FlightServer.builder(serverAllocator, Location.forGrpcInsecure("localhost", 0), producer).build().start();
                 BufferAllocator clientAllocator = allocator.newChildAllocator("client", 0L, Long.MAX_VALUE);
                 FlightClient client = FlightClient.builder(clientAllocator, server.getLocation()).build();
                 FlightStream stream = client.getStream(new Ticket(new byte[1]))) {
                VectorSchemaRoot root = stream.getRoot();
                root.clear();
                // Stall the reader so the producer has to wait on its backpressure strategy.
                Thread.sleep(wait);
                while (stream.next()) {
                    root.clear();
                }
                final long expected = wait - epsilon;
                Assertions.assertTrue(
                        bpStrategy.getSleepTime() > expected,
                        String.format("Expected a sleep of at least %dms but only slept for %d", expected, bpStrategy.getSleepTime()));
            }
        }
    }

    /**
     * Drains {@code stream}: reads batches until {@code next()} returns {@code false}, clearing
     * the stream's root after each one.
     *
     * @param stream stream to read to the end
     */
    private static void consume(FlightStream stream) {
        for (VectorSchemaRoot batchRoot = stream.getRoot(); stream.next(); ) {
            batchRoot.clear();
        }
    }

    /**
     * Reads at most {@code batches} batches from {@code stream}, clearing the stream's root after
     * each one. Stops early if {@code next()} returns {@code false}; reads nothing when
     * {@code batches} is zero or negative.
     *
     * @param stream stream to read from
     * @param batches upper bound on the number of batches to read
     */
    private static void consume(FlightStream stream, int batches) {
        final VectorSchemaRoot batchRoot = stream.getRoot();
        for (int remaining = batches; remaining > 0 && stream.next(); remaining--) {
            batchRoot.clear();
        }
    }

    /**
     * Backpressure strategy that polls {@code isReady()} on the registered listener, sleeping
     * one millisecond between polls, and keeps a running total of the time slept.
     */
    private static class PollingBackpressureStrategy
    implements SleepTimeRecordingBackpressureStrategy {
        /** Milliseconds slept so far; incremented by one after each completed 1 ms sleep. */
        private final AtomicLong sleepTime = new AtomicLong(0L);
        /** Listener polled by {@link #waitForListener(long)}; set by {@link #register}. */
        private FlightProducer.ServerStreamListener listener;

        private PollingBackpressureStrategy() {
        }

        /** @return total milliseconds slept while waiting for the listener */
        @Override
        public long getSleepTime() {
            return this.sleepTime.get();
        }

        /**
         * Stores the listener whose readiness will be polled.
         *
         * @param listener listener to poll
         */
        @Override
        public void register(FlightProducer.ServerStreamListener listener) {
            this.listener = listener;
        }

        /**
         * Sleeps in 1 ms steps until the registered listener reports ready.
         *
         * <p>An interrupt does not end the wait; it is remembered and the thread's interrupt
         * status is restored before returning so that callers can still observe it.
         *
         * @param timeout ignored by this implementation; the wait has no upper bound
         * @return always {@code WaitResult.READY}
         */
        @Override
        public BackpressureStrategy.WaitResult waitForListener(long timeout) {
            boolean interrupted = false;
            try {
                while (!this.listener.isReady()) {
                    try {
                        Thread.sleep(1L);
                        this.sleepTime.addAndGet(1L);
                    }
                    catch (InterruptedException e) {
                        // Keep waiting; re-asserting the flag right away would make every
                        // following sleep throw immediately, so defer it to the finally block.
                        interrupted = true;
                    }
                }
            }
            finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
            return BackpressureStrategy.WaitResult.READY;
        }
    }

    private static interface SleepTimeRecordingBackpressureStrategy
    extends BackpressureStrategy {
        public long getSleepTime();
    }

    /**
     * {@code CallbackBackpressureStrategy} that additionally accumulates the elapsed
     * {@code System.currentTimeMillis()} time of every {@code waitForListener} call that
     * returns normally.
     */
    private static class RecordingCallbackBackpressureStrategy
    extends BackpressureStrategy.CallbackBackpressureStrategy
    implements SleepTimeRecordingBackpressureStrategy {
        /** Total milliseconds spent inside the superclass wait. */
        private final AtomicLong sleepTime = new AtomicLong();

        private RecordingCallbackBackpressureStrategy() {
        }

        /** @return total milliseconds spent waiting so far */
        @Override
        public long getSleepTime() {
            return sleepTime.get();
        }

        /**
         * Delegates to the superclass wait and adds the time it took to the running total.
         *
         * @param timeout passed through unchanged to the superclass
         * @return whatever the superclass wait returned
         */
        @Override
        public BackpressureStrategy.WaitResult waitForListener(long timeout) {
            final long enteredAt = System.currentTimeMillis();
            final BackpressureStrategy.WaitResult outcome = super.waitForListener(timeout);
            final long elapsedMillis = System.currentTimeMillis() - enteredAt;
            sleepTime.addAndGet(elapsedMillis);
            return outcome;
        }
    }
}

