/*
 * Decompiled with CFR 0.152.
 */
package org.pentaho.di.trans.streaming.common;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import junit.framework.TestCase;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
import org.pentaho.di.core.logging.LogChannel;
import org.pentaho.di.trans.streaming.common.BaseStreamStep;
import org.pentaho.di.trans.streaming.common.BlockingQueueStreamSource;

@RunWith(value=MockitoJUnitRunner.class)
public class BlockingQueueStreamSourceTest {
    private ExecutorService execSvc = Executors.newCachedThreadPool();
    @Mock
    private BaseStreamStep streamStep;
    @Mock
    private Semaphore semaphore;
    @Mock
    private LogChannel logChannel;
    private BlockingQueueStreamSource<String> streamSource;

    @Before
    public void before() {
        this.streamSource = new BlockingQueueStreamSource<String>(this.streamStep){

            public void open() {
            }
        };
    }

    @Test
    public void errorLoggedIfInterruptedInAcceptRows() throws InterruptedException {
        this.streamSource.acceptingRowsSemaphore = this.semaphore;
        this.streamSource.logChannel = this.logChannel;
        ((Semaphore)Mockito.doThrow((Throwable)new InterruptedException("interrupt")).when((Object)this.semaphore)).acquire();
        this.streamSource.acceptRows(Collections.singletonList("new row"));
        ((LogChannel)Mockito.verify((Object)this.logChannel)).logError((String)Matchers.any());
        ((Semaphore)Mockito.verify((Object)this.semaphore)).release();
    }

    @Test
    public void errorLoggedIfInterruptedInPause() throws InterruptedException {
        this.streamSource.acceptingRowsSemaphore = this.semaphore;
        Mockito.when((Object)this.semaphore.availablePermits()).thenReturn((Object)1);
        this.streamSource.logChannel = this.logChannel;
        ((Semaphore)Mockito.doThrow((Throwable)new InterruptedException("interrupt")).when((Object)this.semaphore)).acquire();
        this.streamSource.pause();
        ((LogChannel)Mockito.verify((Object)this.logChannel)).logError((String)Matchers.any());
    }

    @Test
    public void rowIterableBlocksTillRowReceived() throws Exception {
        this.streamSource.open();
        Iterator iterator = this.streamSource.flowable().blockingIterable().iterator();
        Future<Boolean> hasNext = this.execSvc.submit(iterator::hasNext);
        this.assertTimesOut(hasNext);
        this.streamSource.acceptRows(Collections.singletonList("New Row"));
        MatcherAssert.assertThat((Object)this.getQuickly(hasNext), (Matcher)CoreMatchers.equalTo((Object)true));
    }

    @Test
    public void streamIsPausable() throws InterruptedException, ExecutionException, TimeoutException {
        this.streamSource.open();
        Iterator iter = this.streamSource.flowable().blockingIterable().iterator();
        Future<String> nextString = this.execSvc.submit(iter::next);
        this.streamSource.acceptRows(Collections.singletonList("row"));
        MatcherAssert.assertThat((Object)this.getQuickly(nextString), (Matcher)CoreMatchers.equalTo((Object)"row"));
        this.streamSource.pause();
        Future<?> newRow = this.execSvc.submit(() -> this.streamSource.acceptRows(Collections.singletonList("new row")));
        this.assertTimesOut(newRow);
        this.streamSource.resume();
        nextString = this.execSvc.submit(iter::next);
        MatcherAssert.assertThat((Object)this.getQuickly(nextString), (Matcher)CoreMatchers.equalTo((Object)"new row"));
    }

    @Test
    public void testRowsFilled() throws ExecutionException, InterruptedException {
        this.streamSource = new BlockingQueueStreamSource<String>(this.streamStep){

            public void open() {
                BlockingQueueStreamSourceTest.this.execSvc.submit(() -> {
                    for (int i = 0; i < 4; ++i) {
                        this.acceptRows(Collections.singletonList("new row " + i));
                        try {
                            Thread.sleep(5L);
                            continue;
                        }
                        catch (InterruptedException e) {
                            TestCase.fail();
                        }
                    }
                });
            }
        };
        this.streamSource.open();
        Iterator iterator = this.streamSource.flowable().blockingIterable().iterator();
        Future<List> iterLoop = this.execSvc.submit(() -> {
            ArrayList strings = new ArrayList();
            do {
                strings.add(iterator.next());
            } while (strings.size() < 4);
            return strings;
        });
        List quickly = this.getQuickly(iterLoop);
        MatcherAssert.assertThat((Object)quickly.size(), (Matcher)CoreMatchers.equalTo((Object)4));
    }

    @Test
    public void bufferSizeLimitedToOneThousand() {
        this.streamSource = new BlockingQueueStreamSource<String>(this.streamStep){

            public void open() {
                for (int i = 0; i < 1002; ++i) {
                    this.acceptRows(Collections.singletonList("new row " + i));
                }
            }
        };
        this.streamSource.open();
        Iterator iterator = this.streamSource.flowable().blockingIterable().iterator();
        ArrayList strings = new ArrayList();
        do {
            strings.add(iterator.next());
        } while (strings.size() < 1000);
        MatcherAssert.assertThat((Object)strings.size(), (Matcher)CoreMatchers.equalTo((Object)1000));
        Future<String> submit = this.execSvc.submit(iterator::next);
        try {
            submit.get(10L, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException | ExecutionException e) {
            TestCase.fail();
        }
        catch (TimeoutException e) {
            return;
        }
        TestCase.fail((String)"expected timeout");
    }

    @Test
    public void testError() {
        block2: {
            String exceptionMessage = "Exception raised during acceptRows loop";
            this.streamSource = new BlockingQueueStreamSource<String>(this.streamStep){

                public void open() {
                    BlockingQueueStreamSourceTest.this.execSvc.submit(() -> {
                        for (int i = 0; i < 10; ++i) {
                            this.acceptRows(Collections.singletonList("new row " + i));
                            try {
                                Thread.sleep(5L);
                            }
                            catch (InterruptedException e) {
                                TestCase.fail();
                            }
                            if (i != 5) continue;
                            this.error(new RuntimeException("Exception raised during acceptRows loop"));
                            break;
                        }
                    });
                }
            };
            this.streamSource.open();
            Iterator iterator = this.streamSource.flowable().blockingIterable().iterator();
            Future<List> iterLoop = this.execSvc.submit(() -> {
                ArrayList strings = new ArrayList();
                do {
                    strings.add(iterator.next());
                } while (strings.size() < 9);
                return strings;
            });
            try {
                iterLoop.get(50L, TimeUnit.MILLISECONDS);
                TestCase.fail((String)"expected exception");
            }
            catch (InterruptedException | ExecutionException | TimeoutException e) {
                if (e == null || e.getCause() == null) break block2;
                MatcherAssert.assertThat((Object)e.getCause().getMessage(), (Matcher)CoreMatchers.equalTo((Object)"Exception raised during acceptRows loop"));
            }
        }
    }

    private <T> T getQuickly(Future<T> future) {
        try {
            return future.get(50L, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException | ExecutionException | TimeoutException e) {
            TestCase.fail();
            return null;
        }
    }

    private <T> void assertTimesOut(Future<T> next) {
        try {
            next.get(100L, TimeUnit.MILLISECONDS);
            TestCase.fail((String)"Expected timeout exception");
        }
        catch (Exception e) {
            MatcherAssert.assertThat((Object)e, (Matcher)CoreMatchers.instanceOf(TimeoutException.class));
        }
    }
}

