package com.datatorrent.lib.appdata.query;

import com.datatorrent.api.Context;
import com.datatorrent.lib.appdata.ThreadUtils;
import com.datatorrent.lib.appdata.schemas.Query;
import com.google.common.base.Preconditions;
import java.lang.Thread;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/lib/appdata/query/SimpleDoneQueryQueueManagerTest.class */
public class SimpleDoneQueryQueueManagerTest {
    private static final Logger LOG = LoggerFactory.getLogger(SimpleDoneQueryQueueManagerTest.class);

    /* loaded from: input_file:com/datatorrent/lib/appdata/query/SimpleDoneQueryQueueManagerTest$BlockedThread.class */
    public static class BlockedThread<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT> implements Runnable {
        QueueManager<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT> queueManager;

        public BlockedThread(QueueManager<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT> queueManager) {
            setQueueManager(queueManager);
        }

        private void setQueueManager(QueueManager<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT> queueManager) {
            this.queueManager = (QueueManager) Preconditions.checkNotNull(queueManager);
        }

        @Override // java.lang.Runnable
        public void run() {
            SimpleDoneQueryQueueManagerTest.LOG.debug("{}", this.queueManager.dequeueBlock());
        }
    }

    @Test
    public void firstDoneTest() {
        SimpleDoneQueueManager simpleDoneQueueManager = new SimpleDoneQueueManager();
        simpleDoneQueueManager.setup((Context.OperatorContext) null);
        simpleDoneQueueManager.beginWindow(0L);
        simpleDoneQueueManager.enqueue(new MockQuery("1"), (Object) null, new MutableBoolean(true));
        Assert.assertEquals("Should return back null.", (Object) null, simpleDoneQueueManager.dequeue());
        simpleDoneQueueManager.endWindow();
        simpleDoneQueueManager.beginWindow(1L);
        Assert.assertEquals("Should return back null.", (Object) null, simpleDoneQueueManager.dequeue());
        Assert.assertEquals("Should return back null.", (Object) null, simpleDoneQueueManager.dequeue());
        simpleDoneQueueManager.endWindow();
        simpleDoneQueueManager.teardown();
    }

    @Test
    public void simpleEnqueueDequeueBlock() {
        SimpleDoneQueueManager simpleDoneQueueManager = new SimpleDoneQueueManager();
        simpleDoneQueueManager.setup((Context.OperatorContext) null);
        simpleDoneQueueManager.beginWindow(0L);
        MockQuery mockQuery = new MockQuery("1");
        simpleDoneQueueManager.enqueue(mockQuery, (Object) null, new MutableBoolean(false));
        Assert.assertEquals(1L, simpleDoneQueueManager.getNumLeft());
        Assert.assertEquals(simpleDoneQueueManager.getNumPermits(), simpleDoneQueueManager.getNumLeft());
        QueryBundle dequeueBlock = simpleDoneQueueManager.dequeueBlock();
        Assert.assertEquals(0L, simpleDoneQueueManager.getNumLeft());
        Assert.assertEquals(simpleDoneQueueManager.getNumPermits(), simpleDoneQueueManager.getNumLeft());
        Assert.assertEquals("Should return same query.", mockQuery, dequeueBlock.getQuery());
        simpleDoneQueueManager.endWindow();
        simpleDoneQueueManager.teardown();
    }

    @Test
    public void simpleBlockingTest() throws Exception {
        SimpleDoneQueueManager<Query, Void> simpleDoneQueueManager = new SimpleDoneQueueManager<>();
        simpleDoneQueueManager.setup((Context.OperatorContext) null);
        simpleDoneQueueManager.beginWindow(0L);
        testBlocking(simpleDoneQueueManager);
        simpleDoneQueueManager.endWindow();
        simpleDoneQueueManager.teardown();
    }

    @Test
    public void simpleEnqueueDequeue() {
        SimpleDoneQueueManager simpleDoneQueueManager = new SimpleDoneQueueManager();
        simpleDoneQueueManager.setup((Context.OperatorContext) null);
        simpleDoneQueueManager.beginWindow(0L);
        MockQuery mockQuery = new MockQuery("1");
        simpleDoneQueueManager.enqueue(mockQuery, (Object) null, new MutableBoolean(false));
        Assert.assertEquals("Should return same query.", mockQuery, simpleDoneQueueManager.dequeue().getQuery());
        Assert.assertEquals("Should return back null.", (Object) null, simpleDoneQueueManager.dequeue());
        simpleDoneQueueManager.endWindow();
        simpleDoneQueueManager.beginWindow(1L);
        Assert.assertEquals("Should return same query.", mockQuery, simpleDoneQueueManager.dequeue().getQuery());
        Assert.assertEquals("Should return back null.", (Object) null, simpleDoneQueueManager.dequeue());
        simpleDoneQueueManager.endWindow();
        simpleDoneQueueManager.teardown();
    }

    @Test
    public void simpleEnqueueDequeueThenBlock() throws Exception {
        SimpleDoneQueueManager<Query, Void> simpleDoneQueueManager = new SimpleDoneQueueManager<>();
        simpleDoneQueueManager.setup((Context.OperatorContext) null);
        simpleDoneQueueManager.beginWindow(0L);
        simpleDoneQueueManager.enqueue(new MockQuery("1"), (Object) null, new MutableBoolean(false));
        Assert.assertEquals(1L, simpleDoneQueueManager.getNumLeft());
        Assert.assertEquals(simpleDoneQueueManager.getNumPermits(), simpleDoneQueueManager.getNumLeft());
        simpleDoneQueueManager.dequeueBlock();
        Assert.assertEquals(0L, simpleDoneQueueManager.getNumLeft());
        Assert.assertEquals(simpleDoneQueueManager.getNumPermits(), simpleDoneQueueManager.getNumLeft());
        testBlocking(simpleDoneQueueManager);
        simpleDoneQueueManager.endWindow();
        simpleDoneQueueManager.teardown();
    }

    @Test
    public void simpleExpire1() {
        SimpleDoneQueueManager simpleDoneQueueManager = new SimpleDoneQueueManager();
        simpleDoneQueueManager.setup((Context.OperatorContext) null);
        simpleDoneQueueManager.beginWindow(0L);
        MockQuery mockQuery = new MockQuery("1");
        simpleDoneQueueManager.enqueue(mockQuery, (Object) null, new MutableBoolean(false));
        QueryBundle dequeue = simpleDoneQueueManager.dequeue();
        Assert.assertEquals("Should return same query.", mockQuery, dequeue.getQuery());
        ((MutableBoolean) dequeue.getQueueContext()).setValue(true);
        Assert.assertEquals("Should return back null.", (Object) null, simpleDoneQueueManager.dequeue());
        simpleDoneQueueManager.endWindow();
        simpleDoneQueueManager.beginWindow(1L);
        Assert.assertEquals("Should return back null.", (Object) null, simpleDoneQueueManager.dequeue());
        simpleDoneQueueManager.endWindow();
        simpleDoneQueueManager.teardown();
    }

    @Test
    public void resetPermitsTest() throws Exception {
        SimpleDoneQueueManager<Query, Void> simpleDoneQueueManager = new SimpleDoneQueueManager<>();
        simpleDoneQueueManager.setup((Context.OperatorContext) null);
        simpleDoneQueueManager.beginWindow(0L);
        simpleDoneQueueManager.enqueue(new MockQuery("1"), (Object) null, new MutableBoolean(false));
        Assert.assertEquals(1L, simpleDoneQueueManager.getNumLeft());
        Assert.assertEquals(simpleDoneQueueManager.getNumPermits(), simpleDoneQueueManager.getNumLeft());
        simpleDoneQueueManager.dequeueBlock();
        Assert.assertEquals(0L, simpleDoneQueueManager.getNumLeft());
        Assert.assertEquals(simpleDoneQueueManager.getNumPermits(), simpleDoneQueueManager.getNumLeft());
        simpleDoneQueueManager.endWindow();
        simpleDoneQueueManager.beginWindow(1L);
        Assert.assertEquals(1L, simpleDoneQueueManager.getNumLeft());
        Assert.assertEquals(simpleDoneQueueManager.getNumPermits(), simpleDoneQueueManager.getNumLeft());
        simpleDoneQueueManager.dequeueBlock();
        Assert.assertEquals(0L, simpleDoneQueueManager.getNumLeft());
        Assert.assertEquals(simpleDoneQueueManager.getNumPermits(), simpleDoneQueueManager.getNumLeft());
        testBlocking(simpleDoneQueueManager);
        simpleDoneQueueManager.endWindow();
    }

    @Test
    public void expiredTestBlocking() throws Exception {
        SimpleDoneQueueManager<Query, Void> simpleDoneQueueManager = new SimpleDoneQueueManager<>();
        simpleDoneQueueManager.setup((Context.OperatorContext) null);
        simpleDoneQueueManager.beginWindow(0L);
        MockQuery mockQuery = new MockQuery("1");
        MutableBoolean mutableBoolean = new MutableBoolean(false);
        simpleDoneQueueManager.enqueue(mockQuery, (Object) null, mutableBoolean);
        Assert.assertEquals(1L, simpleDoneQueueManager.getNumLeft());
        Assert.assertEquals(simpleDoneQueueManager.getNumPermits(), simpleDoneQueueManager.getNumLeft());
        simpleDoneQueueManager.dequeueBlock();
        Assert.assertEquals(0L, simpleDoneQueueManager.getNumLeft());
        Assert.assertEquals(simpleDoneQueueManager.getNumPermits(), simpleDoneQueueManager.getNumLeft());
        simpleDoneQueueManager.endWindow();
        simpleDoneQueueManager.beginWindow(1L);
        Assert.assertEquals(1L, simpleDoneQueueManager.getNumLeft());
        Assert.assertEquals(simpleDoneQueueManager.getNumPermits(), simpleDoneQueueManager.getNumLeft());
        mutableBoolean.setValue(true);
        testBlocking(simpleDoneQueueManager);
        Assert.assertEquals(0L, simpleDoneQueueManager.getNumLeft());
        Assert.assertEquals(simpleDoneQueueManager.getNumPermits(), simpleDoneQueueManager.getNumLeft());
        simpleDoneQueueManager.endWindow();
        simpleDoneQueueManager.teardown();
    }

    @Test
    public void expiredTestBlockingExpiredFirstValidLast() throws Exception {
        SimpleDoneQueueManager<Query, Void> simpleDoneQueueManager = new SimpleDoneQueueManager<>();
        simpleDoneQueueManager.setup((Context.OperatorContext) null);
        simpleDoneQueueManager.beginWindow(0L);
        MockQuery mockQuery = new MockQuery("1");
        MutableBoolean mutableBoolean = new MutableBoolean(false);
        simpleDoneQueueManager.enqueue(mockQuery, (Object) null, mutableBoolean);
        simpleDoneQueueManager.enqueue(new MockQuery("2"), (Object) null, new MutableBoolean(false));
        Assert.assertEquals(2L, simpleDoneQueueManager.getNumLeft());
        Assert.assertEquals(simpleDoneQueueManager.getNumPermits(), simpleDoneQueueManager.getNumLeft());
        simpleDoneQueueManager.dequeueBlock();
        Assert.assertEquals(1L, simpleDoneQueueManager.getNumLeft());
        Assert.assertEquals(simpleDoneQueueManager.getNumPermits(), simpleDoneQueueManager.getNumLeft());
        simpleDoneQueueManager.endWindow();
        simpleDoneQueueManager.beginWindow(1L);
        Assert.assertEquals(2L, simpleDoneQueueManager.getNumLeft());
        Assert.assertEquals(simpleDoneQueueManager.getNumPermits(), simpleDoneQueueManager.getNumLeft());
        mutableBoolean.setValue(true);
        simpleDoneQueueManager.dequeueBlock();
        Assert.assertEquals(0L, simpleDoneQueueManager.getNumLeft());
        Assert.assertEquals(simpleDoneQueueManager.getNumPermits(), simpleDoneQueueManager.getNumLeft());
        testBlocking(simpleDoneQueueManager);
        simpleDoneQueueManager.endWindow();
        simpleDoneQueueManager.beginWindow(2L);
        Assert.assertEquals(1L, simpleDoneQueueManager.getNumLeft());
        Assert.assertEquals(simpleDoneQueueManager.getNumPermits(), simpleDoneQueueManager.getNumLeft());
        simpleDoneQueueManager.dequeueBlock();
        testBlocking(simpleDoneQueueManager);
        simpleDoneQueueManager.endWindow();
        simpleDoneQueueManager.teardown();
    }

    @Test
    public void expiredTestBlockingValidFirstExpiredLast() throws Exception {
        SimpleDoneQueueManager<Query, Void> simpleDoneQueueManager = new SimpleDoneQueueManager<>();
        simpleDoneQueueManager.setup((Context.OperatorContext) null);
        simpleDoneQueueManager.beginWindow(0L);
        simpleDoneQueueManager.enqueue(new MockQuery("1"), (Object) null, new MutableBoolean(false));
        MockQuery mockQuery = new MockQuery("2");
        MutableBoolean mutableBoolean = new MutableBoolean(false);
        simpleDoneQueueManager.enqueue(mockQuery, (Object) null, mutableBoolean);
        Assert.assertEquals(2L, simpleDoneQueueManager.getNumLeft());
        Assert.assertEquals(simpleDoneQueueManager.getNumPermits(), simpleDoneQueueManager.getNumLeft());
        simpleDoneQueueManager.dequeueBlock();
        Assert.assertEquals(1L, simpleDoneQueueManager.getNumLeft());
        Assert.assertEquals(simpleDoneQueueManager.getNumPermits(), simpleDoneQueueManager.getNumLeft());
        simpleDoneQueueManager.endWindow();
        simpleDoneQueueManager.beginWindow(1L);
        Assert.assertEquals(2L, simpleDoneQueueManager.getNumLeft());
        Assert.assertEquals(simpleDoneQueueManager.getNumPermits(), simpleDoneQueueManager.getNumLeft());
        mutableBoolean.setValue(true);
        simpleDoneQueueManager.dequeueBlock();
        Assert.assertEquals(1L, simpleDoneQueueManager.getNumLeft());
        Assert.assertEquals(simpleDoneQueueManager.getNumPermits(), simpleDoneQueueManager.getNumLeft());
        testBlocking(simpleDoneQueueManager);
        Assert.assertEquals(0L, simpleDoneQueueManager.getNumLeft());
        Assert.assertEquals(simpleDoneQueueManager.getNumPermits(), simpleDoneQueueManager.getNumLeft());
        simpleDoneQueueManager.endWindow();
        simpleDoneQueueManager.beginWindow(2L);
        Assert.assertEquals(1L, simpleDoneQueueManager.getNumLeft());
        Assert.assertEquals(simpleDoneQueueManager.getNumPermits(), simpleDoneQueueManager.getNumLeft());
        simpleDoneQueueManager.dequeueBlock();
        testBlocking(simpleDoneQueueManager);
        simpleDoneQueueManager.endWindow();
        simpleDoneQueueManager.teardown();
    }

    @Test
    public void simpleExpire1ThenBlock() {
        SimpleDoneQueueManager simpleDoneQueueManager = new SimpleDoneQueueManager();
        simpleDoneQueueManager.setup((Context.OperatorContext) null);
        simpleDoneQueueManager.beginWindow(0L);
        MockQuery mockQuery = new MockQuery("1");
        simpleDoneQueueManager.enqueue(mockQuery, (Object) null, new MutableBoolean(false));
        QueryBundle dequeue = simpleDoneQueueManager.dequeue();
        Assert.assertEquals("Should return same query.", mockQuery, dequeue.getQuery());
        ((MutableBoolean) dequeue.getQueueContext()).setValue(true);
        Assert.assertEquals("Should return back null.", (Object) null, simpleDoneQueueManager.dequeue());
        simpleDoneQueueManager.endWindow();
        simpleDoneQueueManager.beginWindow(1L);
        Assert.assertEquals("Should return back null.", (Object) null, simpleDoneQueueManager.dequeue());
        simpleDoneQueueManager.endWindow();
        simpleDoneQueueManager.teardown();
    }

    @Test
    public void simpleExpireBlockThenUnblock() throws Exception {
        SimpleDoneQueueManager<Query, Void> simpleDoneQueueManager = new SimpleDoneQueueManager<>();
        simpleDoneQueueManager.setup((Context.OperatorContext) null);
        simpleDoneQueueManager.beginWindow(0L);
        MockQuery mockQuery = new MockQuery("1");
        MutableBoolean mutableBoolean = new MutableBoolean(false);
        simpleDoneQueueManager.enqueue(mockQuery, (Object) null, mutableBoolean);
        simpleDoneQueueManager.endWindow();
        simpleDoneQueueManager.beginWindow(1L);
        mutableBoolean.setValue(true);
        ThreadUtils.ExceptionSaverExceptionHandler exceptionSaverExceptionHandler = new ThreadUtils.ExceptionSaverExceptionHandler();
        testBlockingNoStop(simpleDoneQueueManager, exceptionSaverExceptionHandler);
        simpleDoneQueueManager.enqueue(new MockQuery("2"), (Object) null, new MutableBoolean(false));
        Thread.sleep(1000L);
        Assert.assertNull(exceptionSaverExceptionHandler.getCaughtThrowable());
        simpleDoneQueueManager.endWindow();
        simpleDoneQueueManager.teardown();
    }

    private void testBlocking(SimpleDoneQueueManager<Query, Void> simpleDoneQueueManager) throws InterruptedException {
        Thread thread = new Thread(new BlockedThread(simpleDoneQueueManager));
        thread.start();
        Thread.sleep(100L);
        Assert.assertEquals(Thread.State.WAITING, thread.getState());
        thread.stop();
    }

    private Thread testBlockingNoStop(SimpleDoneQueueManager<Query, Void> simpleDoneQueueManager, ThreadUtils.ExceptionSaverExceptionHandler exceptionSaverExceptionHandler) throws InterruptedException {
        Thread thread = new Thread(new BlockedThread(simpleDoneQueueManager));
        thread.setUncaughtExceptionHandler(exceptionSaverExceptionHandler);
        thread.start();
        Thread.sleep(100L);
        Assert.assertEquals(Thread.State.WAITING, thread.getState());
        return thread;
    }
}
