package org.apache.streams.local.builders;

import com.carrotsearch.randomizedtesting.RandomizedTest;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import com.google.common.util.concurrent.Uninterruptibles;
import java.lang.management.ManagementFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanRegistrationException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistWriter;
import org.apache.streams.core.StreamsProcessor;
import org.apache.streams.local.test.processors.PassthroughDatumCounterProcessor;
import org.apache.streams.local.test.processors.SlowProcessor;
import org.apache.streams.local.test.providers.EmptyResultSetProvider;
import org.apache.streams.local.test.providers.NumericMessageProvider;
import org.apache.streams.local.test.writer.DatumCounterWriter;
import org.apache.streams.local.test.writer.SystemOutWriter;
import org.apache.streams.util.ComponentUtils;
import org.hamcrest.Matchers;
import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/streams/local/builders/LocalStreamBuilderTest.class */
public class LocalStreamBuilderTest extends RandomizedTest {
    private static final String MBEAN_ID = "test_id";
    private static final String STREAM_ID = "test_stream";
    private static long STREAM_START_TIME = new DateTime().getMillis();

    @After
    public void removeLocalMBeans() {
        try {
            ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local");
        } catch (Exception e) {
        }
    }

    public void removeRegisteredMBeans(String... strArr) {
        MBeanServer platformMBeanServer = ManagementFactory.getPlatformMBeanServer();
        for (String str : strArr) {
            try {
                platformMBeanServer.unregisterMBean(new ObjectName(String.format("org.apache.streams.local:type=ThroughputQueue,name=%s,identifier=%s,startedAt=%s", str, STREAM_ID, Long.valueOf(STREAM_START_TIME))));
            } catch (MalformedObjectNameException | InstanceNotFoundException | MBeanRegistrationException e) {
            }
            try {
                platformMBeanServer.unregisterMBean(new ObjectName(String.format("org.apache.streams.local:type=StreamsTaskCounter,name=%s,identifier=%s,startedAt=%s", str, STREAM_ID, Long.valueOf(STREAM_START_TIME))));
            } catch (MalformedObjectNameException | InstanceNotFoundException | MBeanRegistrationException e2) {
            }
        }
    }

    @Test
    public void testStreamIdValidations() {
        LocalStreamBuilder localStreamBuilder = new LocalStreamBuilder();
        localStreamBuilder.newReadCurrentStream("id", new NumericMessageProvider(1));
        RuntimeException runtimeException = null;
        try {
            localStreamBuilder.newReadCurrentStream("id", new NumericMessageProvider(1));
        } catch (RuntimeException e) {
            runtimeException = e;
        }
        Assert.assertNotNull(runtimeException);
        RuntimeException runtimeException2 = null;
        localStreamBuilder.addStreamsProcessor("1", new PassthroughDatumCounterProcessor("1"), 1, new String[]{"id"});
        try {
            localStreamBuilder.addStreamsProcessor("2", new PassthroughDatumCounterProcessor("2"), 1, new String[]{"id", "id2"});
        } catch (RuntimeException e2) {
            runtimeException2 = e2;
        }
        Assert.assertNotNull(runtimeException2);
        removeRegisteredMBeans("1", "2", "id");
    }

    @Test
    public void testBasicLinearStream1() {
        linearStreamNonParallel(1, 1);
    }

    @Test
    public void testBasicLinearStream2() {
        linearStreamNonParallel(1004, 1);
    }

    @Test
    public void testBasicLinearStream3() {
        linearStreamNonParallel(1, 10);
    }

    @Test
    @Repeat(iterations = 3)
    public void testBasicLinearStreamRandom() {
        linearStreamNonParallel(randomIntBetween(1, 100000), randomIntBetween(1, 10));
    }

    /* JADX WARN: Finally extract failed */
    private void linearStreamNonParallel(int i, int i2) {
        try {
            LocalStreamBuilder localStreamBuilder = new LocalStreamBuilder(10);
            localStreamBuilder.newPerpetualStream("numeric_provider", new NumericMessageProvider(i));
            int i3 = 0;
            while (i3 < i2) {
                localStreamBuilder.addStreamsProcessor("proc" + i3, new PassthroughDatumCounterProcessor("proc" + i3), 1, new String[]{i3 == 0 ? "numeric_provider" : "proc" + (i3 - 1)});
                i3++;
            }
            Collections.newSetFromMap(new ConcurrentHashMap());
            localStreamBuilder.addStreamsPersistWriter("writer", new DatumCounterWriter("writer"), 1, new String[]{"proc" + (i2 - 1)});
            localStreamBuilder.start();
            for (int i4 = 0; i4 < i2; i4++) {
                Assert.assertEquals("Processor " + i4 + " did not receive all of the datums", i, PassthroughDatumCounterProcessor.COUNTS.get("proc" + i4).get());
            }
            for (int i5 = 0; i5 < i; i5++) {
                Assert.assertTrue("Expected writer to have received : " + i5, DatumCounterWriter.RECEIVED.get("writer").contains(Integer.valueOf(i5)));
            }
            for (int i6 = 0; i6 < i2; i6++) {
                removeRegisteredMBeans("proc" + i6, "proc" + i6 + "-" + PassthroughDatumCounterProcessor.class.getCanonicalName());
            }
            removeRegisteredMBeans("writer", "numeric_provider");
        } catch (Throwable th) {
            for (int i7 = 0; i7 < i2; i7++) {
                removeRegisteredMBeans("proc" + i7, "proc" + i7 + "-" + PassthroughDatumCounterProcessor.class.getCanonicalName());
            }
            removeRegisteredMBeans("writer", "numeric_provider");
            throw th;
        }
    }

    /* JADX WARN: Finally extract failed */
    @Test
    public void testParallelLinearStream1() {
        int randomIntBetween = randomIntBetween(1, 10);
        int randomIntBetween2 = randomIntBetween(1, 300000);
        try {
            LocalStreamBuilder localStreamBuilder = new LocalStreamBuilder(50);
            localStreamBuilder.newPerpetualStream("numeric_provider", new NumericMessageProvider(randomIntBetween2));
            int i = 0;
            while (i < randomIntBetween) {
                localStreamBuilder.addStreamsProcessor("proc" + i, new PassthroughDatumCounterProcessor("proc" + i), randomIntBetween(1, 5), new String[]{i == 0 ? "numeric_provider" : "proc" + (i - 1)});
                i++;
            }
            localStreamBuilder.addStreamsPersistWriter("writer", new DatumCounterWriter("writer"), 1, new String[]{"proc" + (randomIntBetween - 1)});
            localStreamBuilder.start();
            Uninterruptibles.sleepUninterruptibly(5L, TimeUnit.SECONDS);
            localStreamBuilder.stop();
            Uninterruptibles.sleepUninterruptibly(5L, TimeUnit.SECONDS);
            Assert.assertEquals(randomIntBetween2, DatumCounterWriter.RECEIVED.get("writer").size());
            for (int i2 = 0; i2 < randomIntBetween2; i2++) {
                Assert.assertTrue("Expected Writer to receive datum : " + i2, DatumCounterWriter.RECEIVED.get("writer").contains(Integer.valueOf(i2)));
            }
            for (int i3 = 0; i3 < randomIntBetween; i3++) {
                Assert.assertEquals(randomIntBetween2, PassthroughDatumCounterProcessor.COUNTS.get("proc" + i3).get());
            }
            for (int i4 = 0; i4 < randomIntBetween; i4++) {
                removeRegisteredMBeans("proc" + i4);
            }
            removeRegisteredMBeans("writer", "numeric_provider");
        } catch (Throwable th) {
            for (int i5 = 0; i5 < randomIntBetween; i5++) {
                removeRegisteredMBeans("proc" + i5);
            }
            removeRegisteredMBeans("writer", "numeric_provider");
            throw th;
        }
    }

    @Test
    public void testBasicMergeStream() {
        try {
            int randomIntBetween = randomIntBetween(1, 300000);
            int randomIntBetween2 = randomIntBetween(1, 300000);
            PassthroughDatumCounterProcessor passthroughDatumCounterProcessor = new PassthroughDatumCounterProcessor("proc1");
            PassthroughDatumCounterProcessor passthroughDatumCounterProcessor2 = new PassthroughDatumCounterProcessor("proc2");
            LocalStreamBuilder localStreamBuilder = new LocalStreamBuilder();
            localStreamBuilder.newPerpetualStream("sp1", new NumericMessageProvider(randomIntBetween)).newPerpetualStream("sp2", new NumericMessageProvider(randomIntBetween2)).addStreamsProcessor("proc1", passthroughDatumCounterProcessor, 1, new String[]{"sp1"}).addStreamsProcessor("proc2", passthroughDatumCounterProcessor2, 1, new String[]{"sp2"}).addStreamsPersistWriter("writer1", new DatumCounterWriter("writer"), 1, new String[]{"proc1", "proc2"});
            localStreamBuilder.start();
            Assert.assertEquals(randomIntBetween, PassthroughDatumCounterProcessor.COUNTS.get("proc1").get());
            Assert.assertEquals(randomIntBetween2, PassthroughDatumCounterProcessor.COUNTS.get("proc2").get());
            Assert.assertEquals(randomIntBetween + randomIntBetween2, DatumCounterWriter.COUNTS.get("writer").get());
            String str = "-" + PassthroughDatumCounterProcessor.class.getCanonicalName();
            String str2 = "-" + DatumCounterWriter.class.getCanonicalName();
            removeRegisteredMBeans("proc1", "proc2", "writer1", "sp1", "sp2");
        } catch (Throwable th) {
            String str3 = "-" + PassthroughDatumCounterProcessor.class.getCanonicalName();
            String str4 = "-" + DatumCounterWriter.class.getCanonicalName();
            removeRegisteredMBeans("proc1", "proc2", "writer1", "sp1", "sp2");
            throw th;
        }
    }

    @Test
    public void testBasicBranch() {
        try {
            int randomIntBetween = randomIntBetween(1, 300000);
            LocalStreamBuilder localStreamBuilder = new LocalStreamBuilder(50);
            localStreamBuilder.newPerpetualStream("prov1", new NumericMessageProvider(randomIntBetween)).addStreamsProcessor("proc1", new PassthroughDatumCounterProcessor("proc1"), 1, new String[]{"prov1"}).addStreamsProcessor("proc2", new PassthroughDatumCounterProcessor("proc2"), 1, new String[]{"prov1"}).addStreamsPersistWriter("w1", new DatumCounterWriter("writer"), 1, new String[]{"proc1", "proc2"});
            localStreamBuilder.start();
            Assert.assertEquals(randomIntBetween, PassthroughDatumCounterProcessor.COUNTS.get("proc1").get());
            Assert.assertEquals(randomIntBetween, PassthroughDatumCounterProcessor.COUNTS.get("proc2").get());
            Assert.assertEquals(randomIntBetween * 2, DatumCounterWriter.COUNTS.get("writer").get());
            String str = "-" + NumericMessageProvider.class.getCanonicalName();
            String str2 = "-" + PassthroughDatumCounterProcessor.class.getCanonicalName();
            String str3 = "-" + DatumCounterWriter.class.getCanonicalName();
            removeRegisteredMBeans("prov1", "proc1", "proc2", "w1");
        } catch (Throwable th) {
            String str4 = "-" + NumericMessageProvider.class.getCanonicalName();
            String str5 = "-" + PassthroughDatumCounterProcessor.class.getCanonicalName();
            String str6 = "-" + DatumCounterWriter.class.getCanonicalName();
            removeRegisteredMBeans("prov1", "proc1", "proc2", "w1");
            throw th;
        }
    }

    @Test
    public void testSlowProcessorBranch() {
        try {
            HashMap hashMap = new HashMap();
            hashMap.put("TIMEOUT", 2000);
            LocalStreamBuilder localStreamBuilder = new LocalStreamBuilder(hashMap);
            localStreamBuilder.newPerpetualStream("prov1", new NumericMessageProvider(30)).addStreamsProcessor("proc1", new SlowProcessor(), 1, new String[]{"prov1"}).addStreamsPersistWriter("w1", new DatumCounterWriter("writer"), 1, new String[]{"proc1"});
            localStreamBuilder.start();
            Assert.assertEquals(30, DatumCounterWriter.COUNTS.get("writer").get());
            String str = "-" + NumericMessageProvider.class.getCanonicalName();
            String str2 = "-" + PassthroughDatumCounterProcessor.class.getCanonicalName();
            String str3 = "-" + DatumCounterWriter.class.getCanonicalName();
            removeRegisteredMBeans("prov1", "proc1", "w1");
        } catch (Throwable th) {
            String str4 = "-" + NumericMessageProvider.class.getCanonicalName();
            String str5 = "-" + PassthroughDatumCounterProcessor.class.getCanonicalName();
            String str6 = "-" + DatumCounterWriter.class.getCanonicalName();
            removeRegisteredMBeans("prov1", "proc1", "w1");
            throw th;
        }
    }

    @Test
    public void testConfiguredProviderTimeout() {
        try {
            HashMap hashMap = new HashMap();
            hashMap.put("TIMEOUT", 10000);
            long currentTimeMillis = System.currentTimeMillis();
            LocalStreamBuilder localStreamBuilder = new LocalStreamBuilder(-1, hashMap);
            localStreamBuilder.newPerpetualStream("prov1", new EmptyResultSetProvider()).addStreamsProcessor("proc1", new PassthroughDatumCounterProcessor("proc1"), 1, new String[]{"prov1"}).addStreamsProcessor("proc2", new PassthroughDatumCounterProcessor("proc2"), 1, new String[]{"proc1"}).addStreamsPersistWriter("w1", new SystemOutWriter(), 1, new String[]{"proc1"});
            localStreamBuilder.start();
            Assert.assertThat(Integer.valueOf((int) (System.currentTimeMillis() - currentTimeMillis)), Matchers.is(Matchers.allOf(Matchers.greaterThanOrEqualTo(10000), Matchers.lessThanOrEqualTo(Integer.valueOf(4 * 10000)))));
            String str = "-" + NumericMessageProvider.class.getCanonicalName();
            String str2 = "-" + PassthroughDatumCounterProcessor.class.getCanonicalName();
            String str3 = "-" + DatumCounterWriter.class.getCanonicalName();
            removeRegisteredMBeans("prov1", "proc1", "proc2", "w1");
        } catch (Throwable th) {
            String str4 = "-" + NumericMessageProvider.class.getCanonicalName();
            String str5 = "-" + PassthroughDatumCounterProcessor.class.getCanonicalName();
            String str6 = "-" + DatumCounterWriter.class.getCanonicalName();
            removeRegisteredMBeans("prov1", "proc1", "proc2", "w1");
            throw th;
        }
    }

    @Test
    @Ignore
    public void ensureShutdownWithBlockedQueue() throws InterruptedException {
        try {
            ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
            int activeCount = Thread.activeCount();
            LocalStreamBuilder localStreamBuilder = new LocalStreamBuilder();
            localStreamBuilder.newPerpetualStream("prov1", new NumericMessageProvider(30)).addStreamsProcessor("proc1", new SlowProcessor(), 1, new String[]{"prov1"}).addStreamsPersistWriter("w1", new SystemOutWriter(), 1, new String[]{"proc1"});
            localStreamBuilder.getClass();
            newSingleThreadExecutor.submit(localStreamBuilder::start);
            Thread.sleep(500L);
            localStreamBuilder.stop();
            newSingleThreadExecutor.shutdownNow();
            newSingleThreadExecutor.awaitTermination(30000L, TimeUnit.MILLISECONDS);
            Assert.assertThat(Integer.valueOf(Thread.activeCount()), Matchers.is(Matchers.equalTo(Integer.valueOf(activeCount))));
            String str = "-" + NumericMessageProvider.class.getCanonicalName();
            String str2 = "-" + PassthroughDatumCounterProcessor.class.getCanonicalName();
            String str3 = "-" + DatumCounterWriter.class.getCanonicalName();
            removeRegisteredMBeans("prov1", "proc1", "w1");
        } catch (Throwable th) {
            String str4 = "-" + NumericMessageProvider.class.getCanonicalName();
            String str5 = "-" + PassthroughDatumCounterProcessor.class.getCanonicalName();
            String str6 = "-" + DatumCounterWriter.class.getCanonicalName();
            removeRegisteredMBeans("prov1", "proc1", "w1");
            throw th;
        }
    }

    @Before
    private void clearCounters() {
        PassthroughDatumCounterProcessor.COUNTS.clear();
        PassthroughDatumCounterProcessor.CLAIMED_ID.clear();
        PassthroughDatumCounterProcessor.SEEN_DATA.clear();
        DatumCounterWriter.COUNTS.clear();
        DatumCounterWriter.CLAIMED_ID.clear();
        DatumCounterWriter.SEEN_DATA.clear();
        DatumCounterWriter.RECEIVED.clear();
    }

    private StreamsProcessor createPassThroughProcessor(final AtomicInteger atomicInteger) {
        StreamsProcessor streamsProcessor = (StreamsProcessor) Mockito.mock(StreamsProcessor.class);
        Mockito.when(streamsProcessor.process((StreamsDatum) org.mockito.Matchers.any(StreamsDatum.class))).thenAnswer(new Answer<List<StreamsDatum>>() { // from class: org.apache.streams.local.builders.LocalStreamBuilderTest.1
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public List<StreamsDatum> m1answer(InvocationOnMock invocationOnMock) throws Throwable {
                LinkedList linkedList = new LinkedList();
                if (atomicInteger != null) {
                    atomicInteger.incrementAndGet();
                }
                linkedList.add((StreamsDatum) invocationOnMock.getArguments()[0]);
                return linkedList;
            }
        });
        return streamsProcessor;
    }

    private StreamsPersistWriter createSetCollectingWriter(Set set) {
        return createSetCollectingWriter(set, null);
    }

    private StreamsPersistWriter createSetCollectingWriter(Set set, AtomicInteger atomicInteger) {
        StreamsPersistWriter streamsPersistWriter = (StreamsPersistWriter) Mockito.mock(StreamsPersistWriter.class);
        ((StreamsPersistWriter) Mockito.doAnswer(invocationOnMock -> {
            if (atomicInteger != null) {
                atomicInteger.incrementAndGet();
            }
            set.add(((StreamsDatum) invocationOnMock.getArguments()[0]).getDocument());
            return null;
        }).when(streamsPersistWriter)).write((StreamsDatum) org.mockito.Matchers.any(StreamsDatum.class));
        return streamsPersistWriter;
    }
}
