package org.apache.drill.exec.server;

import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.DrillBuf;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import mockit.Injectable;
import mockit.NonStrictExpectations;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.scanner.ClassPathScanner;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.ExecTest;
import org.apache.drill.exec.exception.FragmentSetupException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.BitData;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.proto.GeneralRPCProtos;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.record.FragmentWritableBatch;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
import org.apache.drill.exec.rpc.control.WorkEventBus;
import org.apache.drill.exec.rpc.data.AckSender;
import org.apache.drill.exec.rpc.data.DataConnectionManager;
import org.apache.drill.exec.rpc.data.DataResponseHandler;
import org.apache.drill.exec.rpc.data.DataServer;
import org.apache.drill.exec.rpc.data.DataTunnel;
import org.apache.drill.exec.vector.Float8Vector;
import org.apache.drill.exec.work.WorkManager;
import org.apache.drill.exec.work.fragment.FragmentManager;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test of the Drillbit data connection when the receiving side is slow.
 *
 * <p>A {@link DataServer} is started with a handler that stalls on every
 * {@value #STALL_EVERY_N_BATCHES}th batch. The test pushes
 * {@value #BATCHES_TO_SEND} record batches through a {@link DataTunnel} and
 * asserts that the largest send-to-acknowledgement time observed by the sender
 * exceeds {@value #MIN_EXPECTED_MAX_ACK_MILLIS} ms.
 */
public class TestBitRpc extends ExecTest {
    static final Logger logger = LoggerFactory.getLogger(TestBitRpc.class);

    /** Port handed to {@code DataServer.bind}; the port actually returned by bind is what the client uses. */
    private static final int REQUESTED_DATA_PORT = 1234;

    /** Number of record batches pushed through the tunnel. */
    private static final int BATCHES_TO_SEND = 40;

    /** Number of values generated in each vector of a batch. */
    private static final int RECORDS_PER_BATCH = 5000;

    /** Number of FLOAT8 vectors placed in each batch. */
    private static final int VECTORS_PER_BATCH = 5;

    /** The receiving handler stalls each time its batch counter is a multiple of this value. */
    private static final int STALL_EVERY_N_BATCHES = 10;

    /** How long the receiving handler stalls, in milliseconds. */
    private static final long STALL_MILLIS = 3000L;

    /** The maximum observed acknowledgement time must be strictly greater than this, in milliseconds. */
    private static final long MIN_EXPECTED_MAX_ACK_MILLIS = 2700L;

    /** Pause at the end of the test, in milliseconds. */
    private static final long FINAL_PAUSE_MILLIS = 5000L;

    /**
     * Receiving-side handler that acknowledges every batch and sleeps for
     * {@link #STALL_MILLIS} before acknowledging every
     * {@link #STALL_EVERY_N_BATCHES}th one.
     */
    private static final class BitComTestHandler implements DataResponseHandler {
        /** Number of batches handed to {@link #handle} so far. */
        private int batchesReceived;

        public void informOutOfMemory() {
            // Intentionally a no-op for this test.
        }

        /**
         * Counts the batch, optionally stalls, then sends an OK acknowledgement.
         *
         * @param fragmentManager     unused
         * @param fragmentRecordBatch unused
         * @param drillBuf            unused
         * @param ackSender           used to acknowledge the batch
         */
        public void handle(FragmentManager fragmentManager, BitData.FragmentRecordBatch fragmentRecordBatch, DrillBuf drillBuf, AckSender ackSender) throws FragmentSetupException, IOException {
            batchesReceived++;
            if (batchesReceived % STALL_EVERY_N_BATCHES == 0) {
                System.out.println("sleeping.");
                try {
                    Thread.sleep(STALL_MILLIS);
                } catch (InterruptedException e) {
                    // Keep the interrupt status visible to the calling thread instead of
                    // discarding it; the batch is still acknowledged below.
                    Thread.currentThread().interrupt();
                    logger.warn("Interrupted while stalling the test data handler", e);
                }
            }
            ackSender.sendOk();
        }
    }

    /**
     * Send listener that measures the time from its own construction to the
     * success callback and folds that value into a shared running maximum.
     */
    private static final class TimingOutcome implements RpcOutcomeListener<GeneralRPCProtos.Ack> {
        /** Shared running maximum of elapsed milliseconds across all listeners. */
        private final AtomicLong max;

        /** Started when the listener is created. */
        private final Stopwatch watch = new Stopwatch().start();

        /**
         * @param atomicLong shared holder that is raised to the largest elapsed time seen
         */
        public TimingOutcome(AtomicLong atomicLong) {
            this.max = atomicLong;
        }

        /** Logs the failure; the running maximum is left untouched. */
        public void failed(RpcException rpcException) {
            logger.error("Sending a record batch failed", rpcException);
        }

        /**
         * Prints the elapsed time and raises the shared maximum to it if it is larger.
         */
        public void success(GeneralRPCProtos.Ack ack, ByteBuf byteBuf) {
            final long elapsed = watch.elapsed(TimeUnit.MILLISECONDS);
            System.out.println(String.format("Total time to send: %d, start time %d", elapsed, System.currentTimeMillis() - elapsed));

            // Lock-free "set to max": retry only while our value is still larger
            // than the current one and another thread won the compare-and-set.
            long observed = max.get();
            while (observed < elapsed && !max.compareAndSet(observed, elapsed)) {
                observed = max.get();
            }
        }

        public void interrupted(InterruptedException interruptedException) {
            // Intentionally ignored by this test listener.
        }
    }

    /**
     * Sends {@link #BATCHES_TO_SEND} batches to a server whose handler stalls
     * periodically and checks that at least one send took longer than
     * {@link #MIN_EXPECTED_MAX_ACK_MILLIS} ms to be acknowledged.
     *
     * <p>NOTE(review): the server, connection manager and both bootstrap
     * contexts are never closed here; confirm whether they should be released
     * at the end of the test.
     *
     * @param workerBee       injected mock, not used directly
     * @param workEventBus    injected mock that returns {@code fragmentManager} for any fragment handle
     * @param fragmentManager injected mock that returns {@code fragmentContext}
     * @param fragmentContext injected mock that returns the server-side allocator
     */
    @Test
    public void testConnectionBackpressure(@Injectable WorkManager.WorkerBee workerBee, @Injectable final WorkEventBus workEventBus, @Injectable final FragmentManager fragmentManager, @Injectable final FragmentContext fragmentContext) throws Exception {
        final DrillConfig serverConfig = DrillConfig.create();
        final BootStrapContext serverContext = new BootStrapContext(serverConfig, ClassPathScanner.fromPrescan(serverConfig));
        final DrillConfig clientConfig = DrillConfig.create();
        final BootStrapContext clientContext = new BootStrapContext(clientConfig, ClassPathScanner.fromPrescan(clientConfig));

        new NonStrictExpectations() {
            {
                workEventBus.getFragmentManagerIfExists((ExecProtos.FragmentHandle) any);
                result = fragmentManager;
                workEventBus.getFragmentManager((ExecProtos.FragmentHandle) any);
                result = fragmentManager;
                fragmentManager.getFragmentContext();
                result = fragmentContext;
                fragmentContext.getAllocator();
                result = serverContext.getAllocator();
            }
        };

        // Receiving side: start the server and use whatever port bind() reports.
        final DataServer server = new DataServer(serverContext, workEventBus, new BitComTestHandler());
        final int boundPort = server.bind(REQUESTED_DATA_PORT, true);

        // Sending side: a tunnel to the server over localhost.
        final CoordinationProtos.DrillbitEndpoint endpoint = CoordinationProtos.DrillbitEndpoint.newBuilder()
                .setAddress("localhost")
                .setDataPort(boundPort)
                .build();
        final DataTunnel tunnel = new DataTunnel(new DataConnectionManager(endpoint, clientContext));

        final AtomicLong maxAckMillis = new AtomicLong(0L);
        for (int sent = 0; sent < BATCHES_TO_SEND; sent++) {
            final long sendStart = System.currentTimeMillis();
            final WritableBatch payload = getRandomBatch(serverContext.getAllocator(), RECORDS_PER_BATCH);
            tunnel.sendRecordBatch(new TimingOutcome(maxAckMillis), new FragmentWritableBatch(false, UserBitShared.QueryId.getDefaultInstance(), 1, 1, 1, 1, payload));
            System.out.println(System.currentTimeMillis() - sendStart);
        }

        // Read once so the printed value and the asserted value are the same.
        final long maxObserved = maxAckMillis.get();
        System.out.println(String.format("Max time: %d", maxObserved));
        Assert.assertTrue("Expected max acknowledgement time > " + MIN_EXPECTED_MAX_ACK_MILLIS + " ms but was " + maxObserved + " ms", maxObserved > MIN_EXPECTED_MAX_ACK_MILLIS);

        // NOTE(review): presumably leaves time for still-outstanding
        // acknowledgements to arrive before the test returns — confirm.
        Thread.sleep(FINAL_PAUSE_MILLIS);
    }

    /**
     * Builds a batch of {@link #VECTORS_PER_BATCH} required FLOAT8 vectors, all
     * named {@code "a"}, each allocated for and filled with {@code i} generated
     * test values.
     *
     * @param bufferAllocator allocator backing the vectors
     * @param i               number of values per vector, also passed as the batch record count
     * @return the writable batch wrapping the generated vectors
     */
    // NOTE(review): the list is left raw because the element type expected by
    // WritableBatch.getBatchNoHV is not visible from this file.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static WritableBatch getRandomBatch(BufferAllocator bufferAllocator, int i) {
        ArrayList vectors = Lists.newArrayList();
        for (int n = 0; n < VECTORS_PER_BATCH; n++) {
            MaterializedField field = MaterializedField.create(new SchemaPath("a", ExpressionPosition.UNKNOWN), Types.required(TypeProtos.MinorType.FLOAT8));
            // Explicit downcast: the field is declared FLOAT8, so a Float8Vector is expected.
            Float8Vector vector = (Float8Vector) TypeHelper.getNewVector(field, bufferAllocator);
            vector.allocateNew(i);
            vector.getMutator().generateTestData(i);
            vectors.add(vector);
        }
        return WritableBatch.getBatchNoHV(i, vectors, false);
    }
}
