package org.apache.flink.streaming.runtime.io;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import javax.annotation.Nullable;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.util.ExceptionUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.collection.IsIterableContainingInOrder;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest.class */
public class CheckpointBarrierAlignerAlignmentLimitTest {
    private static final int PAGE_SIZE = 512;
    private static final Random RND = new Random();
    private static IOManager ioManager;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest$AssertCheckpointWasAborted.class */
    public static class AssertCheckpointWasAborted implements Closeable {
        private final CheckpointNotificationVerifier checkpointNotificationVerifier;

        public AssertCheckpointWasAborted(CheckpointNotificationVerifier checkpointNotificationVerifier) {
            this.checkpointNotificationVerifier = checkpointNotificationVerifier;
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            Assert.assertEquals(String.format("AbstractInvokable::abortCheckpointOnBarrier(%d) has not fired", Long.valueOf(this.checkpointNotificationVerifier.expectedAbortCheckpointId)), -1L, this.checkpointNotificationVerifier.expectedAbortCheckpointId);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerAlignmentLimitTest$CheckpointNotificationVerifier.class */
    public static class CheckpointNotificationVerifier extends AbstractInvokable {
        private final List<Long> triggeredCheckpoints;
        private long expectedAbortCheckpointId;

        @Nullable
        private CheckpointFailureReason expectedCheckpointFailureReason;

        public CheckpointNotificationVerifier() {
            super(new MockEnvironmentBuilder().build());
            this.triggeredCheckpoints = new ArrayList();
            this.expectedAbortCheckpointId = -1L;
        }

        public void invoke() throws Exception {
        }

        public void abortCheckpointOnBarrier(long j, Throwable th) throws Exception {
            try {
                if (this.expectedAbortCheckpointId == j && matchesExpectedCause(th)) {
                } else {
                    throw new Exception(th);
                }
            } finally {
                this.expectedAbortCheckpointId = -1L;
                this.expectedCheckpointFailureReason = null;
            }
        }

        private boolean matchesExpectedCause(Throwable th) {
            if (this.expectedCheckpointFailureReason == null) {
                return true;
            }
            Optional findThrowable = ExceptionUtils.findThrowable(th, CheckpointException.class);
            return findThrowable.isPresent() && ((CheckpointException) findThrowable.get()).getCheckpointFailureReason() == this.expectedCheckpointFailureReason;
        }

        public AssertCheckpointWasAborted expectAbortCheckpoint(long j, CheckpointFailureReason checkpointFailureReason) {
            this.expectedAbortCheckpointId = j;
            this.expectedCheckpointFailureReason = checkpointFailureReason;
            return new AssertCheckpointWasAborted(this);
        }

        public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
            this.triggeredCheckpoints.add(Long.valueOf(checkpointMetaData.getCheckpointId()));
        }

        public List<Long> getAndResetTriggeredCheckpoints() {
            ArrayList arrayList = new ArrayList(this.triggeredCheckpoints);
            this.triggeredCheckpoints.clear();
            return arrayList;
        }
    }

    @BeforeClass
    public static void setup() {
        ioManager = new IOManagerAsync();
    }

    @AfterClass
    public static void shutdownIOManager() throws Exception {
        ioManager.close();
    }

    @Test
    public void testBreakCheckpointAtAlignmentLimit() throws Exception {
        BufferOrEvent[] bufferOrEventArr = {createBuffer(1, 100), createBuffer(2, 70), createBuffer(0, 42), createBuffer(2, 111), createBarrier(7L, 1), createBuffer(1, 100), createBuffer(2, 200), createBuffer(1, 300), createBuffer(0, 50), createBarrier(7L, 0), createBuffer(2, 100), createBuffer(0, 100), createBuffer(1, 200), createBuffer(0, 200), createBuffer(0, 101), createBuffer(0, 100), createBuffer(1, 100), createBuffer(2, 100), createBarrier(7L, 2), createBuffer(0, 100), createBuffer(1, 100), createBuffer(2, 100)};
        MockInputGate mockInputGate = new MockInputGate(3, Arrays.asList(bufferOrEventArr));
        CheckpointNotificationVerifier checkpointNotificationVerifier = new CheckpointNotificationVerifier();
        CheckpointedInputGate checkpointedInputGate = new CheckpointedInputGate(mockInputGate, new CachedBufferStorage(PAGE_SIZE, 1024L, "Testing"), "Testing", checkpointNotificationVerifier);
        check(bufferOrEventArr[0], (BufferOrEvent) checkpointedInputGate.pollNext().get());
        check(bufferOrEventArr[1], (BufferOrEvent) checkpointedInputGate.pollNext().get());
        check(bufferOrEventArr[2], (BufferOrEvent) checkpointedInputGate.pollNext().get());
        check(bufferOrEventArr[3], (BufferOrEvent) checkpointedInputGate.pollNext().get());
        long nanoTime = System.nanoTime();
        check(bufferOrEventArr[6], (BufferOrEvent) checkpointedInputGate.pollNext().get());
        check(bufferOrEventArr[8], (BufferOrEvent) checkpointedInputGate.pollNext().get());
        check(bufferOrEventArr[10], (BufferOrEvent) checkpointedInputGate.pollNext().get());
        AssertCheckpointWasAborted expectAbortCheckpoint = checkpointNotificationVerifier.expectAbortCheckpoint(7L, CheckpointFailureReason.CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED);
        Throwable th = null;
        try {
            try {
                check(bufferOrEventArr[5], (BufferOrEvent) checkpointedInputGate.pollNext().get());
                validateAlignmentTime(nanoTime, checkpointedInputGate);
                if (expectAbortCheckpoint != null) {
                    if (0 != 0) {
                        try {
                            expectAbortCheckpoint.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        expectAbortCheckpoint.close();
                    }
                }
                check(bufferOrEventArr[7], (BufferOrEvent) checkpointedInputGate.pollNext().get());
                check(bufferOrEventArr[11], (BufferOrEvent) checkpointedInputGate.pollNext().get());
                check(bufferOrEventArr[12], (BufferOrEvent) checkpointedInputGate.pollNext().get());
                check(bufferOrEventArr[13], (BufferOrEvent) checkpointedInputGate.pollNext().get());
                check(bufferOrEventArr[14], (BufferOrEvent) checkpointedInputGate.pollNext().get());
                check(bufferOrEventArr[15], (BufferOrEvent) checkpointedInputGate.pollNext().get());
                check(bufferOrEventArr[16], (BufferOrEvent) checkpointedInputGate.pollNext().get());
                check(bufferOrEventArr[17], (BufferOrEvent) checkpointedInputGate.pollNext().get());
                check(bufferOrEventArr[19], (BufferOrEvent) checkpointedInputGate.pollNext().get());
                check(bufferOrEventArr[20], (BufferOrEvent) checkpointedInputGate.pollNext().get());
                check(bufferOrEventArr[21], (BufferOrEvent) checkpointedInputGate.pollNext().get());
                Assert.assertTrue(checkpointNotificationVerifier.getAndResetTriggeredCheckpoints().isEmpty());
                Assert.assertFalse(checkpointedInputGate.pollNext().isPresent());
                Assert.assertTrue(checkpointedInputGate.isFinished());
                checkpointedInputGate.cleanup();
                checkNoTempFilesRemain();
            } finally {
            }
        } catch (Throwable th3) {
            if (expectAbortCheckpoint != null) {
                if (th != null) {
                    try {
                        expectAbortCheckpoint.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    expectAbortCheckpoint.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testAlignmentLimitWithQueuedAlignments() throws Exception {
        BufferOrEvent[] bufferOrEventArr = {createBuffer(1, 100), createBuffer(2, 70), createBarrier(3L, 2), createBuffer(1, 100), createBuffer(2, 100), createBarrier(3L, 0), createBuffer(0, 100), createBuffer(1, 100), createBarrier(4L, 0), createBuffer(0, 100), createBuffer(0, 120), createBuffer(1, 100), createBuffer(2, 100), createBarrier(3L, 1), createBarrier(4L, 1), createBuffer(0, 100), createBuffer(1, 100), createBuffer(2, 100), createBarrier(4L, 2), createBuffer(0, 100), createBuffer(1, 100), createBuffer(2, 100)};
        MockInputGate mockInputGate = new MockInputGate(3, Arrays.asList(bufferOrEventArr));
        CheckpointNotificationVerifier checkpointNotificationVerifier = new CheckpointNotificationVerifier();
        CheckpointedInputGate checkpointedInputGate = new CheckpointedInputGate(mockInputGate, new CachedBufferStorage(PAGE_SIZE, 2560L, "Testing"), "Testing", checkpointNotificationVerifier);
        check(bufferOrEventArr[0], (BufferOrEvent) checkpointedInputGate.pollNext().get());
        check(bufferOrEventArr[1], (BufferOrEvent) checkpointedInputGate.pollNext().get());
        long nanoTime = System.nanoTime();
        check(bufferOrEventArr[3], (BufferOrEvent) checkpointedInputGate.pollNext().get());
        check(bufferOrEventArr[7], (BufferOrEvent) checkpointedInputGate.pollNext().get());
        check(bufferOrEventArr[11], (BufferOrEvent) checkpointedInputGate.pollNext().get());
        AssertCheckpointWasAborted expectAbortCheckpoint = checkpointNotificationVerifier.expectAbortCheckpoint(3L, CheckpointFailureReason.CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED);
        Throwable th = null;
        try {
            try {
                check(bufferOrEventArr[4], (BufferOrEvent) checkpointedInputGate.pollNext().get());
                validateAlignmentTime(nanoTime, checkpointedInputGate);
                if (expectAbortCheckpoint != null) {
                    if (0 != 0) {
                        try {
                            expectAbortCheckpoint.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        expectAbortCheckpoint.close();
                    }
                }
                check(bufferOrEventArr[6], (BufferOrEvent) checkpointedInputGate.pollNext().get());
                long nanoTime2 = System.nanoTime();
                check(bufferOrEventArr[12], (BufferOrEvent) checkpointedInputGate.pollNext().get());
                check(bufferOrEventArr[17], (BufferOrEvent) checkpointedInputGate.pollNext().get());
                check(bufferOrEventArr[9], (BufferOrEvent) checkpointedInputGate.pollNext().get());
                validateAlignmentTime(nanoTime2, checkpointedInputGate);
                MatcherAssert.assertThat(checkpointNotificationVerifier.getAndResetTriggeredCheckpoints(), IsIterableContainingInOrder.contains(new Long[]{4L}));
                check(bufferOrEventArr[10], (BufferOrEvent) checkpointedInputGate.pollNext().get());
                check(bufferOrEventArr[15], (BufferOrEvent) checkpointedInputGate.pollNext().get());
                check(bufferOrEventArr[16], (BufferOrEvent) checkpointedInputGate.pollNext().get());
                check(bufferOrEventArr[19], (BufferOrEvent) checkpointedInputGate.pollNext().get());
                check(bufferOrEventArr[20], (BufferOrEvent) checkpointedInputGate.pollNext().get());
                check(bufferOrEventArr[21], (BufferOrEvent) checkpointedInputGate.pollNext().get());
                Assert.assertTrue(checkpointNotificationVerifier.getAndResetTriggeredCheckpoints().isEmpty());
                Assert.assertFalse(checkpointedInputGate.pollNext().isPresent());
                Assert.assertTrue(checkpointedInputGate.isFinished());
                checkpointedInputGate.cleanup();
                checkNoTempFilesRemain();
            } finally {
            }
        } catch (Throwable th3) {
            if (expectAbortCheckpoint != null) {
                if (th != null) {
                    try {
                        expectAbortCheckpoint.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    expectAbortCheckpoint.close();
                }
            }
            throw th3;
        }
    }

    private static BufferOrEvent createBuffer(int i, int i2) {
        byte[] bArr = new byte[i2];
        RND.nextBytes(bArr);
        MemorySegment allocateUnpooledSegment = MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE);
        allocateUnpooledSegment.put(0, bArr);
        NetworkBuffer networkBuffer = new NetworkBuffer(allocateUnpooledSegment, FreeingBufferRecycler.INSTANCE);
        networkBuffer.setSize(i2);
        networkBuffer.retainBuffer();
        return new BufferOrEvent(networkBuffer, i);
    }

    private static BufferOrEvent createBarrier(long j, int i) {
        return new BufferOrEvent(new CheckpointBarrier(j, System.currentTimeMillis(), CheckpointOptions.forCheckpointWithDefaultLocation()), i);
    }

    private static void check(BufferOrEvent bufferOrEvent, BufferOrEvent bufferOrEvent2) {
        Assert.assertNotNull(bufferOrEvent);
        Assert.assertNotNull(bufferOrEvent2);
        Assert.assertEquals(Boolean.valueOf(bufferOrEvent.isBuffer()), Boolean.valueOf(bufferOrEvent2.isBuffer()));
        if (!bufferOrEvent.isBuffer()) {
            Assert.assertEquals(bufferOrEvent.getEvent(), bufferOrEvent2.getEvent());
            return;
        }
        Assert.assertEquals(bufferOrEvent.getBuffer().getMaxCapacity(), bufferOrEvent2.getBuffer().getMaxCapacity());
        Assert.assertEquals(bufferOrEvent.getBuffer().getSize(), bufferOrEvent2.getBuffer().getSize());
        Assert.assertTrue("memory contents differs", bufferOrEvent.getBuffer().getMemorySegment().compare(bufferOrEvent2.getBuffer().getMemorySegment(), 0, 0, PAGE_SIZE) == 0);
    }

    private static void validateAlignmentTime(long j, CheckpointedInputGate checkpointedInputGate) {
        Assert.assertTrue("wrong alignment time", checkpointedInputGate.getAlignmentDurationNanos() <= System.nanoTime() - j);
    }

    private static void checkNoTempFilesRemain() {
        for (File file : ioManager.getSpillingDirectories()) {
            for (String str : file.list()) {
                if (str != null && !str.equals(".") && !str.equals("..")) {
                    Assert.fail("barrier buffer did not clean up temp files. remaining file: " + str);
                }
            }
        }
    }
}
