package org.apache.activemq.artemis.utils.actors;

import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:WEB-INF/lib/artemis-commons-2.23.0-tests.jar:org/apache/activemq/artemis/utils/actors/ThresholdActorTest.class */
/**
 * Exercises {@link ThresholdActor}: checks when the "over threshold" and "clear threshold"
 * callbacks are invoked as messages are queued, processed and flushed.
 *
 * <p>The shared {@link #semaphore} (a single permit) is used in two ways: in {@link #limitedSize()}
 * the test holds the permit so that {@link #limitedProcess(Integer)} stalls on the executor thread,
 * and in the flow tests {@link #block()} / {@link #unblock()} take and return the permit from the
 * threshold callbacks.
 */
public class ThresholdActorTest {
    /** Index of the last element produced by the flow tests (elements are 0..LAST_ELEMENT inclusive). */
    private static final int LAST_ELEMENT = 1111;

    /** Single permit shared between the test thread, the producer and the actor callbacks. */
    final Semaphore semaphore = new Semaphore(1);
    /** Number of messages handled by {@link #limitedProcess(Integer)}. */
    final AtomicInteger result = new AtomicInteger(0);
    /** Value (or element index) of the most recently processed message. */
    final AtomicInteger lastProcessed = new AtomicInteger(0);
    /** Count of unexpected conditions observed by the flow tests; asserted to be zero. */
    final AtomicInteger errors = new AtomicInteger(0);

    /** Message used by the flow tests: an index plus the size reported to the actor. */
    public static class Element {
        final int i;
        final int size;

        Element(int i, int size) {
            this.i = i;
            this.size = size;
        }
    }

    /**
     * Queues messages of size 1 against a threshold of 10 while processing is stalled, and verifies
     * the callbacks: nothing fires for the first 10 messages, the 11th fires the over-threshold
     * callback exactly once, and the clear-threshold callback only fires after processing resumes.
     */
    @Test
    public void limitedSize() throws Exception {
        lastProcessed.set(0);
        final ExecutorService executor = Executors.newSingleThreadExecutor();
        final AtomicInteger timesOpen = new AtomicInteger(0);
        final AtomicInteger timesClose = new AtomicInteger(0);
        final AtomicBoolean open = new AtomicBoolean(true);
        try {
            // Hold the only permit so limitedProcess blocks and messages pile up in the actor.
            semaphore.acquire();
            ThresholdActor<Integer> actor = new ThresholdActor<>(executor, this::limitedProcess, 10, message -> 1, () -> {
                timesClose.incrementAndGet();
                open.set(false);
            }, () -> {
                timesOpen.incrementAndGet();
                open.set(true);
            });

            for (int i = 0; i < 10; i++) {
                actor.act(i);
            }
            Assert.assertTrue(open.get());
            Assert.assertEquals(0, timesClose.get());

            // One more message is expected to trip the over-threshold callback.
            actor.act(99);
            Assert.assertEquals(1, timesClose.get());
            Assert.assertEquals(0, timesOpen.get());
            Assert.assertFalse(open.get());

            // While still stalled, neither a further message nor a flush may change the counters.
            actor.act(1000);
            actor.flush();
            Assert.assertEquals(1, timesClose.get());
            Assert.assertEquals(0, timesOpen.get());
            Assert.assertFalse(open.get());

            // Let processing resume; the clear-threshold callback is expected exactly once.
            semaphore.release();
            Wait.assertTrue(open::get);
            Assert.assertEquals(1, timesClose.get());
            Assert.assertEquals(1, timesOpen.get());
            Wait.assertEquals(1000, lastProcessed::get, 5000L, 1L);

            // The test expects an explicit flush to invoke the clear-threshold callback once more;
            // resetting the flag first proves the callback really ran again.
            actor.flush();
            open.set(false);
            Wait.assertEquals(2, timesOpen::get, 5000L, 1L);
            Wait.assertTrue(open::get);
        } finally {
            executor.shutdown();
        }
    }

    /**
     * Message handler for {@link #limitedSize()}: waits for the semaphore permit, records the
     * message, and returns the permit.
     *
     * @param num the message value; stored in {@link #lastProcessed}
     */
    public void limitedProcess(Integer num) {
        try {
            semaphore.acquire();
            try {
                result.incrementAndGet();
                lastProcessed.set(num);
            } finally {
                // Always hand the permit back, even if recording the message fails.
                semaphore.release();
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the executor instead of silently dropping it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (RuntimeException e) {
            // Best effort, as before: report the failure but do not propagate it to the actor.
            e.printStackTrace();
        }
    }

    /** Size function for the flow tests: the size carried by the element itself. */
    private static int getSize(Element element) {
        return element.size;
    }

    /** Message handler for the flow tests: records the index of the processed element. */
    protected void process(Element element) {
        lastProcessed.set(element.i);
    }

    /**
     * Over-threshold callback for the flow tests. Takes the permit without waiting; if the permit
     * is not available an error is counted and reported on stderr.
     */
    public void block() {
        if (!semaphore.tryAcquire()) {
            errors.incrementAndGet();
            System.err.println("acquire failed");
        }
    }

    /** Clear-threshold callback for the flow tests: returns the permit. */
    public void unblock() {
        semaphore.release();
    }

    /** Flow test in which the producer waits for the permit before each message. */
    @Test
    public void testFlow() throws Exception {
        testFlow(true);
    }

    /** Flow test in which the producer ignores the permit and sends as fast as it can. */
    @Test
    public void testFlow2() throws Exception {
        testFlow(false);
    }

    /**
     * Produces elements 0..{@value #LAST_ELEMENT} (alternating sizes 20 and 1, threshold 20) from a
     * pool thread and verifies that the last one is eventually processed with no errors counted.
     *
     * @param z when {@code true}, the producer acquires and immediately releases the semaphore
     *          before each message, i.e. it waits while {@link #block()} holds the permit
     */
    public void testFlow(boolean z) throws Exception {
        final boolean respectSemaphore = z;
        final ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            ThresholdActor<Element> actor = new ThresholdActor<>(executor, this::process, 20, ThresholdActorTest::getSize, this::block, this::unblock);
            final CountDownLatch producerDone = new CountDownLatch(1);

            executor.execute(() -> {
                for (int i = 0; i <= LAST_ELEMENT; i++) {
                    if (respectSemaphore) {
                        try {
                            semaphore.acquire();
                            semaphore.release();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            e.printStackTrace();
                            errors.incrementAndGet();
                        }
                    }
                    actor.act(new Element(i, i % 2 == 0 ? 20 : 1));
                }
                producerDone.countDown();
            });

            Assert.assertTrue(producerDone.await(10L, TimeUnit.SECONDS));
            Wait.assertEquals(LAST_ELEMENT, lastProcessed::get);
            Assert.assertEquals(0, errors.get());
        } finally {
            executor.shutdown();
        }
    }
}
