/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.fn;

import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.beam.sdk.fn.CancellableQueue;
import org.apache.beam.sdk.fn.test.TestExecutors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class CancellableQueueTest {
    @Rule
    public final TestExecutors.TestExecutorService executor = TestExecutors.from(Executors::newCachedThreadPool);
    private static final int MAX_ELEMENTS = 10000;

    @Test(timeout=10000L)
    public void runTestForMultipleConsumersAndProducers() throws Exception {
        CancellableQueue queue = new CancellableQueue(100);
        this.runTestForMultipleConsumersAndProducers((CancellableQueue<String>)queue);
        queue.reset();
        this.runTestForMultipleConsumersAndProducers((CancellableQueue<String>)queue);
    }

    public void runTestForMultipleConsumersAndProducers(CancellableQueue<String> queue) throws Exception {
        ArrayList<Future> futures = new ArrayList<Future>();
        futures.add(this.executor.submit(() -> {
            for (int i = 0; i < 10000; ++i) {
                queue.put((Object)("A" + i));
            }
            return null;
        }));
        futures.add(this.executor.submit(() -> {
            for (int i = 0; i < 10000; ++i) {
                queue.put((Object)("B" + i));
            }
            return null;
        }));
        futures.add(this.executor.submit(() -> {
            for (int i = 0; i < 10000; ++i) {
                queue.put((Object)("C" + i));
            }
            return null;
        }));
        ArrayList valuesReadX = new ArrayList();
        futures.add(this.executor.submit(() -> {
            for (int i = 0; i < 10000; ++i) {
                valuesReadX.add((String)queue.take());
            }
            return null;
        }));
        ArrayList valuesReadY = new ArrayList();
        futures.add(this.executor.submit(() -> {
            for (int i = 0; i < 10000; ++i) {
                valuesReadY.add((String)queue.take());
            }
            return null;
        }));
        ArrayList valuesReadZ = new ArrayList();
        futures.add(this.executor.submit(() -> {
            for (int i = 0; i < 10000; ++i) {
                valuesReadZ.add((String)queue.take());
            }
            return null;
        }));
        for (Future future : futures) {
            future.get();
        }
        Assert.assertEquals((long)30000L, (long)(valuesReadX.size() + valuesReadY.size() + valuesReadZ.size()));
        HashSet allValues = new HashSet();
        allValues.addAll(valuesReadX);
        allValues.addAll(valuesReadY);
        allValues.addAll(valuesReadZ);
        HashSet<String> expectedValues = new HashSet<String>();
        for (int i = 0; i < 10000; ++i) {
            expectedValues.add("A" + i);
            expectedValues.add("B" + i);
            expectedValues.add("C" + i);
        }
        MatcherAssert.assertThat((Object)Sets.difference(allValues, expectedValues), (Matcher)Matchers.empty());
        MatcherAssert.assertThat((Object)Sets.difference(expectedValues, allValues), (Matcher)Matchers.empty());
    }

    @Test(timeout=10000L)
    public void testCancellation() throws Exception {
        CancellableQueue queue = new CancellableQueue(100);
        ArrayList<Future> futures = new ArrayList<Future>();
        futures.add(this.executor.submit(() -> {
            while (true) {
                queue.put((Object)"A");
            }
        }));
        futures.add(this.executor.submit(() -> {
            while (true) {
                queue.put((Object)"B");
            }
        }));
        futures.add(this.executor.submit(() -> {
            while (true) {
                queue.put((Object)"C");
            }
        }));
        futures.add(this.executor.submit(() -> {
            for (int i = 0; i < 10000; ++i) {
                queue.take();
            }
            queue.cancel((Exception)new IllegalStateException("test cancel"));
            queue.take();
            return null;
        }));
        futures.add(this.executor.submit(() -> {
            for (int i = 0; i < 10000; ++i) {
                queue.take();
            }
            queue.cancel((Exception)new IllegalStateException("test cancel"));
            queue.take();
            return null;
        }));
        futures.add(this.executor.submit(() -> {
            for (int i = 0; i < 10000; ++i) {
                queue.take();
            }
            queue.cancel((Exception)new IllegalStateException("test cancel"));
            queue.take();
            return null;
        }));
        for (Future future : futures) {
            Assert.assertThrows((String)"test cancel", IllegalStateException.class, () -> {
                try {
                    future.get();
                }
                catch (ExecutionException e) {
                    throw e.getCause();
                }
            });
        }
        queue.reset();
        this.runTestForMultipleConsumersAndProducers((CancellableQueue<String>)queue);
    }

    @Test(timeout=10000L)
    public void testFirstCancellationError() throws Exception {
        CancellableQueue queue = new CancellableQueue(100);
        queue.cancel((Exception)new RuntimeException("First cancel exception"));
        Assert.assertThrows((String)"First cancel exception", RuntimeException.class, () -> {
            String cfr_ignored_0 = (String)queue.take();
        });
        queue.cancel((Exception)new RuntimeException("Second cancel exception"));
        Assert.assertThrows((String)"First cancel exception", RuntimeException.class, () -> {
            String cfr_ignored_0 = (String)queue.take();
        });
    }

    @Test
    public void testMemoryReferenceOnTake() throws Exception {
        String s1 = new String("test1");
        String s2 = new String("test2");
        WeakReference<String> weakReference1 = new WeakReference<String>(s1);
        WeakReference<String> weakReference2 = new WeakReference<String>(s2);
        CancellableQueue queue = new CancellableQueue(100);
        queue.put((Object)s1);
        queue.put((Object)s2);
        s1 = null;
        s2 = null;
        System.gc();
        Assert.assertTrue((weakReference1.get() != null ? 1 : 0) != 0);
        Assert.assertTrue((weakReference2.get() != null ? 1 : 0) != 0);
        Assert.assertEquals((Object)"test1", (Object)queue.take());
        System.gc();
        Assert.assertTrue((weakReference1.get() == null ? 1 : 0) != 0);
        Assert.assertTrue((weakReference2.get() != null ? 1 : 0) != 0);
        queue.reset();
        System.gc();
        Assert.assertTrue((weakReference1.get() == null ? 1 : 0) != 0);
        Assert.assertTrue((weakReference2.get() == null ? 1 : 0) != 0);
    }
}

