/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.Random;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputGateListener;
import org.apache.flink.streaming.runtime.io.BarrierBuffer;
import org.junit.Assert;
import org.junit.Test;

public class BarrierBufferMassiveRandomTest {
    private static final int PAGE_SIZE = 1024;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testWithTwoChannelsAndRandomBarriers() {
        IOManagerAsync ioMan = null;
        try {
            ioMan = new IOManagerAsync();
            BufferPool pool1 = new NetworkBufferPool(100, 1024, MemoryType.HEAP).createBufferPool(100, true);
            BufferPool pool2 = new NetworkBufferPool(100, 1024, MemoryType.HEAP).createBufferPool(100, true);
            RandomGeneratingInputGate myIG = new RandomGeneratingInputGate(new BufferPool[]{pool1, pool2}, new BarrierGenerator[]{new CountBarrier(100000L), new RandomBarrier(100000.0)});
            BarrierBuffer barrierBuffer = new BarrierBuffer((InputGate)myIG, (IOManager)ioMan);
            for (int i = 0; i < 2000000; ++i) {
                BufferOrEvent boe = barrierBuffer.getNextNonBlocked();
                if (!boe.isBuffer()) continue;
                boe.getBuffer().recycle();
            }
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
        finally {
            if (ioMan != null) {
                ioMan.shutdown();
            }
        }
    }

    protected static class RandomGeneratingInputGate
    implements InputGate {
        private final int numChannels;
        private final BufferPool[] bufferPools;
        private final int[] currentBarriers;
        private final BarrierGenerator[] barrierGens;
        private int currentChannel = 0;
        private long c = 0L;

        public RandomGeneratingInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens) {
            this.numChannels = bufferPools.length;
            this.currentBarriers = new int[this.numChannels];
            this.bufferPools = bufferPools;
            this.barrierGens = barrierGens;
        }

        public int getNumberOfInputChannels() {
            return this.numChannels;
        }

        public boolean isFinished() {
            return false;
        }

        public void requestPartitions() {
        }

        public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
            this.currentChannel = (this.currentChannel + 1) % this.numChannels;
            if (this.barrierGens[this.currentChannel].isNextBarrier()) {
                int n = this.currentChannel;
                int n2 = this.currentBarriers[n] + 1;
                this.currentBarriers[n] = n2;
                return new BufferOrEvent((AbstractEvent)new CheckpointBarrier((long)n2, System.currentTimeMillis()), this.currentChannel);
            }
            Buffer buffer = this.bufferPools[this.currentChannel].requestBuffer();
            buffer.getMemorySegment().putLong(0, this.c++);
            return new BufferOrEvent(buffer, this.currentChannel);
        }

        public void sendTaskEvent(TaskEvent event) {
        }

        public void registerListener(InputGateListener listener) {
        }

        public int getPageSize() {
            return 1024;
        }
    }

    private static class CountBarrier
    implements BarrierGenerator {
        private final long every;
        private long c = 0L;

        public CountBarrier(long every) {
            this.every = every;
        }

        @Override
        public boolean isNextBarrier() {
            return this.c++ % this.every == 0L;
        }
    }

    protected static class RandomBarrier
    implements BarrierGenerator {
        private static final Random rnd = new Random();
        private final double threshold;

        public RandomBarrier(double expectedEvery) {
            this.threshold = 1.0 / expectedEvery;
        }

        @Override
        public boolean isNextBarrier() {
            return rnd.nextDouble() < this.threshold;
        }
    }

    protected static interface BarrierGenerator {
        public boolean isNextBarrier();
    }
}

