package org.apache.beam.sdk.fn.stream;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.sdk.fn.test.TestExecutors;
import org.apache.beam.sdk.fn.test.TestStreams;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.UnmodifiableIterator;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/fn/stream/BufferingStreamObserverTest.class */
public class BufferingStreamObserverTest {

    @Rule
    public TestExecutors.TestExecutorService executor = TestExecutors.from(Executors.newCachedThreadPool());

    @Test
    public void testThreadSafety() throws Exception {
        ArrayList arrayList = new ArrayList();
        AdvancingPhaser advancingPhaser = new AdvancingPhaser(1);
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        BufferingStreamObserver bufferingStreamObserver = new BufferingStreamObserver(advancingPhaser, TestStreams.withOnNext(str -> {
            Assert.assertFalse(atomicBoolean.getAndSet(true));
            Uninterruptibles.sleepUninterruptibly(1L, TimeUnit.MILLISECONDS);
            arrayList.add(str);
            Assert.assertTrue(atomicBoolean.getAndSet(false));
        }).build(), this.executor, 3);
        ImmutableList<String> of = ImmutableList.of("0", "1", "2", "3", "4");
        ArrayList arrayList2 = new ArrayList();
        for (String str2 : of) {
            arrayList2.add(() -> {
                for (int i = 0; i < 10; i++) {
                    bufferingStreamObserver.onNext(str2 + i);
                }
                return str2;
            });
        }
        Iterator it = this.executor.invokeAll(arrayList2).iterator();
        while (it.hasNext()) {
            ((Future) it.next()).get();
        }
        bufferingStreamObserver.onCompleted();
        int[] iArr = new int[of.size()];
        Assert.assertEquals(50L, arrayList.size());
        Iterator it2 = arrayList.iterator();
        while (it2.hasNext()) {
            int parseInt = Integer.parseInt(((String) it2.next()).substring(0, 1));
            Assert.assertEquals(iArr[parseInt], Integer.parseInt(r0.substring(1, 2)));
            iArr[parseInt] = iArr[parseInt] + 1;
        }
    }

    @Test
    public void testIsReadyIsHonored() throws Exception {
        AdvancingPhaser advancingPhaser = new AdvancingPhaser(1);
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        TestStreams.Builder withOnNext = TestStreams.withOnNext(str -> {
            Assert.assertTrue(atomicBoolean.get());
        });
        Objects.requireNonNull(atomicBoolean);
        BufferingStreamObserver bufferingStreamObserver = new BufferingStreamObserver(advancingPhaser, withOnNext.withIsReady(atomicBoolean::get).build(), this.executor, 3);
        ArrayList arrayList = new ArrayList();
        UnmodifiableIterator it = ImmutableList.of("0", "1", "2", "3", "4").iterator();
        while (it.hasNext()) {
            String str2 = (String) it.next();
            arrayList.add(this.executor.submit(() -> {
                for (int i = 0; i < 10; i++) {
                    bufferingStreamObserver.onNext(str2 + i);
                }
                return str2;
            }));
        }
        Uninterruptibles.sleepUninterruptibly(10L, TimeUnit.MILLISECONDS);
        atomicBoolean.set(true);
        advancingPhaser.arrive();
        Iterator it2 = arrayList.iterator();
        while (it2.hasNext()) {
            ((Future) it2.next()).get();
        }
        bufferingStreamObserver.onCompleted();
    }
}
