/*
 * Decompiled with CFR 0.152.
 */
package org.pentaho.di.trans.streaming.common;

import io.reactivex.Flowable;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
import org.pentaho.di.core.Result;
import org.pentaho.di.core.RowMetaAndData;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.row.RowMeta;
import org.pentaho.di.core.row.RowMetaInterface;
import org.pentaho.di.core.row.ValueMetaInterface;
import org.pentaho.di.core.row.value.ValueMetaString;
import org.pentaho.di.i18n.BaseMessages;
import org.pentaho.di.trans.SubtransExecutor;
import org.pentaho.di.trans.streaming.common.BaseStreamStep;
import org.pentaho.di.trans.streaming.common.FixedTimeStreamWindow;

@RunWith(value=MockitoJUnitRunner.class)
public class FixedTimeStreamWindowTest {
    @Mock
    private SubtransExecutor subtransExecutor;

    @Test
    public void emptyResultShouldNotThrowException() throws KettleException {
        Mockito.when((Object)this.subtransExecutor.execute((List)Matchers.any())).thenReturn(Optional.empty());
        RowMeta rowMeta = new RowMeta();
        rowMeta.addValueMeta((ValueMetaInterface)new ValueMetaString("field"));
        FixedTimeStreamWindow window = new FixedTimeStreamWindow(this.subtransExecutor, (RowMetaInterface)rowMeta, 0L, 2, 1);
        window.buffer(Flowable.fromIterable(Collections.singletonList(Arrays.asList("v1", "v2")))).forEach(result -> {});
    }

    @Test
    public void resultsComeBackToParent() throws KettleException {
        RowMeta rowMeta = new RowMeta();
        rowMeta.addValueMeta((ValueMetaInterface)new ValueMetaString("field"));
        Result mockResult = new Result();
        mockResult.setRows(Arrays.asList(new RowMetaAndData((RowMetaInterface)rowMeta, new Object[]{"queen"}), new RowMetaAndData((RowMetaInterface)rowMeta, new Object[]{"king"})));
        Mockito.when((Object)this.subtransExecutor.execute((List)Matchers.any())).thenReturn(Optional.of(mockResult));
        FixedTimeStreamWindow window = new FixedTimeStreamWindow(this.subtransExecutor, (RowMetaInterface)rowMeta, 0L, 2, 1);
        window.buffer(Flowable.fromIterable(Collections.singletonList(Arrays.asList("v1", "v2")))).forEach(result -> Assert.assertEquals((Object)mockResult, (Object)result));
    }

    @Test
    public void abortedSubtransThrowsAnError() throws KettleException {
        Result result1 = new Result();
        result1.setNrErrors(1L);
        Mockito.when((Object)this.subtransExecutor.execute((List)Matchers.any())).thenReturn(Optional.of(result1));
        RowMeta rowMeta = new RowMeta();
        rowMeta.addValueMeta((ValueMetaInterface)new ValueMetaString("field"));
        FixedTimeStreamWindow window = new FixedTimeStreamWindow(this.subtransExecutor, (RowMetaInterface)rowMeta, 0L, 2, 1);
        try {
            window.buffer(Flowable.fromIterable(Collections.singletonList(Arrays.asList("v1", "v2")))).forEach(result -> {});
        }
        catch (Exception e) {
            Assert.assertEquals((Object)BaseMessages.getString(BaseStreamStep.class, (String)"FixedTimeStreamWindow.SubtransFailed", (String[])new String[0]), (Object)e.getCause().getMessage().trim());
        }
    }

    @Test
    public void testSharedStreamingBatchPoolInternalState() throws Exception {
        System.setProperty("SHARED_STREAMING_BATCH_POOL_SIZE", "5");
        FixedTimeStreamWindow window1 = new FixedTimeStreamWindow(this.subtransExecutor, (RowMetaInterface)new RowMeta(), 0L, 2, 1);
        Field field1 = window1.getClass().getDeclaredField("sharedStreamingBatchPool");
        field1.setAccessible(true);
        ThreadPoolExecutor sharedStreamingBatchPool1 = (ThreadPoolExecutor)field1.get(window1);
        Assert.assertTrue((sharedStreamingBatchPool1.getCorePoolSize() == 5 ? 1 : 0) != 0);
        System.setProperty("SHARED_STREAMING_BATCH_POOL_SIZE", "10");
        FixedTimeStreamWindow window2 = new FixedTimeStreamWindow(this.subtransExecutor, (RowMetaInterface)new RowMeta(), 0L, 2, 1);
        Field field2 = window2.getClass().getDeclaredField("sharedStreamingBatchPool");
        field2.setAccessible(true);
        ThreadPoolExecutor sharedStreamingBatchPool2 = (ThreadPoolExecutor)field2.get(window2);
        Assert.assertTrue((sharedStreamingBatchPool2.getCorePoolSize() == 10 ? 1 : 0) != 0);
        Assert.assertEquals((Object)sharedStreamingBatchPool1, (Object)sharedStreamingBatchPool2);
    }

    @Test
    public void testSharedStreamingBatchPoolExecution() throws Exception {
        ArrayList errors = new ArrayList();
        System.setProperty("SHARED_STREAMING_BATCH_POOL_SIZE", "1");
        RowMeta rowMeta = new RowMeta();
        rowMeta.addValueMeta((ValueMetaInterface)new ValueMetaString("field"));
        Result mockResult = new Result();
        mockResult.setRows(Arrays.asList(new RowMetaAndData((RowMetaInterface)rowMeta, new Object[]{"queen"}), new RowMetaAndData((RowMetaInterface)rowMeta, new Object[]{"king"})));
        FixedTimeStreamWindow window1 = new FixedTimeStreamWindow(this.subtransExecutor, (RowMetaInterface)rowMeta, 0L, 10, 10);
        FixedTimeStreamWindow window2 = new FixedTimeStreamWindow(this.subtransExecutor, (RowMetaInterface)rowMeta, 0L, 10, 10);
        Flowable flowable = Flowable.fromIterable(Collections.singletonList(Arrays.asList("v1", "v2")));
        Field field = window1.getClass().getDeclaredField("sharedStreamingBatchPool");
        field.setAccessible(true);
        ThreadPoolExecutor sharedStreamingBatchPool = (ThreadPoolExecutor)field.get(window1);
        Mockito.when((Object)this.subtransExecutor.execute((List)Matchers.any())).thenAnswer(invocation -> {
            if (sharedStreamingBatchPool.getActiveCount() != 1) {
                errors.add("Error: Active count should have been 1 at all times. Current value: " + sharedStreamingBatchPool.getActiveCount());
            }
            return Optional.of(mockResult);
        });
        Thread bufferThread1 = new Thread(new BufferThread(window1, flowable, mockResult));
        bufferThread1.start();
        Thread bufferThread2 = new Thread(new BufferThread(window2, flowable, mockResult));
        bufferThread2.start();
        Thread.sleep(10000L);
        Assert.assertEquals((long)0L, (long)errors.size());
    }

    @Test
    public void supportsPostProcessing() throws KettleException {
        RowMeta rowMeta = new RowMeta();
        rowMeta.addValueMeta((ValueMetaInterface)new ValueMetaString("field"));
        Result mockResult = new Result();
        mockResult.setRows(Arrays.asList(new RowMetaAndData((RowMetaInterface)rowMeta, new Object[]{"queen"}), new RowMetaAndData((RowMetaInterface)rowMeta, new Object[]{"king"})));
        Mockito.when((Object)this.subtransExecutor.execute((List)Matchers.any())).thenReturn(Optional.of(mockResult));
        AtomicInteger count = new AtomicInteger();
        FixedTimeStreamWindow window = new FixedTimeStreamWindow(this.subtransExecutor, (RowMetaInterface)rowMeta, 0L, 2, 1, p -> count.set(((List)((List)p.getKey()).get(0)).size()));
        window.buffer(Flowable.fromIterable(Collections.singletonList(Arrays.asList("v1", "v2")))).forEach(result -> Assert.assertEquals((Object)mockResult, (Object)result));
        Assert.assertEquals((long)2L, (long)count.get());
    }

    @Test
    public void emptyResultsNotPostProcessed() throws KettleException {
        RowMeta rowMeta = new RowMeta();
        rowMeta.addValueMeta((ValueMetaInterface)new ValueMetaString("field"));
        Result mockResult = new Result();
        mockResult.setRows(Arrays.asList(new RowMetaAndData((RowMetaInterface)rowMeta, new Object[]{"queen"}), new RowMetaAndData((RowMetaInterface)rowMeta, new Object[]{"king"})));
        Mockito.when((Object)this.subtransExecutor.execute((List)Matchers.any())).thenReturn(Optional.empty());
        AtomicInteger count = new AtomicInteger();
        FixedTimeStreamWindow window = new FixedTimeStreamWindow(this.subtransExecutor, (RowMetaInterface)rowMeta, 0L, 2, 1, p -> count.set(((List)((List)p.getKey()).get(0)).size()));
        window.buffer(Flowable.fromIterable(Collections.singletonList(Arrays.asList("v1", "v2")))).forEach(result -> Assert.assertEquals((Object)mockResult, (Object)result));
        Assert.assertEquals((long)0L, (long)count.get());
    }

    private class BufferThread
    implements Runnable {
        private FixedTimeStreamWindow window;
        private Flowable flowable;
        private Result mockResult;

        public BufferThread(FixedTimeStreamWindow window, Flowable flowable, Result mockResult) {
            this.window = window;
            this.flowable = flowable;
            this.mockResult = mockResult;
        }

        @Override
        public void run() {
            this.window.buffer(this.flowable).forEach(result -> Assert.assertEquals((Object)this.mockResult, (Object)result));
        }
    }
}

