/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.fn.stream;

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.sdk.fn.stream.AdvancingPhaser;
import org.apache.beam.sdk.fn.stream.DirectStreamObserver;
import org.apache.beam.sdk.fn.test.TestExecutors;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ArrayListMultimap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.collection.IsCollectionWithSize;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class DirectStreamObserverTest {
    @Rule
    public TestExecutors.TestExecutorService executor = TestExecutors.from((ExecutorService)Executors.newCachedThreadPool());

    @Test
    public void testThreadSafety() throws Exception {
        ArrayList onNextValues = new ArrayList();
        AdvancingPhaser phaser = new AdvancingPhaser(1);
        AtomicBoolean isCriticalSectionShared = new AtomicBoolean();
        DirectStreamObserver streamObserver = new DirectStreamObserver((Phaser)phaser, TestStreams.withOnNext(t -> {
            Assert.assertFalse((boolean)isCriticalSectionShared.getAndSet(true));
            Uninterruptibles.sleepUninterruptibly((long)50L, (TimeUnit)TimeUnit.MILLISECONDS);
            onNextValues.add(t);
            Assert.assertTrue((boolean)isCriticalSectionShared.getAndSet(false));
        }).build());
        ImmutableList prefixes = ImmutableList.of((Object)"0", (Object)"1", (Object)"2", (Object)"3", (Object)"4");
        ArrayList<Callable<String>> tasks = new ArrayList<Callable<String>>();
        for (String prefix : prefixes) {
            tasks.add(() -> {
                for (int i = 0; i < 10; ++i) {
                    streamObserver.onNext((Object)(prefix + i));
                }
                return prefix;
            });
        }
        this.executor.invokeAll(tasks);
        streamObserver.onCompleted();
        int[] prefixesIndex = new int[prefixes.size()];
        Assert.assertEquals((long)50L, (long)onNextValues.size());
        for (String onNextValue : onNextValues) {
            int prefix = Integer.parseInt(onNextValue.substring(0, 1));
            int suffix = Integer.parseInt(onNextValue.substring(1, 2));
            Assert.assertEquals((long)prefixesIndex[prefix], (long)suffix);
            int n = prefix;
            prefixesIndex[n] = prefixesIndex[n] + 1;
        }
    }

    @Test
    public void testIsReadyIsHonored() throws Exception {
        AdvancingPhaser phaser = new AdvancingPhaser(1);
        AtomicBoolean elementsAllowed = new AtomicBoolean();
        DirectStreamObserver streamObserver = new DirectStreamObserver((Phaser)phaser, TestStreams.withOnNext(t -> Assert.assertTrue((boolean)elementsAllowed.get())).withIsReady(elementsAllowed::get).build(), 0);
        ArrayList<Future> results = new ArrayList<Future>();
        for (String prefix : ImmutableList.of((Object)"0", (Object)"1", (Object)"2", (Object)"3", (Object)"4")) {
            results.add(this.executor.submit(() -> {
                for (int i = 0; i < 10; ++i) {
                    streamObserver.onNext((Object)(prefix + i));
                }
                return prefix;
            }));
        }
        Uninterruptibles.sleepUninterruptibly((long)500L, (TimeUnit)TimeUnit.MILLISECONDS);
        elementsAllowed.set(true);
        phaser.arrive();
        for (Future result : results) {
            result.get();
        }
        streamObserver.onCompleted();
    }

    @Test
    public void testIsReadyIsHonoredTermination() throws Exception {
        AdvancingPhaser phaser = new AdvancingPhaser(1);
        AtomicBoolean elementsAllowed = new AtomicBoolean();
        DirectStreamObserver streamObserver = new DirectStreamObserver((Phaser)phaser, TestStreams.withOnNext(t -> {
            if (phaser.isTerminated()) {
                throw new RuntimeException("Test stream terminated.");
            }
            Assert.assertTrue((boolean)elementsAllowed.get());
        }).withIsReady(elementsAllowed::get).build(), 0);
        ArrayList<Future> results = new ArrayList<Future>();
        for (String prefix : ImmutableList.of((Object)"0", (Object)"1", (Object)"2", (Object)"3", (Object)"4")) {
            results.add(this.executor.submit(() -> {
                for (int i = 0; i < 10; ++i) {
                    streamObserver.onNext((Object)(prefix + i));
                }
                return prefix;
            }));
        }
        Uninterruptibles.sleepUninterruptibly((long)500L, (TimeUnit)TimeUnit.MILLISECONDS);
        phaser.forceTermination();
        Assert.assertThrows((String)"Test stream terminated.", RuntimeException.class, () -> streamObserver.onNext((Object)"100"));
        for (Future result : results) {
            Assert.assertThrows(ExecutionException.class, () -> {
                String cfr_ignored_0 = (String)result.get();
            });
        }
        streamObserver.onCompleted();
    }

    @Test
    public void testIsReadyCheckDoesntBlockIfPhaserCallbackNeverHappens() throws Exception {
        AtomicBoolean elementsAllowed = new AtomicBoolean();
        DirectStreamObserver streamObserver = new DirectStreamObserver((Phaser)new AdvancingPhaser(1), TestStreams.withOnNext(t -> Assert.assertTrue((boolean)elementsAllowed.get())).withIsReady(elementsAllowed::get).build(), 0);
        ArrayList<Future> results = new ArrayList<Future>();
        for (String prefix : ImmutableList.of((Object)"0", (Object)"1", (Object)"2", (Object)"3", (Object)"4")) {
            results.add(this.executor.submit(() -> {
                for (int i = 0; i < 10; ++i) {
                    streamObserver.onNext((Object)(prefix + i));
                }
                return prefix;
            }));
        }
        Uninterruptibles.sleepUninterruptibly((long)500L, (TimeUnit)TimeUnit.MILLISECONDS);
        elementsAllowed.set(true);
        for (Future result : results) {
            result.get();
        }
        streamObserver.onCompleted();
    }

    @Test
    public void testMessageCheckInterval() throws Exception {
        AtomicInteger index = new AtomicInteger();
        ArrayListMultimap values = ArrayListMultimap.create();
        DirectStreamObserver streamObserver = new DirectStreamObserver((Phaser)new AdvancingPhaser(1), TestStreams.withOnNext(t -> Assert.assertTrue((boolean)values.put((Object)index.get(), t))).withIsReady(() -> {
            index.incrementAndGet();
            return true;
        }).build(), 10);
        ImmutableList prefixes = ImmutableList.of((Object)"0", (Object)"1", (Object)"2", (Object)"3", (Object)"4");
        ArrayList<Future> results = new ArrayList<Future>();
        for (String prefix : prefixes) {
            results.add(this.executor.submit(() -> {
                for (int i = 0; i < 10; ++i) {
                    streamObserver.onNext((Object)(prefix + i));
                }
                return prefix;
            }));
        }
        for (Future result : results) {
            result.get();
        }
        Assert.assertEquals((long)50L, (long)values.size());
        for (Collection valuesPerMessageCheck : values.asMap().values()) {
            MatcherAssert.assertThat((Object)valuesPerMessageCheck, (Matcher)IsCollectionWithSize.hasSize((int)10));
        }
        int[] prefixesIndex = new int[prefixes.size()];
        for (String onNextValue : values.values()) {
            int prefix = Integer.parseInt(onNextValue.substring(0, 1));
            int suffix = Integer.parseInt(onNextValue.substring(1, 2));
            Assert.assertEquals((long)prefixesIndex[prefix], (long)suffix);
            int n = prefix;
            prefixesIndex[n] = prefixesIndex[n] + 1;
        }
    }

    @Test
    public void testPhaserTermination() throws Exception {
        AtomicInteger index = new AtomicInteger();
        ArrayListMultimap values = ArrayListMultimap.create();
        DirectStreamObserver streamObserver = new DirectStreamObserver((Phaser)new AdvancingPhaser(1), TestStreams.withOnNext(t -> Assert.assertTrue((boolean)values.put((Object)index.get(), t))).withIsReady(() -> {
            index.incrementAndGet();
            return true;
        }).build(), 10);
        ImmutableList prefixes = ImmutableList.of((Object)"0", (Object)"1", (Object)"2", (Object)"3", (Object)"4");
        ArrayList<Future> results = new ArrayList<Future>();
        for (String prefix : prefixes) {
            results.add(this.executor.submit(() -> {
                for (int i = 0; i < 10; ++i) {
                    streamObserver.onNext((Object)(prefix + i));
                }
                return prefix;
            }));
        }
        for (Future result : results) {
            result.get();
        }
        Assert.assertEquals((long)50L, (long)values.size());
        for (Collection valuesPerMessageCheck : values.asMap().values()) {
            MatcherAssert.assertThat((Object)valuesPerMessageCheck, (Matcher)IsCollectionWithSize.hasSize((int)10));
        }
        int[] prefixesIndex = new int[prefixes.size()];
        for (String onNextValue : values.values()) {
            int prefix = Integer.parseInt(onNextValue.substring(0, 1));
            int suffix = Integer.parseInt(onNextValue.substring(1, 2));
            Assert.assertEquals((long)prefixesIndex[prefix], (long)suffix);
            int n = prefix;
            prefixesIndex[n] = prefixesIndex[n] + 1;
        }
    }
}

