package com.datatorrent.stram.engine;

import com.datatorrent.api.Context;
import com.datatorrent.api.Sink;
import com.datatorrent.bufferserver.packet.MessageType;
import com.datatorrent.netlet.util.CircularBuffer;
import com.datatorrent.stram.tuple.Tuple;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import junitparams.JUnitParamsRunner;
import junitparams.Parameters;
import org.jctools.queues.SpscArrayQueue;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(JUnitParamsRunner.class)
/* loaded from: input_file:com/datatorrent/stram/engine/AbstractReservoirTest.class */
public class AbstractReservoirTest {

    /** JUnit rule through which each test registers the exception type its final call is expected to throw. */
    @Rule
    public ExpectedException exception = ExpectedException.none();
    private static final Logger logger = LoggerFactory.getLogger(AbstractReservoirTest.class);
    /** Name of the system property that overrides {@link #COUNT}. */
    private static final String countPropertyName = "com.datatorrent.stram.engine.AbstractReservoirTest.count";
    /** Number of objects pushed through a queue by the performance tests; 10,000,000 unless the property is set. */
    private static final int COUNT = Integer.getInteger(countPropertyName, 10000000).intValue();
    /** Name of the system property that overrides {@link #CAPACITY}. */
    private static final String capacityPropertyName = "com.datatorrent.stram.engine.AbstractReservoirTest.capacity";
    /** Queue capacity used by the performance tests; falls back to the default of {@code PortContext.QUEUE_CAPACITY}. */
    private static final int CAPACITY = Integer.getInteger(capacityPropertyName, (Integer) Context.PortContext.QUEUE_CAPACITY.defaultValue).intValue();

    /**
     * Creates a reservoir of the requested implementation and mirrors the choice into the
     * {@code com.datatorrent.stram.engine.Reservoir} system property.
     *
     * @param str fully qualified reservoir class name, or {@code null} to clear the system property
     *            and request the reservoir named {@code "DefaultReservoir"}
     * @param i capacity passed through to {@code AbstractReservoir.newReservoir}
     * @return the reservoir returned by {@code AbstractReservoir.newReservoir}
     */
    private static AbstractReservoir newReservoir(String str, int i) {
        final String propertyKey = "com.datatorrent.stram.engine.Reservoir";
        if (str != null) {
            System.setProperty(propertyKey, str);
            return AbstractReservoir.newReservoir(str, i);
        }
        System.clearProperty(propertyKey);
        return AbstractReservoir.newReservoir("DefaultReservoir", i);
    }

    /**
     * Installs {@code sink} on the reservoir and checks the installation.
     *
     * <p>Asserts that {@code setSink} returned {@code null} and that {@code getSink()} afterwards
     * yields an object equal to {@code sink}.
     *
     * @param abstractReservoir reservoir receiving the sink
     * @param sink sink to install
     */
    private static void setSink(AbstractReservoir abstractReservoir, Sink<Object> sink) {
        final Object previousSink = abstractReservoir.setSink(sink);
        Assert.assertNull(previousSink);
        final Object installedSink = abstractReservoir.getSink();
        Assert.assertEquals(sink, installedSink);
    }

    /**
     * Supplies the JUnitParams rows for the functional tests.
     *
     * <p>Each row starts as {@code {reservoirClassName, expectedExceptionType}}. The class name
     * ({@code null} selects the default reservoir) is replaced in place by a reservoir of capacity 2
     * that has a counting sink installed, so the rows handed to the tests are
     * {@code {AbstractReservoir, Class<? extends Throwable>}}.
     *
     * @return an {@code Object[][]} with one row per reservoir implementation
     */
    private Object defaultTestParameters() {
        final Object[][] parameters = {
            {null, NoSuchElementException.class},
            {"com.datatorrent.stram.engine.AbstractReservoir$SpscArrayQueueReservoir", NoSuchElementException.class},
            {"com.datatorrent.stram.engine.AbstractReservoir$SpscArrayBlockingQueueReservoir", NoSuchElementException.class},
            {"com.datatorrent.stram.engine.AbstractReservoir$ArrayBlockingQueueReservoir", NoSuchElementException.class},
            {"com.datatorrent.stram.engine.AbstractReservoir$CircularBufferReservoir", IllegalStateException.class}
        };
        for (Object[] row : parameters) {
            // Slot 0 holds the class name on entry and the constructed reservoir on exit.
            final AbstractReservoir reservoir = newReservoir((String) row[0], 2);
            row[0] = reservoir;
            setSink(reservoir, new Sink<Object>() {
                private int count;

                @Override
                public void put(Object obj) {
                    count++;
                }

                @Override
                public int getCount(boolean z) {
                    // Return the count seen so far; reset it afterwards when asked to.
                    final int current = count;
                    if (z) {
                        count = 0;
                    }
                    return current;
                }
            });
        }
        return parameters;
    }

    /**
     * Supplies the JUnitParams rows for {@code performanceTest}.
     *
     * <p>Each row starts as {@code {reservoirClassName, timeBudget}}, where the second element is the
     * number of milliseconds the performance test is allowed to take. The class name ({@code null}
     * selects the default reservoir) is replaced in place by a reservoir of capacity
     * {@code CAPACITY} whose sink throws a {@link RuntimeException} on receiving the
     * {@code COUNT}-th object.
     *
     * @return an {@code Object[][]} with one row per reservoir implementation
     */
    private Object performanceTestParameters() {
        final Object[][] parameters = {
            {null, 2500},
            {"com.datatorrent.stram.engine.AbstractReservoir$SpscArrayQueueReservoir", 10000},
            {"com.datatorrent.stram.engine.AbstractReservoir$SpscArrayBlockingQueueReservoir", 2500},
            {"com.datatorrent.stram.engine.AbstractReservoir$ArrayBlockingQueueReservoir", 10000},
            {"com.datatorrent.stram.engine.AbstractReservoir$CircularBufferReservoir", 100000}
        };
        for (Object[] row : parameters) {
            // Slot 0 holds the class name on entry and the constructed reservoir on exit.
            final AbstractReservoir reservoir = newReservoir((String) row[0], CAPACITY);
            row[0] = reservoir;
            setSink(reservoir, new Sink<Object>() {
                private int count = 0;

                @Override
                public void put(Object obj) {
                    count++;
                    // The exception is the signal that all COUNT objects have been delivered.
                    if (count == COUNT) {
                        throw new RuntimeException();
                    }
                }

                @Override
                public int getCount(boolean z) {
                    // The reset flag is ignored: the counter is never cleared.
                    return count;
                }
            });
        }
        return parameters;
    }

    /**
     * Checks the observable state of a reservoir to which nothing has been added.
     *
     * <p>Asserts that the reservoir reports itself empty, that every {@code size} variant called here
     * returns 0, that {@code sweep()} returns {@code null}, and that {@code remove()} throws the
     * exception type supplied with the parameter row.
     *
     * @param abstractReservoir reservoir under test, supplied by {@code defaultTestParameters}
     * @param cls exception type {@code remove()} is expected to throw
     */
    @Test
    @Parameters(method = "defaultTestParameters")
    public void testEmpty(AbstractReservoir abstractReservoir, Class<? extends Throwable> cls) {
        Assert.assertTrue(abstractReservoir.isEmpty());
        Assert.assertEquals(0L, abstractReservoir.size());
        Assert.assertEquals(0L, abstractReservoir.size(true));
        Assert.assertEquals(0L, abstractReservoir.size(false));
        Assert.assertNull(abstractReservoir.sweep());
        // The expectation has to be registered before the call that should throw.
        this.exception.expect(cls);
        abstractReservoir.remove();
    }

    /**
     * Adds a single non-tuple object and checks that {@code sweep()} hands it to the sink.
     *
     * <p>Before the sweep the reservoir must report one element, a count of 0, and return the added
     * object from {@code peek()}. After the sweep (which must return {@code null}) both the
     * reservoir and its sink must report a count of 1 and the reservoir must be empty again. Finally
     * {@code remove()} must throw the exception type supplied with the parameter row.
     *
     * @param abstractReservoir reservoir under test, supplied by {@code defaultTestParameters}
     * @param cls exception type {@code remove()} is expected to throw on the emptied reservoir
     */
    @Test
    @Parameters(method = "defaultTestParameters")
    public void testAddAndSweepObject(AbstractReservoir abstractReservoir, Class<? extends Throwable> cls) {
        // Integer.valueOf replaces the deprecated boxing constructor; the test only relies on equals().
        final Integer payload = Integer.valueOf(0);
        Assert.assertTrue(abstractReservoir.add(payload));
        Assert.assertFalse(abstractReservoir.isEmpty());
        Assert.assertEquals(1L, abstractReservoir.size());
        Assert.assertEquals(1L, abstractReservoir.size(false));
        Assert.assertEquals(0L, abstractReservoir.getCount(false));
        Assert.assertEquals(payload, abstractReservoir.peek());
        Assert.assertNull(abstractReservoir.sweep());
        Assert.assertEquals(1L, abstractReservoir.getCount(false));
        Assert.assertEquals(1L, abstractReservoir.getSink().getCount(false));
        Assert.assertTrue(abstractReservoir.isEmpty());
        Assert.assertEquals(0L, abstractReservoir.size());
        Assert.assertEquals(0L, abstractReservoir.size(false));
        // The expectation has to be registered before the call that should throw.
        this.exception.expect(cls);
        abstractReservoir.remove();
    }

    /**
     * Adds a single {@code BEGIN_WINDOW} tuple and checks how {@code sweep()} treats it.
     *
     * <p>The assertions expect {@code sweep()} to return the tuple (on repeated calls) while leaving
     * it in the reservoir and leaving the reservoir and sink counts at 0. Only {@code remove()}
     * takes the tuple out, after which {@code peek()}, {@code poll()} and {@code sweep()} must return
     * {@code null} and a further {@code remove()} must throw the exception type supplied with the
     * parameter row.
     *
     * @param abstractReservoir reservoir under test, supplied by {@code defaultTestParameters}
     * @param cls exception type {@code remove()} is expected to throw on the emptied reservoir
     */
    @Test
    @Parameters(method = "defaultTestParameters")
    public void testAddAndSweepTuple(AbstractReservoir abstractReservoir, Class<? extends Throwable> cls) {
        Tuple tuple = new Tuple(MessageType.BEGIN_WINDOW, 0L);
        Assert.assertTrue(abstractReservoir.add(tuple));
        Assert.assertFalse(abstractReservoir.isEmpty());
        Assert.assertEquals(1L, abstractReservoir.size());
        Assert.assertEquals(1L, abstractReservoir.size(false));
        Assert.assertEquals(tuple, abstractReservoir.peek());
        // Sweeping twice checks that the first sweep did not consume the tuple.
        Assert.assertEquals(tuple, abstractReservoir.sweep());
        Assert.assertEquals(tuple, abstractReservoir.sweep());
        Assert.assertEquals(0L, abstractReservoir.getCount(false));
        Assert.assertEquals(0L, abstractReservoir.getSink().getCount(false));
        Assert.assertFalse(abstractReservoir.isEmpty());
        Assert.assertEquals(tuple, abstractReservoir.remove());
        Assert.assertNull(abstractReservoir.peek());
        Assert.assertNull(abstractReservoir.poll());
        Assert.assertNull(abstractReservoir.sweep());
        // The expectation has to be registered before the call that should throw.
        this.exception.expect(cls);
        abstractReservoir.remove();
    }

    /**
     * Fills the reservoir to capacity and checks how it rejects further elements.
     *
     * <p>{@code offer} must succeed exactly {@code remainingCapacity()} times and then return
     * {@code false}; a subsequent {@code add} must throw {@link IllegalStateException}.
     *
     * @param abstractReservoir reservoir under test, supplied by {@code defaultTestParameters}
     * @param cls unused here; present only because the shared parameter rows carry two elements
     */
    @Test
    @Parameters(method = "defaultTestParameters")
    public void testFullReservoir(AbstractReservoir abstractReservoir, Class<? extends Throwable> cls) {
        final int capacity = abstractReservoir.remainingCapacity();
        Assert.assertTrue(capacity > 0);
        // Integer.valueOf replaces the deprecated boxing constructor; object identity is irrelevant here.
        final Integer payload = Integer.valueOf(0);
        for (int i = 0; i < capacity; i++) {
            Assert.assertTrue(abstractReservoir.offer(payload));
        }
        Assert.assertFalse(abstractReservoir.offer(payload));
        // The expectation has to be registered before the call that should throw.
        this.exception.expect(IllegalStateException.class);
        abstractReservoir.add(payload);
    }

    /**
     * Pushes {@code COUNT} objects through the reservoir from the test thread while a second thread
     * sweeps them into the sink, and fails if the whole exchange takes {@code j} milliseconds or more.
     *
     * <p>The consumer loops until {@code sweep()} propagates a {@link RuntimeException}; it then
     * checks that the sink has counted exactly {@code COUNT} objects. An assertion failure raised on
     * the consumer thread would never reach JUnit, so it is recorded and rethrown on the test thread
     * once the consumer has been joined.
     *
     * @param abstractReservoir reservoir under test, supplied by {@code performanceTestParameters}
     * @param j time budget in milliseconds
     */
    @Test
    @Parameters(method = "performanceTestParameters")
    public void performanceTest(final AbstractReservoir abstractReservoir, long j) {
        final long start = System.currentTimeMillis();
        final int maxSleepMillis = ((Integer) Context.PortContext.SPIN_MILLIS.defaultValue).intValue();
        // Written by the consumer, read by the test thread after join(), which makes the write visible.
        final AssertionError[] consumerFailure = new AssertionError[1];
        final Thread consumer = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    while (true) {
                        int sleepMillis = 0;
                        abstractReservoir.sweep();
                        // Back off progressively (capped at maxSleepMillis) while there is nothing to sweep.
                        while (abstractReservoir.isEmpty()) {
                            Thread.sleep(Math.min(maxSleepMillis, sleepMillis++));
                        }
                    }
                } catch (InterruptedException e) {
                    logger.error("Interrupted", e);
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                } catch (RuntimeException e) {
                    // A RuntimeException from sweep() ends the consumer; verify all objects were delivered.
                    try {
                        Assert.assertEquals(COUNT, abstractReservoir.getSink().getCount(false));
                    } catch (AssertionError failure) {
                        consumerFailure[0] = failure;
                    }
                }
            }
        });
        consumer.start();
        final Byte[] payload = new Byte[128];
        try {
            for (int i = 0; i < COUNT; i++) {
                abstractReservoir.put(payload);
            }
            consumer.join();
        } catch (InterruptedException e) {
            logger.error("Interrupted", e);
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        if (consumerFailure[0] != null) {
            throw consumerFailure[0];
        }
        final long elapsed = System.currentTimeMillis() - start;
        logger.debug("{}: time {}", abstractReservoir.getId(), Long.valueOf(elapsed));
        Assert.assertTrue(abstractReservoir.getId() + ": expected to complete within " + j + " millis. Actual time " + elapsed + " millis", j > elapsed);
    }

    /**
     * Baseline measurement (ignored by default): moves {@code COUNT} objects through a plain
     * {@link ArrayBlockingQueue} of capacity {@code CAPACITY} using blocking {@code put}/{@code take}
     * and logs the elapsed time at debug level.
     */
    @Test
    @Ignore
    public void testBlockingQueuePerformance() {
        final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(CAPACITY);
        final long start = System.currentTimeMillis();
        final Thread consumer = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 0; i < COUNT; i++) {
                        queue.take();
                    }
                } catch (InterruptedException e) {
                    logger.error("Interrupted", e);
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
        });
        consumer.start();
        final Object payload = new Byte[128];
        try {
            for (int i = 0; i < COUNT; i++) {
                queue.put(payload);
            }
            // join() throws the checked InterruptedException, so it is handled with the puts.
            consumer.join();
        } catch (InterruptedException e) {
            logger.error("Interrupted", e);
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        logger.debug("Time {}", Long.valueOf(System.currentTimeMillis() - start));
    }

    /**
     * Baseline measurement (ignored by default): moves {@code COUNT} objects through a bare
     * {@code SpscArrayQueue} of capacity {@code CAPACITY}, with both sides sleeping and retrying
     * (0 ms growing to at most 10 ms) when the queue is full or empty, and logs the elapsed time at
     * debug level.
     *
     * <p>The consumer is joined before the time is taken, as in the sibling baseline tests, so the
     * measurement includes draining the queue and no thread outlives the test.
     */
    @Test
    @Ignore
    public void testSpscQueuePerformance() {
        final SpscArrayQueue<Object> queue = new SpscArrayQueue<Object>(CAPACITY);
        final long start = System.currentTimeMillis();
        final Thread consumer = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 0; i < COUNT; i++) {
                        int sleepMillis = 0;
                        while (queue.poll() == null) {
                            Thread.sleep(sleepMillis);
                            sleepMillis = Math.min(10, sleepMillis + 1);
                        }
                    }
                } catch (InterruptedException e) {
                    logger.error("Interrupted", e);
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
        });
        consumer.start();
        final Object payload = new Byte[128];
        try {
            for (int i = 0; i < COUNT; i++) {
                int sleepMillis = 0;
                while (!queue.offer(payload)) {
                    Thread.sleep(sleepMillis);
                    sleepMillis = Math.min(10, sleepMillis + 1);
                }
            }
            consumer.join();
        } catch (InterruptedException e) {
            logger.error("Interrupted", e);
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        logger.debug("Time {}", Long.valueOf(System.currentTimeMillis() - start));
    }

    /**
     * Baseline measurement (ignored by default): moves {@code COUNT} objects through a
     * {@code SpscArrayQueue} of capacity {@code CAPACITY} where the producer waits on a
     * {@link Condition} when the queue is full and the consumer signals that condition after every
     * successful poll. The elapsed time is logged at debug level.
     *
     * <p>The lock is always released in a {@code finally} block so that an interrupted
     * {@code await()} cannot leave it held.
     */
    @Test
    @Ignore
    public void testSpscBlockingQueuePerformance() {
        final SpscArrayQueue<Object> queue = new SpscArrayQueue<Object>(CAPACITY);
        final ReentrantLock lock = new ReentrantLock();
        final Condition spaceAvailable = lock.newCondition();
        final long start = System.currentTimeMillis();
        final Thread consumer = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 0; i < COUNT; i++) {
                        int sleepMillis = 0;
                        while (queue.poll() == null) {
                            Thread.sleep(sleepMillis);
                            sleepMillis = Math.min(10, sleepMillis + 1);
                        }
                        // One slot was just freed: wake the producer if it is waiting.
                        lock.lock();
                        try {
                            spaceAvailable.signal();
                        } finally {
                            lock.unlock();
                        }
                    }
                } catch (InterruptedException e) {
                    logger.error("Interrupted", e);
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
        });
        consumer.start();
        final Object payload = new Byte[128];
        try {
            for (int i = 0; i < COUNT; i++) {
                // Fast path without the lock; only take the lock when the queue looked full.
                if (!queue.offer(payload)) {
                    lock.lockInterruptibly();
                    try {
                        while (!queue.offer(payload)) {
                            spaceAvailable.await();
                        }
                    } finally {
                        lock.unlock();
                    }
                }
            }
            consumer.join();
        } catch (InterruptedException e) {
            logger.error("Interrupted", e);
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        logger.debug("Time {}", Long.valueOf(System.currentTimeMillis() - start));
    }

    /**
     * Baseline measurement (ignored by default): moves {@code COUNT} objects through a bare
     * {@code CircularBuffer} of capacity {@code CAPACITY}. The producer uses {@code put}; the
     * consumer polls and sleeps (0 ms growing to at most 10 ms) while nothing is available. The
     * elapsed time is logged at debug level.
     */
    @Test
    @Ignore
    public void testCircularBufferPerformance() {
        final CircularBuffer<Object> buffer = new CircularBuffer<Object>(CAPACITY);
        final long start = System.currentTimeMillis();
        final Thread consumer = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 0; i < COUNT; i++) {
                        int sleepMillis = 0;
                        while (buffer.poll() == null) {
                            Thread.sleep(sleepMillis);
                            sleepMillis = Math.min(10, sleepMillis + 1);
                        }
                    }
                } catch (InterruptedException e) {
                    logger.error("Interrupted", e);
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
        });
        consumer.start();
        final Object payload = new Byte[128];
        try {
            for (int i = 0; i < COUNT; i++) {
                buffer.put(payload);
            }
            // join() throws the checked InterruptedException, so it is handled with the puts.
            consumer.join();
        } catch (InterruptedException e) {
            logger.error("Interrupted", e);
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        logger.debug("Time {}", Long.valueOf(System.currentTimeMillis() - start));
    }
}
