/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferReceivedListener;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.operators.testutils.DummyCheckpointInvokable;
import org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner;
import org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.io.CheckpointedInputGate;
import org.junit.Test;

public class CheckpointBarrierAlignerMassiveRandomTest {
    private static final int PAGE_SIZE = 1024;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testWithTwoChannelsAndRandomBarriers() throws Exception {
        NetworkBufferPool networkBufferPool1 = null;
        NetworkBufferPool networkBufferPool2 = null;
        try {
            networkBufferPool1 = new NetworkBufferPool(100, 1024, 1);
            networkBufferPool2 = new NetworkBufferPool(100, 1024, 1);
            BufferPool pool1 = networkBufferPool1.createBufferPool(100, 100);
            BufferPool pool2 = networkBufferPool2.createBufferPool(100, 100);
            RandomGeneratingInputGate myIG = new RandomGeneratingInputGate(new BufferPool[]{pool1, pool2}, new BarrierGenerator[]{new CountBarrier(100000L), new RandomBarrier(100000.0)});
            CheckpointedInputGate checkpointedInputGate = new CheckpointedInputGate((InputGate)myIG, (CheckpointBarrierHandler)new CheckpointBarrierAligner("Testing: No task associated", (AbstractInvokable)new DummyCheckpointInvokable(), new InputGate[]{myIG}));
            for (int i = 0; i < 2000000; ++i) {
                BufferOrEvent boe = (BufferOrEvent)checkpointedInputGate.pollNext().get();
                if (!boe.isBuffer()) continue;
                boe.getBuffer().recycleBuffer();
            }
        }
        finally {
            if (networkBufferPool1 != null) {
                networkBufferPool1.destroyAllBufferPools();
                networkBufferPool1.destroy();
            }
            if (networkBufferPool2 != null) {
                networkBufferPool2.destroyAllBufferPools();
                networkBufferPool2.destroy();
            }
        }
    }

    private static class RandomGeneratingInputGate
    extends InputGate {
        private final int numberOfChannels;
        private final BufferPool[] bufferPools;
        private final int[] currentBarriers;
        private final boolean[] channelBlocked;
        private final BarrierGenerator[] barrierGens;
        private int currentChannel = 0;
        private long c = 0L;

        public RandomGeneratingInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens) {
            this.numberOfChannels = bufferPools.length;
            this.currentBarriers = new int[this.numberOfChannels];
            this.channelBlocked = new boolean[this.numberOfChannels];
            this.bufferPools = bufferPools;
            this.barrierGens = barrierGens;
            this.availabilityHelper.resetAvailable();
        }

        public int getNumberOfInputChannels() {
            return this.numberOfChannels;
        }

        public boolean isFinished() {
            return false;
        }

        public InputChannel getChannel(int channelIndex) {
            throw new UnsupportedOperationException();
        }

        public List<InputChannelInfo> getChannelInfos() {
            return IntStream.range(0, this.numberOfChannels).mapToObj(channelIndex -> new InputChannelInfo(0, channelIndex)).collect(Collectors.toList());
        }

        public Optional<BufferOrEvent> getNext() throws IOException {
            this.currentChannel = (this.currentChannel + 1) % this.numberOfChannels;
            if (this.channelBlocked[this.currentChannel]) {
                return this.getNext();
            }
            if (this.barrierGens[this.currentChannel].isNextBarrier()) {
                this.channelBlocked[this.currentChannel] = true;
                if (this.allChannelsBlocked()) {
                    Arrays.fill(this.channelBlocked, false);
                }
                int n = this.currentChannel;
                int n2 = this.currentBarriers[n] + 1;
                this.currentBarriers[n] = n2;
                return Optional.of(new BufferOrEvent((AbstractEvent)new CheckpointBarrier((long)n2, System.currentTimeMillis(), CheckpointOptions.forCheckpointWithDefaultLocation()), new InputChannelInfo(0, this.currentChannel)));
            }
            Buffer buffer = this.bufferPools[this.currentChannel].requestBuffer();
            if (buffer == null) {
                return this.getNext();
            }
            buffer.getMemorySegment().putLong(0, this.c++);
            return Optional.of(new BufferOrEvent(buffer, new InputChannelInfo(0, this.currentChannel)));
        }

        public Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException {
            return this.getNext();
        }

        private boolean allChannelsBlocked() {
            for (boolean blocked : this.channelBlocked) {
                if (blocked) continue;
                return false;
            }
            return true;
        }

        public void sendTaskEvent(TaskEvent event) {
        }

        public void resumeConsumption(int channelIndex) {
        }

        public void setup() {
        }

        public CompletableFuture<?> readRecoveredState(ExecutorService executor, ChannelStateReader reader) {
            return CompletableFuture.completedFuture(null);
        }

        public void requestPartitions() {
        }

        public void close() {
        }

        public void registerBufferReceivedListener(BufferReceivedListener listener) {
        }
    }

    private static class CountBarrier
    implements BarrierGenerator {
        private final long every;
        private long c = 0L;

        public CountBarrier(long every) {
            this.every = every;
        }

        @Override
        public boolean isNextBarrier() {
            return this.c++ % this.every == 0L;
        }
    }

    private static class RandomBarrier
    implements BarrierGenerator {
        private static final Random rnd = new Random();
        private final double threshold;

        public RandomBarrier(double expectedEvery) {
            this.threshold = 1.0 / expectedEvery;
        }

        @Override
        public boolean isNextBarrier() {
            return rnd.nextDouble() < this.threshold;
        }
    }

    private static interface BarrierGenerator {
        public boolean isNextBarrier();
    }
}

