package org.apache.streams.local.tasks;

import com.google.common.util.concurrent.Uninterruptibles;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.streams.config.StreamsConfiguration;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.local.counters.StreamsTaskCounter;
import org.apache.streams.local.test.processors.PassthroughDatumCounterProcessor;
import org.apache.streams.local.test.providers.NumericMessageProvider;
import org.apache.streams.local.test.writer.DatumCounterWriter;
import org.apache.streams.util.ComponentUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for the local-runtime task wrappers: {@link StreamsProviderTask},
 * {@link StreamsProcessorTask}, {@link StreamsPersistWriterTask} and {@link StreamsMergeTask}.
 *
 * <p>Each test wires in-memory queues to a task, runs the task on a single-thread executor,
 * polls the queues until the expected amount of data has moved (or a bounded number of
 * attempts has elapsed), then shuts the executor down and checks queue sizes and counters.
 */
public class BasicTasksTest {
    private static final String MBEAN_ID = "test_bean";

    /** Pause, in milliseconds, between two consecutive polls of a queue. */
    private static final long POLL_INTERVAL_MS = 500L;

    /** Number of polls after which a test stops waiting and reports a timeout. */
    private static final int MAX_ATTEMPTS = 10;

    /**
     * Unregisters the MBeans of the {@code org.apache.streams.local} domain after every test.
     */
    @After
    public void removeLocalMBeans() {
        try {
            ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
        } catch (Exception ignored) {
            // Best-effort cleanup: a failure here is deliberately not allowed to fail the test.
        }
    }

    /**
     * Verifies that a provider task rejects input queues and pushes all of its
     * messages to the registered output queue.
     */
    @Test
    public void testProviderTask() {
        int numMessages = 100;
        StreamsProviderTask task =
                new StreamsProviderTask(new NumericMessageProvider(numMessages), false, (StreamsConfiguration) null);
        BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<StreamsDatum>();
        task.addOutputQueue(outQueue);

        // Adding an input queue to a provider task is expected to be unsupported.
        UnsupportedOperationException rejection = null;
        try {
            task.addInputQueue(createInputQueue(numMessages));
        } catch (UnsupportedOperationException e) {
            rejection = e;
        }
        Assert.assertNotNull(rejection);

        ExecutorService service = Executors.newFixedThreadPool(1);
        service.submit(task);
        int attempts = 0;
        while (outQueue.size() != numMessages) {
            Uninterruptibles.sleepUninterruptibly(POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
            // The counter must advance, otherwise the timeout below can never trigger
            // and a misbehaving provider would hang the test forever.
            attempts++;
            if (attempts == MAX_ATTEMPTS) {
                Assert.fail("Provider task failed to output " + numMessages + " in a timely fashion.");
            }
        }
        shutdownAndAwait(service, 10L);
    }

    /**
     * Verifies that a processor task consumes its whole input queue and that both the
     * processor and the task counter report every message, with no errors.
     */
    @Test
    public void testProcessorTask() {
        int numMessages = 100;
        PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor("");
        StreamsProcessorTask task = new StreamsProcessorTask(processor);
        StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, (String) null, -1L);
        task.setStreamsTaskCounter(counter);
        BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<StreamsDatum>();
        BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
        task.addOutputQueue(outQueue);
        task.addInputQueue(inQueue);
        Assert.assertEquals(numMessages, ((BlockingQueue<?>) task.getInputQueues().get(0)).size());

        ExecutorService service = Executors.newFixedThreadPool(1);
        service.submit(task);
        int attempts = 0;
        // Stops waiting as soon as the input is drained or the output is complete.
        while (inQueue.size() != 0 && outQueue.size() != numMessages) {
            Uninterruptibles.sleepUninterruptibly(POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
            attempts++;
            if (attempts == MAX_ATTEMPTS) {
                Assert.fail("Processor task failed to output " + numMessages + " in a timely fashion.");
            }
        }
        task.stopTask();
        shutdownAndAwait(service, 5L);

        Assert.assertEquals(numMessages, processor.getMessageCount());
        Assert.assertEquals(numMessages, counter.getNumReceived());
        Assert.assertEquals(numMessages, counter.getNumEmitted());
        Assert.assertEquals(0L, counter.getNumUnhandledErrors());
        Assert.assertEquals(0.0d, counter.getErrorRate(), 0.0d);
    }

    /**
     * Verifies that a writer task rejects output queues, drains its input queue and
     * reports every datum as received, none as emitted, and no errors.
     */
    @Test
    public void testWriterTask() {
        int numMessages = 100;
        DatumCounterWriter writer = new DatumCounterWriter("");
        StreamsPersistWriterTask task = new StreamsPersistWriterTask(writer);
        StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, (String) null, -1L);
        task.setStreamsTaskCounter(counter);
        BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<StreamsDatum>();
        BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);

        // Adding an output queue to a writer task is expected to be unsupported.
        UnsupportedOperationException rejection = null;
        try {
            task.addOutputQueue(outQueue);
        } catch (UnsupportedOperationException e) {
            rejection = e;
        }
        Assert.assertNotNull(rejection);

        task.addInputQueue(inQueue);
        Assert.assertEquals(numMessages, ((BlockingQueue<?>) task.getInputQueues().get(0)).size());

        ExecutorService service = Executors.newFixedThreadPool(1);
        service.submit(task);
        int attempts = 0;
        while (inQueue.size() != 0) {
            Uninterruptibles.sleepUninterruptibly(POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
            attempts++;
            if (attempts == MAX_ATTEMPTS) {
                Assert.fail("Processor task failed to output " + numMessages + " in a timely fashion.");
            }
        }
        task.stopTask();
        shutdownAndAwait(service, 15L);

        Assert.assertEquals(numMessages, writer.getDatumsCounted());
        Assert.assertEquals(numMessages, counter.getNumReceived());
        Assert.assertEquals(0L, counter.getNumEmitted());
        Assert.assertEquals(0L, counter.getNumUnhandledErrors());
        Assert.assertEquals(0.0d, counter.getErrorRate(), 0.0d);
    }

    /**
     * Verifies that a merge task forwards the contents of several input queues
     * into a single output queue.
     */
    @Test
    public void testMergeTask() {
        int numMessages = 100;
        int numInputQueues = 5;
        int expectedTotal = numMessages * numInputQueues;
        StreamsMergeTask task = new StreamsMergeTask();
        BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<StreamsDatum>();
        task.addOutputQueue(outQueue);
        for (int i = 0; i < numInputQueues; i++) {
            task.addInputQueue(createInputQueue(numMessages));
        }

        ExecutorService service = Executors.newFixedThreadPool(1);
        service.submit(task);
        int attempts = 0;
        while (outQueue.size() != expectedTotal) {
            Uninterruptibles.sleepUninterruptibly(POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
            attempts++;
            if (attempts == MAX_ATTEMPTS) {
                // Last chance: passes only if the queue filled up during the final sleep.
                Assert.assertEquals(
                        "Processor task failed to output " + expectedTotal + " in a timely fashion.",
                        expectedTotal,
                        outQueue.size());
            }
        }
        task.stopTask();
        shutdownAndAwait(service, 5L);
    }

    /**
     * Verifies that a processor task with two output queues delivers every
     * message to both of them.
     */
    @Test
    public void testBranching() {
        int numMessages = 100;
        PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor("");
        StreamsProcessorTask task = new StreamsProcessorTask(processor);
        BlockingQueue<StreamsDatum> firstOut = new LinkedBlockingQueue<StreamsDatum>();
        BlockingQueue<StreamsDatum> secondOut = new LinkedBlockingQueue<StreamsDatum>();
        BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
        task.addOutputQueue(firstOut);
        task.addOutputQueue(secondOut);
        task.addInputQueue(inQueue);
        Assert.assertEquals(numMessages, ((BlockingQueue<?>) task.getInputQueues().get(0)).size());

        ExecutorService service = Executors.newFixedThreadPool(1);
        service.submit(task);
        int attempts = 0;
        while (inQueue.size() != 0) {
            Uninterruptibles.sleepUninterruptibly(POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
            attempts++;
            if (attempts == MAX_ATTEMPTS) {
                // Last chance: passes only if the input drained during the final sleep.
                Assert.assertEquals(
                        "Processor task failed to output " + numMessages + " in a timely fashion.",
                        0,
                        inQueue.size());
            }
        }
        task.stopTask();
        shutdownAndAwait(service, 5L);

        Assert.assertEquals(numMessages, processor.getMessageCount());
        Assert.assertEquals(numMessages, firstOut.size());
        Assert.assertEquals(numMessages, secondOut.size());
    }

    /**
     * Verifies that the datums delivered to two output queues are equal but independent:
     * mutating the datum taken from one queue must not affect the one from the other queue.
     */
    @Test
    public void testBranchingSerialization() {
        int numMessages = 1;
        PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor("");
        StreamsProcessorTask task = new StreamsProcessorTask(processor);
        BlockingQueue<StreamsDatum> firstOut = new LinkedBlockingQueue<StreamsDatum>();
        BlockingQueue<StreamsDatum> secondOut = new LinkedBlockingQueue<StreamsDatum>();
        BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
        task.addOutputQueue(firstOut);
        task.addOutputQueue(secondOut);
        task.addInputQueue(inQueue);

        ExecutorService service = Executors.newFixedThreadPool(1);
        service.submit(task);
        int attempts = 0;
        while (inQueue.size() != 0) {
            Uninterruptibles.sleepUninterruptibly(POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
            attempts++;
            if (attempts == MAX_ATTEMPTS) {
                // Last chance: passes only if the input drained during the final sleep.
                Assert.assertEquals(
                        "Processor task failed to output " + numMessages + " in a timely fashion.",
                        0,
                        inQueue.size());
            }
        }
        task.stopTask();
        shutdownAndAwait(service, 5L);

        Assert.assertEquals(numMessages, processor.getMessageCount());
        Assert.assertEquals(numMessages, firstOut.size());
        Assert.assertEquals(numMessages, secondOut.size());

        StreamsDatum fromFirst = firstOut.poll();
        StreamsDatum fromSecond = secondOut.poll();
        Assert.assertNotNull(fromFirst);
        Assert.assertEquals(fromFirst, fromSecond);
        // Changing one copy must make the two datums differ, i.e. they do not share state.
        fromFirst.setDocument("a");
        Assert.assertNotEquals(fromFirst, fromSecond);
    }

    /**
     * Builds a queue pre-filled with {@code numDatums} datums whose documents are the
     * integers {@code 0 .. numDatums - 1}.
     *
     * @param numDatums number of datums to enqueue
     * @return the populated queue
     */
    private BlockingQueue<StreamsDatum> createInputQueue(int numDatums) {
        BlockingQueue<StreamsDatum> queue = new LinkedBlockingQueue<StreamsDatum>();
        for (int i = 0; i < numDatums; i++) {
            queue.add(new StreamsDatum(Integer.valueOf(i)));
        }
        return queue;
    }

    /**
     * Shuts the executor down and fails the calling test if it does not terminate in time.
     *
     * @param service        executor running the task under test
     * @param timeoutSeconds how long to wait for termination, in seconds
     */
    private static void shutdownAndAwait(ExecutorService service, long timeoutSeconds) {
        service.shutdown();
        try {
            if (!service.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                service.shutdownNow();
                Assert.fail("Service did not terminate.");
            }
            Assert.assertTrue("Task should have completed running in allotted time.", service.isTerminated());
        } catch (InterruptedException e) {
            // Restore the interrupt flag for the test runner before reporting the failure.
            Thread.currentThread().interrupt();
            Assert.fail("Test Interrupted.");
        }
    }
}
