package org.apache.drill.exec.server;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.math3.util.Pair;
import org.apache.drill.BaseTestQuery;
import org.apache.drill.QueryTestUtil;
import org.apache.drill.SingleRowListener;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.concurrent.ExtendedLatch;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.util.RepeatTestRule;
import org.apache.drill.exec.ZookeeperHelper;
import org.apache.drill.exec.client.DrillClient;
import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.TopLevelAllocator;
import org.apache.drill.exec.physical.impl.ScreenCreator;
import org.apache.drill.exec.physical.impl.SingleSenderCreator;
import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch;
import org.apache.drill.exec.physical.impl.partitionsender.PartitionerDecorator;
import org.apache.drill.exec.physical.impl.unorderedreceiver.UnorderedReceiverBatch;
import org.apache.drill.exec.physical.impl.xsort.ExternalSortBatch;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.planner.sql.DrillSqlWorker;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.user.ConnectionThrottle;
import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.rpc.user.UserResultsListener;
import org.apache.drill.exec.store.pojo.PojoRecordReader;
import org.apache.drill.exec.testing.Controls;
import org.apache.drill.exec.testing.ControlsInjectionUtil;
import org.apache.drill.exec.util.Pointer;
import org.apache.drill.exec.work.foreman.Foreman;
import org.apache.drill.exec.work.foreman.ForemanException;
import org.apache.drill.exec.work.foreman.ForemanSetupException;
import org.apache.drill.exec.work.fragment.FragmentExecutor;
import org.apache.drill.test.DrillTest;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/drill/exec/server/TestDrillbitResilience.class */
public class TestDrillbitResilience extends DrillTest {
    /** ZooKeeper helper; assigned in {@code startSomeDrillbits()} and used as the config source. */
    private static ZookeeperHelper zkHelper;
    /** Service set shared by every drillbit and by the client. */
    private static RemoteServiceSet remoteServiceSet;
    /** Single client through which all test queries and injection controls are submitted. */
    private static DrillClient drillClient;
    private static final int NUM_RUNS = 3;
    private static final String TEST_QUERY = "select * from sys.memory";
    private static final String DRILLBIT_ALPHA = "alpha";
    private static final String DRILLBIT_BETA = "beta";
    private static final String DRILLBIT_GAMMA = "gamma";
    private static final Logger logger = LoggerFactory.getLogger(TestDrillbitResilience.class);
    /** Running drillbits keyed by the name they were started under. */
    private static final Map<String, Drillbit> drillbits = new HashMap<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/drill/exec/server/TestDrillbitResilience$CancellingThread.class */
    /**
     * Thread that asks the shared client to cancel a query and, when a latch was supplied,
     * releases that latch once the cancel attempt has finished.
     */
    public static class CancellingThread extends Thread {
        private final UserBitShared.QueryId queryId;
        private final Pointer<Exception> ex;
        private final ExtendedLatch latch;

        /**
         * @param queryId id of the query to cancel
         * @param pointer receives the {@link RpcException} if the cancel request fails
         * @param extendedLatch counted down after the cancel attempt; may be {@code null}
         */
        public CancellingThread(UserBitShared.QueryId queryId, Pointer<Exception> pointer, ExtendedLatch extendedLatch) {
            this.queryId = queryId;
            this.ex = pointer;
            this.latch = extendedLatch;
        }

        /**
         * Sends the cancel request and waits for its acknowledgement, recording an
         * {@link RpcException} in the shared pointer instead of propagating it.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                TestDrillbitResilience.drillClient.cancelQuery(this.queryId).checkedGet();
            } catch (RpcException e) {
                this.ex.value = e;
            } finally {
                // Release the latch on every exit path: a thread waiting on it (see
                // ResumingThread) would otherwise block forever if the cancel call failed
                // with anything other than an RpcException.
                if (this.latch != null) {
                    this.latch.countDown();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/drill/exec/server/TestDrillbitResilience$ListenerThatCancelsQueryAfterFirstBatchOfData.class */
    /**
     * Listener that starts a {@link CancellingThread} (without a latch) the first time a data
     * batch is delivered, and simply releases every batch it receives.
     */
    public static class ListenerThatCancelsQueryAfterFirstBatchOfData extends WaitUntilCompleteListener {
        /** Set once the cancel thread has been started so that later batches do not cancel again. */
        private boolean cancelRequested = false;

        private ListenerThatCancelsQueryAfterFirstBatchOfData() {
            super();
        }

        /**
         * Starts the cancellation on the first batch only; the batch is released in every case.
         */
        @Override // org.apache.drill.exec.server.TestDrillbitResilience.WaitUntilCompleteListener
        public void dataArrived(QueryDataBatch queryDataBatch, ConnectionThrottle connectionThrottle) {
            final boolean firstBatch = !cancelRequested;
            if (firstBatch) {
                // A missing query id is recorded as an error by check(); it does not abort here.
                check(queryId != null, "Query id should not be null, since we have waited long enough.");
                final CancellingThread canceller = new CancellingThread(queryId, ex, null);
                canceller.start();
                cancelRequested = true;
            }
            queryDataBatch.release();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/drill/exec/server/TestDrillbitResilience$ResumingThread.class */
    /**
     * Thread that waits on a latch and then asks the shared client to resume a query.
     */
    public static class ResumingThread extends Thread {
        private final UserBitShared.QueryId queryId;
        private final Pointer<Exception> ex;
        private final ExtendedLatch latch;

        /**
         * @param queryId id of the query to resume
         * @param pointer receives the {@link RpcException} if the resume request fails
         * @param extendedLatch latch awaited before the resume request is sent
         */
        public ResumingThread(UserBitShared.QueryId queryId, Pointer<Exception> pointer, ExtendedLatch extendedLatch) {
            super();
            queryId.getClass();
            this.queryId = queryId;
            ex = pointer;
            latch = extendedLatch;
        }

        /**
         * Blocks until the latch is released, then sends the resume request and waits for its
         * acknowledgement; an {@link RpcException} is stored in the shared pointer.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            latch.awaitUninterruptibly();
            try {
                TestDrillbitResilience.drillClient.resumeQuery(queryId).checkedGet();
            } catch (RpcException rpcFailure) {
                ex.value = rpcFailure;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/drill/exec/server/TestDrillbitResilience$WaitUntilCompleteListener.class */
    /**
     * Results listener that records the query id, the terminal state and any error, and lets the
     * test thread block until the query has completed or its submission has failed.
     */
    public static class WaitUntilCompleteListener implements UserResultsListener {
        /** Released by {@link #queryCompleted} or {@link #submissionFailed}. */
        private final ExtendedLatch latch = new ExtendedLatch(1);
        protected UserBitShared.QueryId queryId = null;
        /** Holder for the most recently recorded error; shared with the helper threads. */
        protected volatile Pointer<Exception> ex = new Pointer<>();
        protected volatile UserBitShared.QueryResult.QueryState state = null;

        private WaitUntilCompleteListener() {
        }

        /**
         * Records an {@link IllegalStateException} in {@link #ex} when the condition is false.
         * Nothing is thrown, so the caller keeps running.
         *
         * @param z condition expected to hold
         * @param str {@link String#format} pattern for the error message
         * @param objArr arguments for the pattern
         */
        protected final void check(boolean z, String str, Object... objArr) {
            if (!z) {
                final String message = String.format(str, objArr);
                ex.value = new IllegalStateException(message);
            }
        }

        /**
         * Starts a cancelling thread and a resuming thread for the current query; the resuming
         * thread waits on a latch that the cancelling thread releases.
         */
        protected final void cancelAndResume() {
            Preconditions.checkNotNull(queryId);
            final ExtendedLatch cancelDone = new ExtendedLatch(1);
            final CancellingThread canceller = new CancellingThread(queryId, ex, cancelDone);
            canceller.start();
            final ResumingThread resumer = new ResumingThread(queryId, ex, cancelDone);
            resumer.start();
        }

        /** Remembers the id assigned to the submitted query. */
        public void queryIdArrived(UserBitShared.QueryId queryId) {
            this.queryId = queryId;
        }

        /** Records the failure, marks the state as FAILED and releases any waiter. */
        public void submissionFailed(UserException userException) {
            ex.value = userException;
            state = UserBitShared.QueryResult.QueryState.FAILED;
            latch.countDown();
        }

        /** Records the terminal state and releases any waiter. */
        public void queryCompleted(UserBitShared.QueryResult.QueryState queryState) {
            state = queryState;
            latch.countDown();
        }

        /** Releases the batch without inspecting it. */
        public void dataArrived(QueryDataBatch queryDataBatch, ConnectionThrottle connectionThrottle) {
            queryDataBatch.release();
        }

        /**
         * Blocks until the latch is released.
         *
         * @return the recorded state paired with the recorded exception (either may be null)
         */
        public final Pair<UserBitShared.QueryResult.QueryState, Exception> waitForCompletion() {
            latch.awaitUninterruptibly();
            final UserBitShared.QueryResult.QueryState finalState = state;
            final Exception recorded = ex.value;
            return new Pair<>(finalState, recorded);
        }
    }

    /**
     * Starts a drillbit with the config from {@code zkHelper} and registers it under the given name.
     *
     * @param str name to register the drillbit under; must not already be in use
     * @param remoteServiceSet2 service set passed to {@code Drillbit.start}
     * @throws IllegalStateException if a drillbit with that name is already registered
     * @throws RuntimeException wrapping the {@link DrillbitStartupException} if start-up fails
     */
    private static void startDrillbit(String str, RemoteServiceSet remoteServiceSet2) {
        final boolean nameTaken = drillbits.containsKey(str);
        if (nameTaken) {
            throw new IllegalStateException("Drillbit named \"" + str + "\" already exists");
        }
        final Drillbit started;
        try {
            started = Drillbit.start(zkHelper.getConfig(), remoteServiceSet2);
        } catch (DrillbitStartupException startupFailure) {
            throw new RuntimeException("Failed to start Drillbit \"" + str + "\"", startupFailure);
        }
        drillbits.put(str, started);
    }

    /**
     * Closes the drillbit registered under the given name. A failure while closing is reported on
     * stderr and logged as a warning, but not rethrown. The map entry itself is left in place.
     *
     * @param str name of the drillbit to stop
     * @throws IllegalStateException if no drillbit is registered under that name
     */
    private static void stopDrillbit(String str) {
        final Drillbit target = drillbits.get(str);
        if (target != null) {
            try {
                target.close();
            } catch (Exception closeFailure) {
                final String message = "Error shutting down Drillbit \"" + str + "\"";
                System.err.println(message + '.');
                logger.warn(message, closeFailure);
            }
            return;
        }
        throw new IllegalStateException("No Drillbit named \"" + str + "\" found");
    }

    /**
     * Stops every registered drillbit and then empties the registry.
     */
    private static void stopAllDrillbits() {
        // stopDrillbit() does not modify the map, so iterating its key set directly is safe.
        for (String name : drillbits.keySet()) {
            stopDrillbit(name);
        }
        drillbits.clear();
    }

    /**
     * Looks up the endpoint of a registered drillbit.
     *
     * @param str name the drillbit was started under
     * @return the endpoint reported by the drillbit's context
     * @throws IllegalStateException if no drillbit is registered under that name
     */
    private static CoordinationProtos.DrillbitEndpoint getEndpoint(String str) {
        final Drillbit registered = drillbits.get(str);
        if (registered != null) {
            return registered.getContext().getEndpoint();
        }
        throw new IllegalStateException("No Drillbit named \"" + str + "\" found.");
    }

    /**
     * Class-level setup: starts ZooKeeper, the alpha, beta and gamma drillbits and the shared
     * client, then clears any injection controls.
     *
     * @throws Exception if any part of the start-up fails
     */
    @BeforeClass
    public static void startSomeDrillbits() throws Exception {
        // Set before any drillbit is started.
        System.setProperty("drill.exec.http.enabled", "false");
        zkHelper = new ZookeeperHelper(true);
        zkHelper.startZookeeper(1);
        remoteServiceSet = RemoteServiceSet.getLocalServiceSet();
        // All drillbits share the same service set and the config supplied by zkHelper.
        startDrillbit(DRILLBIT_ALPHA, remoteServiceSet);
        startDrillbit(DRILLBIT_BETA, remoteServiceSet);
        startDrillbit(DRILLBIT_GAMMA, remoteServiceSet);
        drillClient = QueryTestUtil.createClient(zkHelper.getConfig(), remoteServiceSet, 1, null);
        // Requires drillClient to be set, hence last.
        clearAllInjections();
    }

    /**
     * Class-level teardown: closes the client, stops all drillbits, closes the service set and
     * stops ZooKeeper. Each resource is skipped when it was never created, so a failure part-way
     * through {@code startSomeDrillbits()} is not masked by a NullPointerException raised here.
     */
    @AfterClass
    public static void shutdownAllDrillbits() {
        if (drillClient != null) {
            drillClient.close();
            drillClient = null;
        }
        stopAllDrillbits();
        if (remoteServiceSet != null) {
            AutoCloseables.close(remoteServiceSet, logger);
            remoteServiceSet = null;
        }
        // zkHelper stays null when its construction failed during setup.
        if (zkHelper != null) {
            zkHelper.stopZookeeper();
        }
    }

    /**
     * Clears the injection controls through the shared client.
     *
     * @throws NullPointerException if the client has not been created yet
     */
    private static void clearAllInjections() {
        // checkNotNull returns its argument, so the validated client is passed straight on.
        ControlsInjectionUtil.clearControls(Preconditions.checkNotNull(drillClient));
    }

    /**
     * Runs {@code select count(*) from sys.memory} and asserts that the query completes without
     * errors and that its single BIGINT result equals the number of registered drillbits.
     *
     * @throws RuntimeException wrapping any exception thrown while running the query
     */
    private static void assertDrillbitsOk() {
        SingleRowListener singleRowListener = new SingleRowListener() { // from class: org.apache.drill.exec.server.TestDrillbitResilience.1
            private final BufferAllocator bufferAllocator = new TopLevelAllocator(TestDrillbitResilience.zkHelper.getConfig());
            private final RecordBatchLoader loader = new RecordBatchLoader(this.bufferAllocator);

            /** Validates the shape of the row (one record, one BIGINT column) and its value. */
            @Override // org.apache.drill.SingleRowListener
            public void rowArrived(QueryDataBatch queryDataBatch) {
                try {
                    this.loader.load(queryDataBatch.getHeader().getDef(), queryDataBatch.getData());
                } catch (SchemaChangeException e) {
                    Assert.fail(e.toString());
                }
                Assert.assertEquals(1L, this.loader.getRecordCount());
                BatchSchema schema = this.loader.getSchema();
                Assert.assertEquals(1L, schema.getFieldCount());
                Assert.assertEquals(TypeProtos.MinorType.BIGINT, schema.getColumn(0).getType().getMinorType());
                // Fetch the value once so the type check and the comparison look at the same object.
                Object countValue = ((VectorWrapper<?>) this.loader.iterator().next()).getValueVector().getAccessor().getObject(0);
                Assert.assertTrue(countValue instanceof Long);
                Assert.assertEquals(TestDrillbitResilience.drillbits.size(), ((Long) countValue).intValue());
                this.loader.clear();
            }

            /** Closes the allocator created for this listener. */
            @Override // org.apache.drill.SingleRowListener
            public void cleanup() {
                this.bufferAllocator.close();
            }
        };
        try {
            QueryTestUtil.testWithListener(drillClient, UserBitShared.QueryType.SQL, "select count(*) from sys.memory", singleRowListener);
            singleRowListener.waitForCompletion();
            UserBitShared.QueryResult.QueryState queryState = singleRowListener.getQueryState();
            Assert.assertTrue(String.format("QueryState should be COMPLETED (and not %s).", queryState), queryState == UserBitShared.QueryResult.QueryState.COMPLETED);
            Assert.assertTrue("There should not be any errors when checking if Drillbits are OK.", singleRowListener.getErrorList().isEmpty());
        } catch (Exception e) {
            // Assertion failures are Errors and therefore pass through this handler untouched.
            throw new RuntimeException("Couldn't query active drillbits", e);
        }
    }

    /**
     * Runs after every test: clears the injection controls, then asserts via
     * {@code assertDrillbitsOk()} that the drillbits still answer a query.
     */
    @After
    public void checkDrillbits() {
        clearAllInjections();
        assertDrillbitsOk();
    }

    /**
     * Applies the given controls string through the shared client.
     *
     * @param str controls definition, as produced by {@code Controls.newBuilder()...build()}
     */
    private static void setControls(String str) {
        ControlsInjectionUtil.setControls(drillClient, str);
    }

    /**
     * Sets a session option through the shared client.
     *
     * @param str option name
     * @param str2 option value, in string form
     */
    private static void setSessionOption(String str, String str2) {
        ControlsInjectionUtil.setSessionOption(drillClient, str, str2);
    }

    /**
     * Asserts that a throwable is a {@link UserException} whose wrapped exception has the expected
     * class name and message.
     *
     * @param th throwable to inspect
     * @param cls expected class of the wrapped exception
     * @param str expected message of the wrapped exception
     */
    private static void assertExceptionMessage(Throwable th, Class<? extends Throwable> cls, String str) {
        Assert.assertTrue("Throwable was not of UserException type.", th instanceof UserException);
        final UserException userException = (UserException) th;
        final UserBitShared.ExceptionWrapper wrapped = userException.getOrCreatePBError(false).getException();
        final String expectedClassName = cls.getName();
        Assert.assertEquals("Exception class names should match.", expectedClassName, wrapped.getExceptionClass());
        Assert.assertEquals("Exception sites should match.", str, wrapped.getMessage());
    }

    /**
     * Registers an exception control for this test class at site "noop" on the beta drillbit,
     * runs the test query, and expects it to complete with no change in allocated memory.
     */
    @Test
    public void settingNoOpInjectionsAndQuery() {
        final long before = countAllocatedMemory();
        final String controls = Controls.newBuilder().addExceptionOnBit(getClass(), "noop", RuntimeException.class, getEndpoint(DRILLBIT_BETA)).build();
        setControls(controls);
        final WaitUntilCompleteListener listener = new WaitUntilCompleteListener();
        QueryTestUtil.testWithListener(drillClient, UserBitShared.QueryType.SQL, TEST_QUERY, listener);
        final Pair<UserBitShared.QueryResult.QueryState, Exception> outcome = listener.waitForCompletion();
        assertStateCompleted(outcome, UserBitShared.QueryResult.QueryState.COMPLETED);
        final long after = countAllocatedMemory();
        Assert.assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
    }

    /**
     * Registers a {@link ForemanException} control for {@link Foreman} at the given site and
     * asserts that the test query fails with exactly that exception.
     *
     * @param str injection site name; also the expected exception message
     */
    private static void testForeman(String str) {
        final String controls = Controls.newBuilder().addException(Foreman.class, str, ForemanException.class).build();
        assertFailsWithException(controls, ForemanException.class, str);
    }

    /**
     * Fails the Foreman at site "run-try-beginning" and checks that allocated memory is unchanged.
     */
    @Test
    @RepeatTestRule.Repeat(count = 3)
    public void foreman_runTryBeginning() {
        final long before = countAllocatedMemory();
        testForeman("run-try-beginning");
        final long after = countAllocatedMemory();
        final String leakMessage = String.format("We are leaking %d bytes", after - before);
        Assert.assertEquals(leakMessage, before, after);
    }

    /**
     * Fails the Foreman at site "run-try-end" and checks that allocated memory is unchanged.
     * Currently ignored.
     */
    @Test
    @Ignore
    public void foreman_runTryEnd() {
        final long before = countAllocatedMemory();
        testForeman("run-try-end");
        final long after = countAllocatedMemory();
        final String leakMessage = String.format("We are leaking %d bytes", after - before);
        Assert.assertEquals(leakMessage, before, after);
    }

    /**
     * Fails the test unless the query ended in the expected state and no exception was recorded.
     *
     * @param pair final state and recorded exception, as returned by {@code waitForCompletion()}
     * @param queryState the state the query is expected to have ended in
     */
    private static void assertStateCompleted(Pair<UserBitShared.QueryResult.QueryState, Exception> pair, UserBitShared.QueryResult.QueryState queryState) {
        final UserBitShared.QueryResult.QueryState actual = pair.getFirst();
        final Exception recorded = pair.getSecond();
        final boolean asExpected = actual == queryState && recorded == null;
        if (!asExpected) {
            final Object shownException = recorded == null ? "none." : recorded;
            Assert.fail(String.format("Query state is incorrect (expected: %s, actual: %s) AND/OR \nException thrown: %s", queryState, actual, shownException));
        }
    }

    /**
     * Applies the controls, runs the query with the given listener and asserts that it ends in
     * the CANCELED state with no recorded exception.
     *
     * @param str controls definition to apply before the query is submitted
     * @param waitUntilCompleteListener listener that drives the cancellation and is waited on
     * @param str2 SQL text to run
     */
    private static void assertCancelledWithoutException(String str, WaitUntilCompleteListener waitUntilCompleteListener, String str2) {
        setControls(str);
        QueryTestUtil.testWithListener(drillClient, UserBitShared.QueryType.SQL, str2, waitUntilCompleteListener);
        final Pair<UserBitShared.QueryResult.QueryState, Exception> outcome = waitUntilCompleteListener.waitForCompletion();
        assertStateCompleted(outcome, UserBitShared.QueryResult.QueryState.CANCELED);
    }

    /**
     * Same as the three-argument overload, using {@code TEST_QUERY} as the SQL text.
     *
     * @param str controls definition to apply before the query is submitted
     * @param waitUntilCompleteListener listener that drives the cancellation and is waited on
     */
    private static void assertCancelledWithoutException(String str, WaitUntilCompleteListener waitUntilCompleteListener) {
        assertCancelledWithoutException(str, waitUntilCompleteListener, TEST_QUERY);
    }

    /**
     * Adds a pause for {@link PojoRecordReader} at site "read-next" and resumes the query as soon
     * as its id arrives; the query is expected to complete with no change in allocated memory.
     */
    @Test
    public void passThrough() {
        final long before = countAllocatedMemory();
        final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() { // from class: org.apache.drill.exec.server.TestDrillbitResilience.2
            @Override // org.apache.drill.exec.server.TestDrillbitResilience.WaitUntilCompleteListener
            public void queryIdArrived(UserBitShared.QueryId queryId) {
                super.queryIdArrived(queryId);
                // The latch is released right after the thread is started, so the resume
                // request is not held back by anything else.
                final ExtendedLatch trigger = new ExtendedLatch(1);
                final ResumingThread resumer = new ResumingThread(queryId, this.ex, trigger);
                resumer.start();
                trigger.countDown();
            }
        };
        final String controls = Controls.newBuilder().addPause(PojoRecordReader.class, "read-next").build();
        setControls(controls);
        QueryTestUtil.testWithListener(drillClient, UserBitShared.QueryType.SQL, TEST_QUERY, listener);
        final Pair<UserBitShared.QueryResult.QueryState, Exception> outcome = listener.waitForCompletion();
        assertStateCompleted(outcome, UserBitShared.QueryResult.QueryState.COMPLETED);
        final long after = countAllocatedMemory();
        Assert.assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
    }

    /**
     * Adds a pause for {@link FragmentExecutor} at site "fragment-running" and cancels the query
     * as soon as its id arrives; expects CANCELED with no change in allocated memory.
     * Currently ignored.
     */
    @Test
    @Ignore
    public void cancelWhenQueryIdArrives() {
        final long before = countAllocatedMemory();
        final String controls = Controls.newBuilder().addPause(FragmentExecutor.class, "fragment-running").build();
        final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() { // from class: org.apache.drill.exec.server.TestDrillbitResilience.3
            @Override // org.apache.drill.exec.server.TestDrillbitResilience.WaitUntilCompleteListener
            public void queryIdArrived(UserBitShared.QueryId queryId) {
                super.queryIdArrived(queryId);
                cancelAndResume();
            }
        };
        assertCancelledWithoutException(controls, listener);
        final long after = countAllocatedMemory();
        Assert.assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
    }

    /**
     * Adds a pause for {@link ScreenCreator} at site "sending-data" and cancels the query when the
     * first data batch arrives; expects CANCELED with no change in allocated memory.
     */
    @Test
    @RepeatTestRule.Repeat(count = 3)
    public void cancelInMiddleOfFetchingResults() {
        final long before = countAllocatedMemory();
        final String controls = Controls.newBuilder().addPause(ScreenCreator.class, "sending-data", 1).build();
        final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() { // from class: org.apache.drill.exec.server.TestDrillbitResilience.4
            private boolean cancelRequested = false;

            @Override // org.apache.drill.exec.server.TestDrillbitResilience.WaitUntilCompleteListener
            public void dataArrived(QueryDataBatch queryDataBatch, ConnectionThrottle connectionThrottle) {
                final boolean firstBatch = !cancelRequested;
                if (firstBatch) {
                    check(queryId != null, "Query id should not be null, since we have waited long enough.");
                    cancelAndResume();
                    cancelRequested = true;
                }
                queryDataBatch.release();
            }
        };
        assertCancelledWithoutException(controls, listener);
        final long after = countAllocatedMemory();
        Assert.assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
    }

    /**
     * Adds a pause for {@link ScreenCreator} at site "send-complete" and cancels the query once
     * as many batches have arrived as there are registered drillbits; expects CANCELED with no
     * change in allocated memory.
     */
    @Test
    @RepeatTestRule.Repeat(count = 3)
    public void cancelAfterAllResultsProduced() {
        final long before = countAllocatedMemory();
        final String controls = Controls.newBuilder().addPause(ScreenCreator.class, "send-complete").build();
        final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() { // from class: org.apache.drill.exec.server.TestDrillbitResilience.5
            /** Number of batches received so far. */
            private int count = 0;

            @Override // org.apache.drill.exec.server.TestDrillbitResilience.WaitUntilCompleteListener
            public void dataArrived(QueryDataBatch queryDataBatch, ConnectionThrottle connectionThrottle) {
                if (++count == TestDrillbitResilience.drillbits.size()) {
                    check(queryId != null, "Query id should not be null, since we have waited long enough.");
                    cancelAndResume();
                }
                queryDataBatch.release();
            }
        };
        assertCancelledWithoutException(controls, listener);
        final long after = countAllocatedMemory();
        Assert.assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
    }

    /**
     * Adds a pause for {@link Foreman} at site "foreman-cleanup" and cancels the query once as
     * many batches have arrived as there are registered drillbits; expects CANCELED with no
     * change in allocated memory.
     */
    @Test
    @RepeatTestRule.Repeat(count = 3)
    public void cancelAfterEverythingIsCompleted() {
        final long before = countAllocatedMemory();
        final String controls = Controls.newBuilder().addPause(Foreman.class, "foreman-cleanup").build();
        final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() { // from class: org.apache.drill.exec.server.TestDrillbitResilience.6
            /** Number of batches received so far. */
            private int count = 0;

            @Override // org.apache.drill.exec.server.TestDrillbitResilience.WaitUntilCompleteListener
            public void dataArrived(QueryDataBatch queryDataBatch, ConnectionThrottle connectionThrottle) {
                if (++count == TestDrillbitResilience.drillbits.size()) {
                    check(queryId != null, "Query id should not be null, since we have waited long enough.");
                    cancelAndResume();
                }
                queryDataBatch.release();
            }
        };
        assertCancelledWithoutException(controls, listener);
        final long after = countAllocatedMemory();
        Assert.assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
    }

    /**
     * Runs the test query with no controls set by this test and expects it to complete with no
     * change in allocated memory.
     */
    @Test
    public void successfullyCompletes() {
        final long before = countAllocatedMemory();
        final WaitUntilCompleteListener listener = new WaitUntilCompleteListener();
        QueryTestUtil.testWithListener(drillClient, UserBitShared.QueryType.SQL, TEST_QUERY, listener);
        final Pair<UserBitShared.QueryResult.QueryState, Exception> outcome = listener.waitForCompletion();
        assertStateCompleted(outcome, UserBitShared.QueryResult.QueryState.COMPLETED);
        final long after = countAllocatedMemory();
        Assert.assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
    }

    /**
     * Applies the controls, runs the query and asserts that it ends in the FAILED state with a
     * {@link UserException} wrapping the expected exception class and message.
     *
     * @param str controls definition to apply before the query is submitted
     * @param cls expected class of the wrapped exception
     * @param str2 expected message of the wrapped exception
     * @param str3 SQL text to run
     */
    private static void assertFailsWithException(String str, Class<? extends Throwable> cls, String str2, String str3) {
        setControls(str);
        final WaitUntilCompleteListener listener = new WaitUntilCompleteListener();
        QueryTestUtil.testWithListener(drillClient, UserBitShared.QueryType.SQL, str3, listener);
        final Pair<UserBitShared.QueryResult.QueryState, Exception> outcome = listener.waitForCompletion();
        final UserBitShared.QueryResult.QueryState finalState = outcome.getFirst();
        final String stateMessage = String.format("Query state should be FAILED (and not %s).", finalState);
        Assert.assertTrue(stateMessage, finalState == UserBitShared.QueryResult.QueryState.FAILED);
        assertExceptionMessage(outcome.getSecond(), cls, str2);
    }

    /**
     * Same as the four-argument overload, using {@code TEST_QUERY} as the SQL text.
     *
     * @param str controls definition to apply before the query is submitted
     * @param cls expected class of the wrapped exception
     * @param str2 expected message of the wrapped exception
     */
    private static void assertFailsWithException(String str, Class<? extends Throwable> cls, String str2) {
        assertFailsWithException(str, cls, str2, TEST_QUERY);
    }

    /**
     * Registers a {@link ForemanSetupException} for {@link DrillSqlWorker} at site "sql-parsing"
     * and expects the query to fail with it, with no change in allocated memory.
     */
    @Test
    public void failsWhenParsing() {
        final long before = countAllocatedMemory();
        final String controls = Controls.newBuilder().addException(DrillSqlWorker.class, "sql-parsing", ForemanSetupException.class).build();
        assertFailsWithException(controls, ForemanSetupException.class, "sql-parsing");
        final long after = countAllocatedMemory();
        Assert.assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
    }

    /**
     * Registers a {@link ForemanException} for {@link Foreman} at site "send-fragments" and
     * expects the query to fail with it, with no change in allocated memory.
     */
    @Test
    public void failsWhenSendingFragments() {
        final long before = countAllocatedMemory();
        final String controls = Controls.newBuilder().addException(Foreman.class, "send-fragments", ForemanException.class).build();
        assertFailsWithException(controls, ForemanException.class, "send-fragments");
        final long after = countAllocatedMemory();
        Assert.assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
    }

    /**
     * Registers an {@link IOException} for {@link FragmentExecutor} at site "fragment-execution"
     * and expects the query to fail with it, with no change in allocated memory.
     */
    @Test
    public void failsDuringExecution() {
        final long before = countAllocatedMemory();
        final String controls = Controls.newBuilder().addException(FragmentExecutor.class, "fragment-execution", IOException.class).build();
        assertFailsWithException(controls, IOException.class, "fragment-execution");
        final long after = countAllocatedMemory();
        Assert.assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
    }

    /**
     * Adds a pause for {@link MergingRecordBatch} at site "waiting-for-data", runs the shared
     * cancel-after-first-batch scenario and checks that allocated memory is unchanged.
     */
    @Test
    @RepeatTestRule.Repeat(count = 3)
    public void interruptingBlockedMergingRecordBatch() {
        final long before = countAllocatedMemory();
        final String controls = Controls.newBuilder().addPause(MergingRecordBatch.class, "waiting-for-data", 1).build();
        interruptingBlockedFragmentsWaitingForData(controls);
        final long after = countAllocatedMemory();
        Assert.assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
    }

    /**
     * Adds a pause for {@link UnorderedReceiverBatch} at site "waiting-for-data", runs the shared
     * cancel-after-first-batch scenario and checks that allocated memory is unchanged.
     */
    @Test
    @RepeatTestRule.Repeat(count = 3)
    public void interruptingBlockedUnorderedReceiverBatch() {
        final long before = countAllocatedMemory();
        final String controls = Controls.newBuilder().addPause(UnorderedReceiverBatch.class, "waiting-for-data", 1).build();
        interruptingBlockedFragmentsWaitingForData(controls);
        final long after = countAllocatedMemory();
        Assert.assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
    }

    /**
     * Runs the group-by query over {@code cp.`region.json`} with slice target 1 and hash
     * aggregation switched off, cancels it after the first batch of data and expects CANCELED.
     * The two session options are put back afterwards whether or not the assertion passes.
     *
     * @param str controls definition to apply before the query is submitted
     */
    private static void interruptingBlockedFragmentsWaitingForData(String str) {
        try {
            setSessionOption("planner.slice_target", "1");
            setSessionOption(PlannerSettings.HASHAGG.getOptionName(), "false");
            assertCancelledWithoutException(str, new ListenerThatCancelsQueryAfterFirstBatchOfData(), "SELECT sales_city, COUNT(*) cnt FROM cp.`region.json` GROUP BY sales_city");
        } finally {
            // One cleanup path for success and failure, so the resets are not repeated when a
            // reset call itself fails.
            setSessionOption("planner.slice_target", Long.toString(100000L));
            setSessionOption(PlannerSettings.HASHAGG.getOptionName(), PlannerSettings.HASHAGG.getDefault().bool_val.toString());
        }
    }

    /**
     * Runs the group-by query over {@code cp.`region.json`} with slice target 1, hash aggregation
     * on and the partition-sender thread option set to 6. A latch and a pause are added for
     * {@link PartitionerDecorator}; the query is cancelled after the first batch of data and is
     * expected to end CANCELED with no change in allocated memory. The three session options are
     * put back afterwards whether or not the assertions pass.
     */
    @Test
    @RepeatTestRule.Repeat(count = 3)
    public void interruptingPartitionerThreadFragment() {
        try {
            setSessionOption("planner.slice_target", "1");
            setSessionOption(PlannerSettings.HASHAGG.getOptionName(), "true");
            setSessionOption(PlannerSettings.PARTITION_SENDER_SET_THREADS.getOptionName(), "6");
            long countAllocatedMemory = countAllocatedMemory();
            assertCancelledWithoutException(Controls.newBuilder().addLatch(PartitionerDecorator.class, "partitioner-sender-latch").addPause(PartitionerDecorator.class, "wait-for-fragment-interrupt", 1).build(), new ListenerThatCancelsQueryAfterFirstBatchOfData(), "SELECT sales_city, COUNT(*) cnt FROM cp.`region.json` GROUP BY sales_city");
            long countAllocatedMemory2 = countAllocatedMemory();
            Assert.assertEquals(String.format("We are leaking %d bytes", countAllocatedMemory2 - countAllocatedMemory), countAllocatedMemory, countAllocatedMemory2);
        } finally {
            // One cleanup path for success and failure, so the resets are not repeated when a
            // reset call itself fails.
            setSessionOption("planner.slice_target", Long.toString(100000L));
            setSessionOption(PlannerSettings.HASHAGG.getOptionName(), PlannerSettings.HASHAGG.getDefault().bool_val.toString());
            setSessionOption(PlannerSettings.PARTITION_SENDER_SET_THREADS.getOptionName(), Long.toString(PlannerSettings.PARTITION_SENDER_SET_THREADS.getDefault().num_val.longValue()));
        }
    }

    /**
     * Adds a pause for {@code SingleSenderCreator.SingleSenderRootExec} at site
     * "data-tunnel-send-batch-wait-for-interrupt", cancels the test query after the first batch
     * of data and expects CANCELED with no change in allocated memory. Currently ignored.
     */
    @Test
    @Ignore
    public void interruptingWhileFragmentIsBlockedInAcquiringSendingTicket() {
        final long before = countAllocatedMemory();
        final String controls = Controls.newBuilder().addPause(SingleSenderCreator.SingleSenderRootExec.class, "data-tunnel-send-batch-wait-for-interrupt", 1).build();
        final ListenerThatCancelsQueryAfterFirstBatchOfData listener = new ListenerThatCancelsQueryAfterFirstBatchOfData();
        assertCancelledWithoutException(controls, listener);
        final long after = countAllocatedMemory();
        Assert.assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
    }

    /**
     * Runs the query from {@code queries/tpch/09.sql} with slice target 10, adds a pause for
     * {@link ScreenCreator} at site "sending-data" and cancels the query when the first batch
     * arrives. Expects CANCELED with no change in allocated memory. The slice target is put back
     * afterwards whether or not the assertions pass.
     */
    @Test
    @RepeatTestRule.Repeat(count = 3)
    public void memoryLeaksWhenCancelled() {
        setSessionOption("planner.slice_target", "10");
        long countAllocatedMemory = countAllocatedMemory();
        try {
            String build = Controls.newBuilder().addPause(ScreenCreator.class, "sending-data", 1).build();
            String str = null;
            try {
                String file = BaseTestQuery.getFile("queries/tpch/09.sql");
                // Drops the last character of the file (presumably a trailing ';' - confirm).
                str = file.substring(0, file.length() - 1);
            } catch (IOException e) {
                Assert.fail("Failed to get query file: " + e);
            }
            assertCancelledWithoutException(build, new WaitUntilCompleteListener() { // from class: org.apache.drill.exec.server.TestDrillbitResilience.7
                private volatile boolean cancelRequested = false;

                @Override // org.apache.drill.exec.server.TestDrillbitResilience.WaitUntilCompleteListener
                public void dataArrived(QueryDataBatch queryDataBatch, ConnectionThrottle connectionThrottle) {
                    if (!this.cancelRequested) {
                        check(this.queryId != null, "Query id should not be null, since we have waited long enough.", new Object[0]);
                        cancelAndResume();
                        this.cancelRequested = true;
                    }
                    queryDataBatch.release();
                }
            }, str);
            long countAllocatedMemory2 = countAllocatedMemory();
            Assert.assertEquals(String.format("We are leaking %d bytes", countAllocatedMemory2 - countAllocatedMemory), countAllocatedMemory, countAllocatedMemory2);
        } finally {
            // One cleanup path for success and failure, so the reset is not repeated when the
            // reset call itself fails.
            setSessionOption("planner.slice_target", Long.toString(100000L));
        }
    }

    /**
     * Runs the TPC-H query from {@code queries/tpch/09.sql} with an {@link IOException} injected at
     * the {@code "fragment-execution"} site of {@code FragmentExecutor}, expects the failure to be
     * reported through {@code assertFailsWithException}, and asserts that the total reported by
     * {@code countAllocatedMemory()} is unchanged.
     *
     * <p>The session option {@code planner.slice_target} is set to {@code 10} for the duration of
     * the test and is always reset to {@code 100000} afterwards, whether the test passes or fails.
     *
     * <p>Currently disabled via {@code @Ignore}.
     */
    @Test
    @Ignore
    public void memoryLeaksWhenFailed() {
        setSessionOption("planner.slice_target", "10");
        final long allocatedBefore = countAllocatedMemory();
        try {
            final String controls = Controls.newBuilder().addException(FragmentExecutor.class, "fragment-execution", IOException.class).build();

            String query = null;
            try {
                final String fileContents = BaseTestQuery.getFile("queries/tpch/09.sql");
                // Drop the last character of the file (presumably a trailing ';' -- TODO confirm).
                query = fileContents.substring(0, fileContents.length() - 1);
            } catch (IOException e) {
                Assert.fail("Failed to get query file: " + e);
            }

            assertFailsWithException(controls, IOException.class, "fragment-execution", query);

            final long allocatedAfter = countAllocatedMemory();
            Assert.assertEquals(String.format("We are leaking %d bytes", allocatedAfter - allocatedBefore), allocatedBefore, allocatedAfter);
        } finally {
            // Single restore point: runs exactly once on both the success and the failure path.
            setSessionOption("planner.slice_target", Long.toString(100000L));
        }
    }

    /**
     * Injects a {@link RuntimeException} at the {@code "after-sort"} site of
     * {@link ExternalSortBatch}, runs an {@code order by} query through
     * {@code assertFailsWithException}, and asserts that the total reported by
     * {@code countAllocatedMemory()} is the same before and after.
     */
    @Test
    public void failsAfterMSorterSorting() {
        final long allocatedBefore = countAllocatedMemory();

        final String injectionSite = "after-sort";
        final String query = "select n_name from cp.`tpch/nation.parquet` order by n_name";
        final String controls = Controls.newBuilder()
            .addException(ExternalSortBatch.class, injectionSite, RuntimeException.class)
            .build();
        assertFailsWithException(controls, RuntimeException.class, injectionSite, query);

        final long allocatedAfter = countAllocatedMemory();
        final long leakedBytes = allocatedAfter - allocatedBefore;
        Assert.assertEquals(String.format("We are leaking %d bytes", leakedBytes), allocatedBefore, allocatedAfter);
    }

    /**
     * Injects a {@link RuntimeException} at the {@code "after-setup"} site of
     * {@link ExternalSortBatch}, runs an {@code order by} query through
     * {@code assertFailsWithException}, and asserts that the total reported by
     * {@code countAllocatedMemory()} is the same before and after.
     */
    @Test
    public void failsAfterMSorterSetup() {
        final long allocatedBefore = countAllocatedMemory();

        final String injectionSite = "after-setup";
        final String query = "select n_name from cp.`tpch/nation.parquet` order by n_name";
        final String controls = Controls.newBuilder()
            .addException(ExternalSortBatch.class, injectionSite, RuntimeException.class)
            .build();
        assertFailsWithException(controls, RuntimeException.class, injectionSite, query);

        final long allocatedAfter = countAllocatedMemory();
        final long leakedBytes = allocatedAfter - allocatedBefore;
        Assert.assertEquals(String.format("We are leaking %d bytes", leakedBytes), allocatedBefore, allocatedAfter);
    }

    /**
     * Sums the allocated memory reported by the allocator of every entry in {@code drillbits}.
     *
     * <p>Sleeps for two seconds before sampling (presumably to let running fragments finish
     * releasing their buffers -- TODO confirm). If the sleep is interrupted, the sample is still
     * taken and the thread's interrupt status is restored so callers can observe it.
     *
     * @return the sum of {@code getAllocatedMemory()} over all drillbit allocators
     */
    private static long countAllocatedMemory() {
        try {
            Thread.sleep(2000L);
        } catch (InterruptedException e) {
            // Do not lose the interruption: re-assert the flag and carry on with the measurement.
            Thread.currentThread().interrupt();
        }

        long totalAllocated = 0;
        for (String name : drillbits.keySet()) {
            totalAllocated += drillbits.get(name).getContext().getAllocator().getAllocatedMemory();
        }
        return totalAllocated;
    }
}
