package org.apache.streams.local.tasks;

import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.streams.config.StreamsConfiguration;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.util.ComponentUtils;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/streams/local/tasks/StreamsProviderTaskTest.class */
/**
 * Unit tests for {@link StreamsProviderTask}, driven by a Mockito mock of {@link StreamsProvider}.
 *
 * <p>Every test receives a fresh mock provider and a fresh single-thread pool from {@link #setup()};
 * the pool is shut down and the local MBeans are removed again after each test.
 */
public class StreamsProviderTaskTest {
    /** Maximum number of polling rounds the asynchronous tests wait for the task to finish. */
    private static final int MAX_POLLS = 10;
    /** Pause between two polling rounds, in milliseconds. */
    private static final long POLL_INTERVAL_MS = 100L;

    /** Mocked provider handed to every task under test; recreated before each test. */
    protected StreamsProvider mockProvider;
    /** Single-thread pool used by the tests that run the task on a background thread. */
    protected ExecutorService pool;

    /** Creates the mock provider and the executor used by the tests. */
    @Before
    public void setup() {
        this.mockProvider = Mockito.mock(StreamsProvider.class);
        this.pool = Executors.newFixedThreadPool(1);
    }

    /**
     * Removes all MBeans registered under the {@code org.apache.streams.local} domain so that
     * the next test starts from a clean state.
     */
    @After
    public void removeLocalMBeans() {
        try {
            ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
        } catch (Exception ignored) {
            // Best-effort cleanup: a failure here must not fail the test that just ran.
        }
    }

    /**
     * Stops the per-test executor so its worker thread does not outlive the test.
     * Calling this after the pool has already been shut down is harmless.
     */
    @After
    public void shutdownPool() {
        if (this.pool != null) {
            this.pool.shutdownNow();
        }
    }

    /**
     * Runs a task constructed with the perpetual flag set to {@code true} against a provider that
     * always returns an empty result set, and verifies {@code readCurrent()} was polled at least
     * twice while {@code prepare(null)} was called at most once.
     */
    @Test
    public void runPerpetual() {
        StreamsProviderTask task = new StreamsProviderTask(this.mockProvider, true, (StreamsConfiguration) null);
        Mockito.when(this.mockProvider.isRunning()).thenReturn(true);
        Mockito.when(this.mockProvider.readCurrent())
                .thenReturn(new StreamsResultSet(new LinkedBlockingQueue<StreamsDatum>()));
        task.setTimeout(500);
        task.setSleepTime(10L);
        task.run();
        // Repeated reads are presumably what distinguishes perpetual mode from a single pass.
        Mockito.verify(this.mockProvider, Mockito.atLeast(2)).readCurrent();
        Mockito.verify(this.mockProvider, Mockito.atMost(1)).prepare((Object) null);
    }

    /**
     * Verifies that the three datums offered by the provider end up in the output queue
     * registered on the task.
     */
    @Test
    public void flushes() {
        LinkedBlockingQueue<StreamsDatum> out = new LinkedBlockingQueue<StreamsDatum>();
        StreamsProviderTask task = new StreamsProviderTask(this.mockProvider, true, (StreamsConfiguration) null);
        Mockito.when(this.mockProvider.isRunning()).thenReturn(true);
        // The same result set instance is returned on every call, so its queue holds 3 datums in total.
        Mockito.when(this.mockProvider.readCurrent()).thenReturn(new StreamsResultSet(getQueue(3)));
        task.setTimeout(100);
        task.setSleepTime(10L);
        task.getOutputQueues().add(out);
        task.run();
        Assert.assertThat(out.size(), CoreMatchers.is(CoreMatchers.equalTo(3)));
    }

    /**
     * Builds a queue pre-filled with datums wrapping random doubles.
     *
     * @param i number of datums to create
     * @return a new queue containing exactly {@code i} datums
     */
    protected Queue<StreamsDatum> getQueue(int i) {
        Queue<StreamsDatum> queue = new LinkedBlockingQueue<StreamsDatum>();
        for (int n = 0; n < i; n++) {
            queue.add(new StreamsDatum(Double.valueOf(Math.random())));
        }
        return queue;
    }

    /**
     * Runs a task constructed with the perpetual flag set to {@code false} and verifies
     * {@code readCurrent()} was called at least once and {@code prepare(null)} at most once.
     */
    @Test
    public void runNonPerpetual() {
        StreamsProviderTask task = new StreamsProviderTask(this.mockProvider, false, (StreamsConfiguration) null);
        Mockito.when(this.mockProvider.isRunning()).thenReturn(true);
        Mockito.when(this.mockProvider.readCurrent())
                .thenReturn(new StreamsResultSet(new LinkedBlockingQueue<StreamsDatum>()));
        task.setTimeout(500);
        task.setSleepTime(10L);
        task.run();
        Mockito.verify(this.mockProvider, Mockito.atLeast(1)).readCurrent();
        Mockito.verify(this.mockProvider, Mockito.atMost(1)).prepare((Object) null);
    }

    /**
     * Submits a task to the pool, asks it to stop after the first polling interval and verifies
     * that it finishes within the polling budget.
     *
     * @throws InterruptedException if the test thread is interrupted while polling
     */
    @Test
    public void stoppable() throws InterruptedException {
        StreamsProviderTask task = new StreamsProviderTask(this.mockProvider, true, (StreamsConfiguration) null);
        Mockito.when(this.mockProvider.isRunning()).thenReturn(true);
        Mockito.when(this.mockProvider.readCurrent())
                .thenReturn(new StreamsResultSet(new LinkedBlockingQueue<StreamsDatum>()));
        // NOTE(review): -1 presumably disables the task's own timeout so only stopTask() ends it - confirm.
        task.setTimeout(-1);
        task.setSleepTime(10L);
        Future<?> taskResult = this.pool.submit(task);

        int polls = 0;
        do {
            Thread.sleep(POLL_INTERVAL_MS);
            if (polls == 0) {
                // Give the task one interval to start before requesting the stop.
                task.stopTask();
            }
        } while (++polls < MAX_POLLS && !taskResult.isDone());

        verifyNotRunning(task, taskResult);
    }

    /**
     * Makes {@code prepare(null)} throw and verifies that the submitted task still terminates
     * and reports itself as not running.
     *
     * @throws InterruptedException if the test thread is interrupted while polling
     */
    @Test
    public void earlyException() throws InterruptedException {
        StreamsProviderTask task = new StreamsProviderTask(this.mockProvider, true, (StreamsConfiguration) null);
        Mockito.when(this.mockProvider.isRunning()).thenReturn(true);
        Mockito.doThrow(new RuntimeException()).when(this.mockProvider).prepare((Object) null);
        task.setTimeout(-1);
        task.setSleepTime(10L);
        Future<?> taskResult = this.pool.submit(task);

        int polls = 0;
        while (++polls < MAX_POLLS && !taskResult.isDone()) {
            Thread.sleep(POLL_INTERVAL_MS);
        }

        verifyNotRunning(task, taskResult);
    }

    /**
     * Asserts that the task has finished and no longer reports itself as running. If the future
     * is still pending, the pool is shut down and the test fails.
     *
     * @param streamsProviderTask the task that was submitted
     * @param future the future returned when the task was submitted to the pool
     */
    protected void verifyNotRunning(StreamsProviderTask streamsProviderTask, Future<?> future) {
        if (!future.isDone()) {
            ComponentUtils.shutdownExecutor(this.pool, 0, 10);
            Assert.fail("StreamsProviderTask did not finish within the polling window");
        }
        Assert.assertThat(streamsProviderTask.isRunning(), CoreMatchers.is(false));
    }
}
