package com.datatorrent.lib.appdata.query;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.lib.appdata.query.serde.MessageSerializerFactory;
import com.datatorrent.lib.appdata.schemas.ResultFormatter;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.lib.util.TestUtils;
import java.util.Random;
import org.apache.commons.lang3.mutable.MutableLong;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/lib/appdata/query/QueryManagerAsynchronousTest.class */
public class QueryManagerAsynchronousTest {

    @Rule
    public TestWatcher testMeta = new InterruptClear();
    private static final Logger LOG = LoggerFactory.getLogger(QueryManagerAsynchronousTest.class);

    /* loaded from: input_file:com/datatorrent/lib/appdata/query/QueryManagerAsynchronousTest$InterruptClear.class */
    public static class InterruptClear extends TestWatcher {
        protected void starting(Description description) {
            Thread.interrupted();
        }

        protected void finished(Description description) {
            try {
                Thread.sleep(200L);
            } catch (InterruptedException e) {
            }
            Thread.interrupted();
        }
    }

    /* loaded from: input_file:com/datatorrent/lib/appdata/query/QueryManagerAsynchronousTest$NOPQueryExecutor.class */
    public static class NOPQueryExecutor implements QueryExecutor<MockQuery, Void, MutableLong, MockResult> {
        private final double waitMillisProb;
        private final Random rand = new Random();

        public NOPQueryExecutor(double d) {
            this.waitMillisProb = d;
        }

        public MockResult executeQuery(MockQuery mockQuery, Void r7, MutableLong mutableLong) {
            if (this.rand.nextDouble() < this.waitMillisProb) {
                try {
                    Thread.sleep(1L);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            return new MockResult(mockQuery);
        }
    }

    /* loaded from: input_file:com/datatorrent/lib/appdata/query/QueryManagerAsynchronousTest$ProducerThread.class */
    public static class ProducerThread implements Runnable {
        private final int totalTuples;
        private final int batchSize;
        private final AppDataWindowEndQueueManager<MockQuery, Void> queueManager;
        private final double waitMillisProb;
        private final Random rand = new Random();

        public ProducerThread(AppDataWindowEndQueueManager<MockQuery, Void> appDataWindowEndQueueManager, int i, int i2, double d) {
            this.queueManager = appDataWindowEndQueueManager;
            this.totalTuples = i;
            this.batchSize = i2;
            this.waitMillisProb = d;
        }

        @Override // java.lang.Runnable
        public void run() {
            int i = this.totalTuples / this.batchSize;
            int i2 = 0;
            int i3 = 0;
            while (i2 < i) {
                int i4 = 0;
                while (i4 < this.batchSize) {
                    this.queueManager.enqueue(new MockQuery(i3 + ""), (Object) null, new MutableLong(1L));
                    if (this.rand.nextDouble() < this.waitMillisProb) {
                        try {
                            Thread.sleep(1L);
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                    }
                    i4++;
                    i3++;
                }
                i2++;
                i3++;
            }
        }
    }

    @Test
    public void stressTest() throws Exception {
        AppDataWindowEndQueueManager appDataWindowEndQueueManager = new AppDataWindowEndQueueManager();
        DefaultOutputPort defaultOutputPort = new DefaultOutputPort();
        CollectorTestSink collectorTestSink = new CollectorTestSink();
        TestUtils.setSink(defaultOutputPort, collectorTestSink);
        QueryManagerAsynchronous queryManagerAsynchronous = new QueryManagerAsynchronous(defaultOutputPort, appDataWindowEndQueueManager, new NOPQueryExecutor(0.01d), new MessageSerializerFactory(new ResultFormatter()), Thread.currentThread());
        Thread thread = new Thread(new ProducerThread(appDataWindowEndQueueManager, 100000, 100, 0.01d));
        thread.start();
        thread.setName("Producer Thread");
        long currentTimeMillis = System.currentTimeMillis();
        queryManagerAsynchronous.setup((Context.OperatorContext) null);
        int i = 0;
        while (collectorTestSink.collectedTuples.size() < 100000 && System.currentTimeMillis() - currentTimeMillis < 60000) {
            queryManagerAsynchronous.beginWindow(i);
            Thread.sleep(100L);
            queryManagerAsynchronous.endWindow();
            i++;
        }
        thread.stop();
        queryManagerAsynchronous.teardown();
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
        }
        Assert.assertEquals(100000L, collectorTestSink.collectedTuples.size());
    }
}
