package org.apache.ratis.util;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX WARN: Classes with same name are omitted:
  input_file:ratis-test-2.1.0-tests.jar:org/apache/ratis/util/TestDataBlockingQueue.class
 */
/* loaded from: input_file:test-classes/org/apache/ratis/util/TestDataBlockingQueue.class */
/**
 * Tests for {@link DataBlockingQueue}: the element and byte limits, and the
 * blocking {@code offer}/{@code poll} calls when the producer and the consumer
 * run at different speeds.
 */
public class TestDataBlockingQueue {
    static final Logger LOG = LoggerFactory.getLogger(TestDataBlockingQueue.class);

    /** Number of values that the blocking-call test moves through the queue. */
    private static final int END_VALUE = 30;

    final SizeInBytes byteLimit = SizeInBytes.valueOf(100);
    final int elementLimit = 10;
    /** Queue under test; each Integer element is sized by its own int value. */
    final DataBlockingQueue<Integer> q =
            new DataBlockingQueue<>(null, this.byteLimit, this.elementLimit, Integer::intValue);
    final TimeDuration slow = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
    final TimeDuration fast = TimeDuration.valueOf(10, TimeUnit.MILLISECONDS);

    @Test(timeout = 1000)
    public void testElementLimit() {
        TestDataQueue.runTestElementLimit(this.q);
    }

    @Test(timeout = 1000)
    public void testByteLimit() {
        TestDataQueue.runTestByteLimit(this.q);
    }

    /** The producer sleeps longer than the consumer between calls. */
    @Test(timeout = 10000)
    public void testSlowOfferFastPoll() throws Exception {
        runTestBlockingCalls(this.slow, this.fast, this.q);
    }

    /** The consumer sleeps longer than the producer between calls. */
    @Test(timeout = 10000)
    public void testFastOfferSlowPoll() throws Exception {
        runTestBlockingCalls(this.fast, this.slow, this.q);
    }

    /**
     * Asserts the invariant between the two counters: the value being offered
     * is never behind the last polled value, and is ahead of it by at most
     * {@code elementLimit + 1}.
     *
     * @param offering the value that the offer thread is currently trying to add
     * @param polled the last value taken by the poll thread
     * @param elementLimit the element limit of the queue
     */
    static void assertOfferPull(int offering, int polled, int elementLimit) {
        Assert.assertTrue(offering >= polled);
        Assert.assertTrue(offering - polled <= elementLimit + 1);
    }

    /**
     * Runs one offer thread and one poll thread against the given queue until
     * the values 1..{@value #END_VALUE} have been offered and polled in order.
     *
     * @param offerSleepTime how long the offer thread sleeps before each offer
     * @param pollSleepTime how long the poll thread sleeps before each poll
     * @param queue the queue under test; must be empty on entry
     * @throws Exception if joining the worker threads is interrupted
     */
    static void runTestBlockingCalls(TimeDuration offerSleepTime, TimeDuration pollSleepTime,
            DataBlockingQueue<Integer> queue) throws Exception {
        Assert.assertTrue(queue.isEmpty());
        ExitUtils.disableSystemExit();
        final int limit = queue.getElementLimit();
        // Both blocking calls use the shorter of the two sleep times as their timeout.
        final TimeDuration timeout = CollectionUtils.min(offerSleepTime, pollSleepTime);

        final AtomicInteger offeringValue = new AtomicInteger();
        final AtomicInteger polledValue = new AtomicInteger();

        final Thread pollThread = new Thread(() -> {
            try {
                while (polledValue.get() < END_VALUE) {
                    pollSleepTime.sleep();
                    final Integer polled = queue.poll(timeout);
                    if (polled != null) {
                        // Values must come out in exactly the order they were offered.
                        Assert.assertEquals(polledValue.incrementAndGet(), polled.intValue());
                        LOG.info("polled {}", polled);
                    }
                    assertOfferPull(offeringValue.get(), polledValue.get(), limit);
                }
            } catch (Throwable t) {
                // Catch Throwable, not Exception: a failed assertion throws
                // AssertionError (an Error), which must also be handed to
                // ExitUtils so that the final assertNotTerminated() sees it.
                ExitUtils.terminate(-2, "pollThread failed", t, LOG);
            }
        });

        final Thread offerThread = new Thread(() -> {
            try {
                // Offered values start at 1.
                offeringValue.incrementAndGet();
                while (offeringValue.get() <= END_VALUE) {
                    offerSleepTime.sleep();
                    // Advance the counter only when the current value was accepted.
                    if (queue.offer(offeringValue.get(), timeout)) {
                        LOG.info("offered {}", offeringValue.getAndIncrement());
                    }
                    assertOfferPull(offeringValue.get(), polledValue.get(), limit);
                }
            } catch (Throwable t) {
                // See pollThread: assertion failures are Errors, not Exceptions.
                ExitUtils.terminate(-1, "offerThread failed", t, LOG);
            }
        });

        pollThread.start();
        offerThread.start();
        offerThread.join();
        pollThread.join();

        // The offer counter ends one past the last value that was offered.
        Assert.assertEquals(END_VALUE + 1, offeringValue.get());
        Assert.assertEquals(END_VALUE, polledValue.get());
        Assert.assertTrue(queue.isEmpty());
        ExitUtils.assertNotTerminated();
    }
}
