/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.io.File;
import java.util.Arrays;
import java.util.Random;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.streaming.runtime.io.BufferSpiller;
import org.apache.flink.streaming.runtime.io.BufferStorage;
import org.apache.flink.streaming.runtime.io.CheckpointBarrierAlignerTestBase;
import org.apache.flink.streaming.runtime.io.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.io.MockInputGate;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.hamcrest.MockitoHamcrest;
import org.mockito.verification.VerificationMode;

public class CheckpointBarrierAlignerAlignmentLimitTest {
    private static final int PAGE_SIZE = 512;
    private static final Random RND = new Random();
    private static IOManager ioManager;

    @BeforeClass
    public static void setup() {
        ioManager = new IOManagerAsync();
    }

    @AfterClass
    public static void shutdownIOManager() throws Exception {
        ioManager.close();
    }

    @Test
    public void testBreakCheckpointAtAlignmentLimit() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{CheckpointBarrierAlignerAlignmentLimitTest.createBuffer(1, 100), CheckpointBarrierAlignerAlignmentLimitTest.createBuffer(2, 70), CheckpointBarrierAlignerAlignmentLimitTest.createBuffer(0, 42), CheckpointBarrierAlignerAlignmentLimitTest.createBuffer(2, 111), CheckpointBarrierAlignerAlignmentLimitTest.createBarrier(7L, 1), CheckpointBarrierAlignerAlignmentLimitTest.createBuffer(1, 100), CheckpointBarrierAlignerAlignmentLimitTest.createBuffer(2, 200), CheckpointBarrierAlignerAlignmentLimitTest.createBuffer(1, 300), CheckpointBarrierAlignerAlignmentLimitTest.createBuffer(0, 50), CheckpointBarrierAlignerAlignmentLimitTest.createBarrier(7L, 0), CheckpointBarrierAlignerAlignmentLimitTest.createBuffer(2, 100), CheckpointBarrierAlignerAlignmentLimitTest.createBuffer(0, 100), CheckpointBarrierAlignerAlignmentLimitTest.createBuffer(1, 200), CheckpointBarrierAlignerAlignmentLimitTest.createBuffer(0, 200), CheckpointBarrierAlignerAlignmentLimitTest.createBuffer(0, 101), CheckpointBarrierAlignerAlignmentLimitTest.createBuffer(0, 100), CheckpointBarrierAlignerAlignmentLimitTest.createBuffer(1, 100), CheckpointBarrierAlignerAlignmentLimitTest.createBuffer(2, 100), CheckpointBarrierAlignerAlignmentLimitTest.createBarrier(7L, 2), CheckpointBarrierAlignerAlignmentLimitTest.createBuffer(0, 100), CheckpointBarrierAlignerAlignmentLimitTest.createBuffer(1, 100), CheckpointBarrierAlignerAlignmentLimitTest.createBuffer(2, 100)};
        MockInputGate gate = new MockInputGate(3, Arrays.asList(sequence));
        AbstractInvokable toNotify = (AbstractInvokable)Mockito.mock(AbstractInvokable.class);
        CheckpointedInputGate buffer = new CheckpointedInputGate((InputGate)gate, (BufferStorage)new BufferSpiller(ioManager, 512, 1000L), "Testing", toNotify);
        CheckpointBarrierAlignerAlignmentLimitTest.check(sequence[0], (BufferOrEvent)buffer.pollNext().get());
        CheckpointBarrierAlignerAlignmentLimitTest.check(sequence[1], (BufferOrEvent)buffer.pollNext().get());
        CheckpointBarrierAlignerAlignmentLimitTest.check(sequence[2], (BufferOrEvent)buffer.pollNext().get());
        CheckpointBarrierAlignerAlignmentLimitTest.check(sequence[3], (BufferOrEvent)buffer.pollNext().get());
        long startTs = System.nanoTime();
        CheckpointBarrierAlignerAlignmentLimitTest.check(sequence[6], (BufferOrEvent)buffer.pollNext().get());
        CheckpointBarrierAlignerAlignmentLimitTest.check(sequence[8], (BufferOrEvent)buffer.pollNext().get());
        CheckpointBarrierAlignerAlignmentLimitTest.check(sequence[10], (BufferOrEvent)buffer.pollNext().get());
        CheckpointBarrierAlignerAlignmentLimitTest.check(sequence[5], (BufferOrEvent)buffer.pollNext().get());
        CheckpointBarrierAlignerAlignmentLimitTest.validateAlignmentTime(startTs, buffer);
        ((AbstractInvokable)Mockito.verify((Object)toNotify, (VerificationMode)Mockito.times((int)1))).abortCheckpointOnBarrier(Mockito.eq((long)7L), (Throwable)MockitoHamcrest.argThat((Matcher)new CheckpointBarrierAlignerTestBase.CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED)));
        CheckpointBarrierAlignerAlignmentLimitTest.check(sequence[7], (BufferOrEvent)buffer.pollNext().get());
        CheckpointBarrierAlignerAlignmentLimitTest.check(sequence[11], (BufferOrEvent)buffer.pollNext().get());
        CheckpointBarrierAlignerAlignmentLimitTest.check(sequence[12], (BufferOrEvent)buffer.pollNext().get());
        CheckpointBarrierAlignerAlignmentLimitTest.check(sequence[13], (BufferOrEvent)buffer.pollNext().get());
        CheckpointBarrierAlignerAlignmentLimitTest.check(sequence[14], (BufferOrEvent)buffer.pollNext().get());
        CheckpointBarrierAlignerAlignmentLimitTest.check(sequence[15], (BufferOrEvent)buffer.pollNext().get());
        CheckpointBarrierAlignerAlignmentLimitTest.check(sequence[16], (BufferOrEvent)buffer.pollNext().get());
        CheckpointBarrierAlignerAlignmentLimitTest.check(sequence[17], (BufferOrEvent)buffer.pollNext().get());
        CheckpointBarrierAlignerAlignmentLimitTest.check(sequence[19], (BufferOrEvent)buffer.pollNext().get());
        CheckpointBarrierAlignerAlignmentLimitTest.check(sequence[20], (BufferOrEvent)buffer.pollNext().get());
        CheckpointBarrierAlignerAlignmentLimitTest.check(sequence[21], (BufferOrEvent)buffer.pollNext().get());
        ((AbstractInvokable)Mockito.verify((Object)toNotify, (VerificationMode)Mockito.times((int)0))).triggerCheckpointOnBarrier((CheckpointMetaData)Mockito.any(CheckpointMetaData.class), (CheckpointOptions)Mockito.any(CheckpointOptions.class), (CheckpointMetrics)Mockito.any(CheckpointMetrics.class));
        Assert.assertFalse((boolean)buffer.pollNext().isPresent());
        Assert.assertTrue((boolean)buffer.isFinished());
        buffer.cleanup();
        CheckpointBarrierAlignerAlignmentLimitTest.checkNoTempFilesRemain();
    }

    @Test
    public void testAlignmentLimitWithQueuedAlignments() throws Exception {
        BufferOrEvent[] sequence = new BufferOrEvent[]{CheckpointBarrierAlignerAlignmentLimitTest.createBuffer(1, 100), CheckpointBarrierAlignerAlignmentLimitTest.createBuffer(2, 70), CheckpointBarrierAlignerAlignmentLimitTest.createBarrier(3L, 2), CheckpointBarrierAlignerAlignmentLimitTest.createBuffer(1, 100), CheckpointBarrierAlignerAlignmentLimitTest.createBuffer(2, 100), CheckpointBarrierAlignerAlignmentLimitTest.createBarrier(3L, 0), CheckpointBarrierAlignerAlignmentLimitTest.createBuffer(0, 100), CheckpointBarrierAlignerAlignmentLimitTest.createBuffer(1, 100), CheckpointBarrierAlignerAlignmentLimitTest.createBarrier(4L, 0), CheckpointBarrierAlignerAlignmentLimitTest.createBuffer(0, 100), CheckpointBarrierAlignerAlignmentLimitTest.createBuffer(0, 120), CheckpointBarrierAlignerAlignmentLimitTest.createBuffer(1, 100), CheckpointBarrierAlignerAlignmentLimitTest.createBuffer(2, 100), CheckpointBarrierAlignerAlignmentLimitTest.createBarrier(3L, 1), CheckpointBarrierAlignerAlignmentLimitTest.createBarrier(4L, 1), CheckpointBarrierAlignerAlignmentLimitTest.createBuffer(0, 100), CheckpointBarrierAlignerAlignmentLimitTest.createBuffer(1, 100), CheckpointBarrierAlignerAlignmentLimitTest.createBuffer(2, 100), CheckpointBarrierAlignerAlignmentLimitTest.createBarrier(4L, 2), CheckpointBarrierAlignerAlignmentLimitTest.createBuffer(0, 100), CheckpointBarrierAlignerAlignmentLimitTest.createBuffer(1, 100), CheckpointBarrierAlignerAlignmentLimitTest.createBuffer(2, 100)};
        MockInputGate gate = new MockInputGate(3, Arrays.asList(sequence));
        AbstractInvokable toNotify = (AbstractInvokable)Mockito.mock(AbstractInvokable.class);
        CheckpointedInputGate buffer = new CheckpointedInputGate((InputGate)gate, (BufferStorage)new BufferSpiller(ioManager, 512, 500L), "Testing", toNotify);
        CheckpointBarrierAlignerAlignmentLimitTest.check(sequence[0], (BufferOrEvent)buffer.pollNext().get());
        CheckpointBarrierAlignerAlignmentLimitTest.check(sequence[1], (BufferOrEvent)buffer.pollNext().get());
        long startTs = System.nanoTime();
        CheckpointBarrierAlignerAlignmentLimitTest.check(sequence[3], (BufferOrEvent)buffer.pollNext().get());
        CheckpointBarrierAlignerAlignmentLimitTest.check(sequence[7], (BufferOrEvent)buffer.pollNext().get());
        CheckpointBarrierAlignerAlignmentLimitTest.check(sequence[11], (BufferOrEvent)buffer.pollNext().get());
        CheckpointBarrierAlignerAlignmentLimitTest.check(sequence[4], (BufferOrEvent)buffer.pollNext().get());
        CheckpointBarrierAlignerAlignmentLimitTest.validateAlignmentTime(startTs, buffer);
        ((AbstractInvokable)Mockito.verify((Object)toNotify, (VerificationMode)Mockito.times((int)1))).abortCheckpointOnBarrier(Mockito.eq((long)3L), (Throwable)MockitoHamcrest.argThat((Matcher)new CheckpointBarrierAlignerTestBase.CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED)));
        CheckpointBarrierAlignerAlignmentLimitTest.check(sequence[6], (BufferOrEvent)buffer.pollNext().get());
        startTs = System.nanoTime();
        CheckpointBarrierAlignerAlignmentLimitTest.check(sequence[12], (BufferOrEvent)buffer.pollNext().get());
        CheckpointBarrierAlignerAlignmentLimitTest.check(sequence[17], (BufferOrEvent)buffer.pollNext().get());
        CheckpointBarrierAlignerAlignmentLimitTest.check(sequence[9], (BufferOrEvent)buffer.pollNext().get());
        CheckpointBarrierAlignerAlignmentLimitTest.validateAlignmentTime(startTs, buffer);
        ((AbstractInvokable)Mockito.verify((Object)toNotify, (VerificationMode)Mockito.times((int)1))).triggerCheckpointOnBarrier((CheckpointMetaData)MockitoHamcrest.argThat((Matcher)new CheckpointMatcher(4L)), (CheckpointOptions)Mockito.any(CheckpointOptions.class), (CheckpointMetrics)Mockito.any(CheckpointMetrics.class));
        CheckpointBarrierAlignerAlignmentLimitTest.check(sequence[10], (BufferOrEvent)buffer.pollNext().get());
        CheckpointBarrierAlignerAlignmentLimitTest.check(sequence[15], (BufferOrEvent)buffer.pollNext().get());
        CheckpointBarrierAlignerAlignmentLimitTest.check(sequence[16], (BufferOrEvent)buffer.pollNext().get());
        CheckpointBarrierAlignerAlignmentLimitTest.check(sequence[19], (BufferOrEvent)buffer.pollNext().get());
        CheckpointBarrierAlignerAlignmentLimitTest.check(sequence[20], (BufferOrEvent)buffer.pollNext().get());
        CheckpointBarrierAlignerAlignmentLimitTest.check(sequence[21], (BufferOrEvent)buffer.pollNext().get());
        ((AbstractInvokable)Mockito.verify((Object)toNotify, (VerificationMode)Mockito.times((int)0))).triggerCheckpointOnBarrier((CheckpointMetaData)MockitoHamcrest.argThat((Matcher)new CheckpointMatcher(3L)), (CheckpointOptions)Mockito.any(CheckpointOptions.class), (CheckpointMetrics)Mockito.any(CheckpointMetrics.class));
        Assert.assertFalse((boolean)buffer.pollNext().isPresent());
        Assert.assertTrue((boolean)buffer.isFinished());
        buffer.cleanup();
        CheckpointBarrierAlignerAlignmentLimitTest.checkNoTempFilesRemain();
    }

    private static BufferOrEvent createBuffer(int channel, int size) {
        byte[] bytes = new byte[size];
        RND.nextBytes(bytes);
        MemorySegment memory = MemorySegmentFactory.allocateUnpooledSegment((int)512);
        memory.put(0, bytes);
        NetworkBuffer buf = new NetworkBuffer(memory, FreeingBufferRecycler.INSTANCE);
        buf.setSize(size);
        buf.retainBuffer();
        return new BufferOrEvent((Buffer)buf, channel);
    }

    private static BufferOrEvent createBarrier(long id, int channel) {
        return new BufferOrEvent((AbstractEvent)new CheckpointBarrier(id, System.currentTimeMillis(), CheckpointOptions.forCheckpointWithDefaultLocation()), channel);
    }

    private static void check(BufferOrEvent expected, BufferOrEvent present) {
        Assert.assertNotNull((Object)expected);
        Assert.assertNotNull((Object)present);
        Assert.assertEquals((Object)expected.isBuffer(), (Object)present.isBuffer());
        if (expected.isBuffer()) {
            Assert.assertEquals((long)expected.getBuffer().getMaxCapacity(), (long)present.getBuffer().getMaxCapacity());
            Assert.assertEquals((long)expected.getBuffer().getSize(), (long)present.getBuffer().getSize());
            MemorySegment expectedMem = expected.getBuffer().getMemorySegment();
            MemorySegment presentMem = present.getBuffer().getMemorySegment();
            Assert.assertTrue((String)"memory contents differs", (expectedMem.compare(presentMem, 0, 0, 512) == 0 ? 1 : 0) != 0);
        } else {
            Assert.assertEquals((Object)expected.getEvent(), (Object)present.getEvent());
        }
    }

    private static void validateAlignmentTime(long startTimestamp, CheckpointedInputGate buffer) {
        long elapsed = System.nanoTime() - startTimestamp;
        Assert.assertTrue((String)"wrong alignment time", (buffer.getAlignmentDurationNanos() <= elapsed ? 1 : 0) != 0);
    }

    private static void checkNoTempFilesRemain() {
        for (File dir : ioManager.getSpillingDirectories()) {
            for (String file : dir.list()) {
                if (file == null || file.equals(".") || file.equals("..")) continue;
                Assert.fail((String)("barrier buffer did not clean up temp files. remaining file: " + file));
            }
        }
    }

    private static class CheckpointMatcher
    extends BaseMatcher<CheckpointMetaData> {
        private final long checkpointId;

        CheckpointMatcher(long checkpointId) {
            this.checkpointId = checkpointId;
        }

        public boolean matches(Object o) {
            return o != null && o.getClass() == CheckpointMetaData.class && ((CheckpointMetaData)o).getCheckpointId() == this.checkpointId;
        }

        public void describeTo(Description description) {
            description.appendText("CheckpointMetaData - id = " + this.checkpointId);
        }
    }
}

