/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.Optional;
import java.util.Random;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.streaming.runtime.io.BufferSpiller;
import org.apache.flink.streaming.runtime.io.BufferStorage;
import org.apache.flink.streaming.runtime.io.CheckpointedInputGate;
import org.junit.Assert;
import org.junit.Test;

public class CheckpointBarrierAlignerMassiveRandomTest {
    private static final int PAGE_SIZE = 1024;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testWithTwoChannelsAndRandomBarriers() {
        NetworkBufferPool networkBufferPool1 = null;
        NetworkBufferPool networkBufferPool2 = null;
        try (IOManagerAsync ioMan = new IOManagerAsync();){
            networkBufferPool1 = new NetworkBufferPool(100, 1024, 1);
            networkBufferPool2 = new NetworkBufferPool(100, 1024, 1);
            BufferPool pool1 = networkBufferPool1.createBufferPool(100, 100);
            BufferPool pool2 = networkBufferPool2.createBufferPool(100, 100);
            RandomGeneratingInputGate myIG = new RandomGeneratingInputGate(new BufferPool[]{pool1, pool2}, new BarrierGenerator[]{new CountBarrier(100000L), new RandomBarrier(100000.0)});
            CheckpointedInputGate checkpointedInputGate = new CheckpointedInputGate((InputGate)myIG, (BufferStorage)new BufferSpiller((IOManager)ioMan, 1024), "Testing: No task associated", null);
            for (int i = 0; i < 2000000; ++i) {
                BufferOrEvent boe = (BufferOrEvent)checkpointedInputGate.pollNext().get();
                if (!boe.isBuffer()) continue;
                boe.getBuffer().recycleBuffer();
            }
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
        finally {
            if (networkBufferPool1 != null) {
                networkBufferPool1.destroyAllBufferPools();
                networkBufferPool1.destroy();
            }
            if (networkBufferPool2 != null) {
                networkBufferPool2.destroyAllBufferPools();
                networkBufferPool2.destroy();
            }
        }
    }

    private static class RandomGeneratingInputGate
    extends InputGate {
        private final int numberOfChannels;
        private final BufferPool[] bufferPools;
        private final int[] currentBarriers;
        private final BarrierGenerator[] barrierGens;
        private int currentChannel = 0;
        private long c = 0L;

        public RandomGeneratingInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens) {
            this.numberOfChannels = bufferPools.length;
            this.currentBarriers = new int[this.numberOfChannels];
            this.bufferPools = bufferPools;
            this.barrierGens = barrierGens;
            this.isAvailable = AVAILABLE;
        }

        public int getNumberOfInputChannels() {
            return this.numberOfChannels;
        }

        public boolean isFinished() {
            return false;
        }

        public Optional<BufferOrEvent> getNext() throws IOException, InterruptedException {
            this.currentChannel = (this.currentChannel + 1) % this.numberOfChannels;
            if (this.barrierGens[this.currentChannel].isNextBarrier()) {
                int n = this.currentChannel;
                int n2 = this.currentBarriers[n] + 1;
                this.currentBarriers[n] = n2;
                return Optional.of(new BufferOrEvent((AbstractEvent)new CheckpointBarrier((long)n2, System.currentTimeMillis(), CheckpointOptions.forCheckpointWithDefaultLocation()), this.currentChannel));
            }
            Buffer buffer = this.bufferPools[this.currentChannel].requestBuffer();
            buffer.getMemorySegment().putLong(0, this.c++);
            return Optional.of(new BufferOrEvent(buffer, this.currentChannel));
        }

        public Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException {
            return this.getNext();
        }

        public void sendTaskEvent(TaskEvent event) {
        }

        public void setup() {
        }

        public void close() {
        }
    }

    private static class CountBarrier
    implements BarrierGenerator {
        private final long every;
        private long c = 0L;

        public CountBarrier(long every) {
            this.every = every;
        }

        @Override
        public boolean isNextBarrier() {
            return this.c++ % this.every == 0L;
        }
    }

    private static class RandomBarrier
    implements BarrierGenerator {
        private static final Random rnd = new Random();
        private final double threshold;

        public RandomBarrier(double expectedEvery) {
            this.threshold = 1.0 / expectedEvery;
        }

        @Override
        public boolean isNextBarrier() {
            return rnd.nextDouble() < this.threshold;
        }
    }

    private static interface BarrierGenerator {
        public boolean isNextBarrier();
    }
}

