package org.apache.flink.runtime.operators.coordination;

import java.nio.ByteBuffer;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.TestEventSender;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.class */
public class OperatorCoordinatorHolderTest extends TestLogger {
    // Callback that stores the throwable it receives in globalFailure.
    private final Consumer<Throwable> globalFailureHandler = th -> {
        this.globalFailure = th;
    };
    // Last throwable recorded by globalFailureHandler; null if none was recorded or a test cleared it.
    private Throwable globalFailure;

    /* loaded from: input_file:org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest$CheckpointEventOrderTestBaseCoordinator.class */
    /**
     * Base class for test coordinators that drive themselves from a dedicated thread.
     *
     * <p>{@link #start()} launches a thread that calls {@link #step()} in a loop until
     * {@link #close()} is invoked. All other coordinator callbacks are no-ops here, except
     * {@code checkpointCoordinator}, which subclasses must provide.
     */
    private static abstract class CheckpointEventOrderTestBaseCoordinator implements OperatorCoordinator, Runnable {
        private final Thread coordinatorThread = new Thread(this);
        protected final OperatorCoordinator.Context context;
        private volatile boolean closed;

        CheckpointEventOrderTestBaseCoordinator(OperatorCoordinator.Context context) {
            this.context = context;
        }

        /** Starts the thread that repeatedly executes {@link #step()}. */
        public void start() throws Exception {
            coordinatorThread.start();
        }

        /** Marks the coordinator as closed, interrupts the stepping thread and waits for it to end. */
        public void close() throws Exception {
            closed = true;
            coordinatorThread.interrupt();
            coordinatorThread.join();
        }

        public void handleEventFromOperator(int i, OperatorEvent operatorEvent) {
            // intentionally empty
        }

        public void subtaskFailed(int i, @Nullable Throwable th) {
            // intentionally empty
        }

        public abstract void checkpointCoordinator(long j, CompletableFuture<byte[]> completableFuture) throws Exception;

        public void checkpointComplete(long j) {
            // intentionally empty
        }

        public void resetToCheckpoint(byte[] bArr) throws Exception {
            // intentionally empty
        }

        /**
         * Loops over {@link #step()} until closed. A failure that occurs after closing is ignored;
         * any other failure is printed and terminates the JVM with exit code -1.
         */
        @Override
        public void run() {
            try {
                while (!closed) {
                    step();
                }
            } catch (Throwable failure) {
                if (!closed) {
                    failure.printStackTrace();
                    System.exit(-1);
                }
            }
        }

        /** One iteration of the coordinator thread's work; invoked repeatedly by {@link #run()}. */
        protected abstract void step() throws Exception;
    }

    /* loaded from: input_file:org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest$FutureCompletedAfterSendingEventsCoordinator.class */
    /**
     * Test coordinator whose stepping thread sends three events per step (one each to subtasks
     * 0, 1 and 2) and only afterwards completes a pending checkpoint future, using the current
     * event counter as the checkpoint payload.
     */
    private static final class FutureCompletedAfterSendingEventsCoordinator extends CheckpointEventOrderTestBaseCoordinator {

        // Written by the thread that calls checkpointCoordinator(...) and read by the thread
        // running step(); volatile so the hand-over between the two threads is visible.
        @Nullable
        private volatile CompletableFuture<byte[]> checkpoint;

        // Number of events sent so far; only accessed from step().
        private int num;

        FutureCompletedAfterSendingEventsCoordinator(OperatorCoordinator.Context context) {
            super(context);
        }

        /**
         * Remembers the future so that a later {@link #step()} completes it.
         *
         * @param j checkpoint id (unused)
         * @param completableFuture future to complete with the serialized event counter
         */
        @Override
        public void checkpointCoordinator(long j, CompletableFuture<byte[]> completableFuture) throws Exception {
            this.checkpoint = completableFuture;
        }

        /**
         * Sleeps briefly, sends three consecutively numbered events, then completes the pending
         * checkpoint (if any) with the counter value reached after those events.
         */
        @Override
        protected void step() throws Exception {
            Thread.sleep(2L);

            context.sendEvent(new TestOperatorEvent(num++), 0);
            context.sendEvent(new TestOperatorEvent(num++), 1);
            context.sendEvent(new TestOperatorEvent(num++), 2);

            // Read the shared field once so the null check and the completion use the same future.
            final CompletableFuture<byte[]> pending = this.checkpoint;
            if (pending != null) {
                pending.complete(OperatorCoordinatorHolderTest.intToBytes(num));
                this.checkpoint = null;
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest$FutureCompletedInstantlyTestCoordinator.class */
    /**
     * Test coordinator that parks the caller of {@code checkpointCoordinator} on a condition
     * until the stepping thread has completed the checkpoint future and sent one more event,
     * all while holding the same lock.
     */
    private static final class FutureCompletedInstantlyTestCoordinator extends CheckpointEventOrderTestBaseCoordinator {
        private final ReentrantLock lock = new ReentrantLock(true);
        private final Condition condition = lock.newCondition();

        @GuardedBy("lock")
        @Nullable
        private CompletableFuture<byte[]> checkpoint;

        // Number of events sent so far; only accessed from step().
        private int num;

        FutureCompletedInstantlyTestCoordinator(OperatorCoordinator.Context context) {
            super(context);
        }

        /**
         * Stores the future under the lock and then waits on the condition, so this call only
         * returns after being signalled (see {@link #step()}).
         */
        @Override
        public void checkpointCoordinator(long j, CompletableFuture<byte[]> completableFuture) throws Exception {
            lock.lock();
            try {
                checkpoint = completableFuture;
                condition.await();
            } finally {
                lock.unlock();
            }
        }

        /**
         * Under the lock: completes a pending checkpoint with the current counter, sends the next
         * event to subtask 0, wakes all waiters and sleeps briefly before releasing the lock.
         */
        @Override
        protected void step() throws Exception {
            lock.lock();
            try {
                if (checkpoint != null) {
                    checkpoint.complete(OperatorCoordinatorHolderTest.intToBytes(num));
                    checkpoint = null;
                }
                context.sendEvent(new TestOperatorEvent(num++), 0);
                condition.signalAll();
                Thread.sleep(2L);
            } finally {
                lock.unlock();
            }
        }
    }

    /** After each test, rethrows any failure that was recorded in {@code globalFailure}. */
    @After
    public void checkNoGlobalFailure() throws Exception {
        final Throwable failure = this.globalFailure;
        if (failure == null) {
            return;
        }
        ExceptionUtils.rethrowException(failure);
    }

    /** Triggering a checkpoint on the holder must not complete the caller's future by itself. */
    @Test
    public void checkpointFutureInitiallyNotDone() throws Exception {
        final OperatorCoordinatorHolder holder =
                createCoordinatorHolder(new TestEventSender(), TestingOperatorCoordinator::new);

        final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
        holder.checkpointCoordinator(1L, checkpointFuture);

        Assert.assertFalse(checkpointFuture.isDone());
    }

    /** Completing the coordinator's future must complete the caller's future with the same bytes. */
    @Test
    public void completedCheckpointFuture() throws Exception {
        final OperatorCoordinatorHolder holder =
                createCoordinatorHolder(new TestEventSender(), TestingOperatorCoordinator::new);
        final byte[] expectedState = {11, 22, 33, 44};

        final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
        holder.checkpointCoordinator(9L, checkpointFuture);

        final CompletableFuture<byte[]> coordinatorFuture = getCoordinator(holder).getLastTriggeredCheckpoint();
        coordinatorFuture.complete(expectedState);

        Assert.assertTrue(checkpointFuture.isDone());
        Assert.assertArrayEquals(expectedState, checkpointFuture.get());
    }

    /** An event sent while the checkpoint future is still pending must reach the sender. */
    @Test
    public void eventsBeforeCheckpointFutureCompletionPassThrough() throws Exception {
        final TestEventSender sender = new TestEventSender();
        final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);

        final CompletableFuture<byte[]> pendingCheckpoint = new CompletableFuture<>();
        holder.checkpointCoordinator(1L, pendingCheckpoint);
        getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1), 1);

        final TestEventSender.EventWithSubtask expected =
                new TestEventSender.EventWithSubtask(new TestOperatorEvent(1), 1);
        Assert.assertThat(sender.events, Matchers.contains(expected));
    }

    /** An event sent after the checkpoint future completed must not reach the sender. */
    @Test
    public void eventsAreBlockedAfterCheckpointFutureCompletes() throws Exception {
        final TestEventSender sender = new TestEventSender();
        final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);

        triggerAndCompleteCheckpoint(holder, 10L);
        final TestOperatorEvent event = new TestOperatorEvent(1337);
        getCoordinator(holder).getContext().sendEvent(event, 0);

        Assert.assertTrue(sender.events.isEmpty());
    }

    /** Aborting the current triggering must deliver an event sent after checkpoint completion. */
    @Test
    public void abortedCheckpointReleasesBlockedEvents() throws Exception {
        final TestEventSender sender = new TestEventSender();
        final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);

        triggerAndCompleteCheckpoint(holder, 123L);
        getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1337), 0);
        holder.abortCurrentTriggering();

        final TestEventSender.EventWithSubtask expected =
                new TestEventSender.EventWithSubtask(new TestOperatorEvent(1337), 0);
        Assert.assertThat(sender.events, Matchers.contains(expected));
    }

    /** Calling afterSourceBarrierInjection must deliver an event sent after checkpoint completion. */
    @Test
    public void sourceBarrierInjectionReleasesBlockedEvents() throws Exception {
        final TestEventSender sender = new TestEventSender();
        final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);

        triggerAndCompleteCheckpoint(holder, 1111L);
        getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1337), 0);
        holder.afterSourceBarrierInjection(1111L);

        final TestEventSender.EventWithSubtask expected =
                new TestEventSender.EventWithSubtask(new TestOperatorEvent(1337), 0);
        Assert.assertThat(sender.events, Matchers.contains(expected));
    }

    /**
     * After subtask 1 is reported as failed, aborting the triggering must deliver only the
     * event that was sent to subtask 0.
     */
    @Test
    public void failedTasksDropsBlockedEvents() throws Exception {
        final TestEventSender sender = new TestEventSender();
        final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);

        triggerAndCompleteCheckpoint(holder, 1000L);
        getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(0), 0);
        getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1), 1);

        holder.subtaskFailed(1, (Throwable) null);
        holder.abortCurrentTriggering();

        final TestEventSender.EventWithSubtask survivingEvent =
                new TestEventSender.EventWithSubtask(new TestOperatorEvent(0), 0);
        Assert.assertThat(sender.events, Matchers.contains(survivingEvent));
    }

    /** An event sent after resetToCheckpoint must reach the sender even though a checkpoint completed before. */
    @Test
    public void restoreOpensValveEvents() throws Exception {
        final TestEventSender sender = new TestEventSender();
        final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);

        triggerAndCompleteCheckpoint(holder, 1000L);
        holder.resetToCheckpoint(new byte[0]);
        getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(999), 1);

        final TestEventSender.EventWithSubtask expected =
                new TestEventSender.EventWithSubtask(new TestOperatorEvent(999), 1);
        Assert.assertThat(sender.events, Matchers.contains(expected));
    }

    /** Events sent after checkpoint completion must not reach the sender once resetToCheckpoint is called. */
    @Test
    public void restoreDropsBlockedEvents() throws Exception {
        final TestEventSender sender = new TestEventSender();
        final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);

        triggerAndCompleteCheckpoint(holder, 1000L);
        getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(0), 0);
        getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1), 1);

        holder.resetToCheckpoint(new byte[0]);

        Assert.assertTrue(sender.events.isEmpty());
    }

    /**
     * Completing the coordinator future of an aborted checkpoint, after a later checkpoint has
     * finished its barrier injection, must not stop a subsequent event from reaching the sender.
     */
    @Test
    public void lateCompleteCheckpointFutureDoesNotBlockEvents() throws Exception {
        final TestEventSender sender = new TestEventSender();
        final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);

        // First checkpoint: triggered, then aborted while the coordinator future is still open.
        final CompletableFuture<byte[]> abortedHolderFuture = new CompletableFuture<>();
        holder.checkpointCoordinator(1000L, abortedHolderFuture);
        final CompletableFuture<byte[]> staleCoordinatorFuture = getCoordinator(holder).getLastTriggeredCheckpoint();
        holder.abortCurrentTriggering();

        // Second checkpoint: runs through completely.
        triggerAndCompleteCheckpoint(holder, 1010L);
        holder.afterSourceBarrierInjection(1010L);

        // Only now does the future of the aborted checkpoint complete.
        staleCoordinatorFuture.complete(new byte[0]);
        getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(123), 0);

        final TestEventSender.EventWithSubtask expected =
                new TestEventSender.EventWithSubtask(new TestOperatorEvent(123), 0);
        Assert.assertThat(sender.events, Matchers.contains(expected));
    }

    /**
     * Triggering a second checkpoint while the first is still pending must fail the second
     * future and record a global failure.
     */
    @Test
    public void triggeringFailsIfOtherTriggeringInProgress() throws Exception {
        final OperatorCoordinatorHolder holder =
                createCoordinatorHolder(new TestEventSender(), TestingOperatorCoordinator::new);

        final CompletableFuture<byte[]> firstCheckpoint = new CompletableFuture<>();
        holder.checkpointCoordinator(11L, firstCheckpoint);

        final CompletableFuture<byte[]> secondCheckpoint = new CompletableFuture<>();
        holder.checkpointCoordinator(12L, secondCheckpoint);

        Assert.assertTrue(secondCheckpoint.isCompletedExceptionally());
        Assert.assertNotNull(this.globalFailure);

        // The failure is expected here; clear it so the @After check does not rethrow it.
        this.globalFailure = null;
    }

    /**
     * Runs two checkpoints back to back, each concluded by afterSourceBarrierInjection, and
     * checks that all four events arrive at the sender in sending order.
     */
    @Test
    public void takeCheckpointAfterSuccessfulCheckpoint() throws Exception {
        final TestEventSender sender = new TestEventSender();
        final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);

        getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(0), 0);

        triggerAndCompleteCheckpoint(holder, 22L);
        getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1), 0);
        holder.afterSourceBarrierInjection(22L);

        getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(2), 0);

        triggerAndCompleteCheckpoint(holder, 23L);
        getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(3), 0);
        holder.afterSourceBarrierInjection(23L);

        Assert.assertThat(
                sender.events,
                Matchers.contains(
                        new TestEventSender.EventWithSubtask(new TestOperatorEvent(0), 0),
                        new TestEventSender.EventWithSubtask(new TestOperatorEvent(1), 0),
                        new TestEventSender.EventWithSubtask(new TestOperatorEvent(2), 0),
                        new TestEventSender.EventWithSubtask(new TestOperatorEvent(3), 0)));
    }

    /**
     * Aborts the first checkpoint, completes a second one via afterSourceBarrierInjection, and
     * checks that all four events arrive at the sender in sending order.
     */
    @Test
    public void takeCheckpointAfterAbortedCheckpoint() throws Exception {
        final TestEventSender sender = new TestEventSender();
        final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, TestingOperatorCoordinator::new);

        getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(0), 0);

        triggerAndCompleteCheckpoint(holder, 22L);
        getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(1), 0);
        holder.abortCurrentTriggering();

        getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(2), 0);

        triggerAndCompleteCheckpoint(holder, 23L);
        getCoordinator(holder).getContext().sendEvent(new TestOperatorEvent(3), 0);
        holder.afterSourceBarrierInjection(23L);

        Assert.assertThat(
                sender.events,
                Matchers.contains(
                        new TestEventSender.EventWithSubtask(new TestOperatorEvent(0), 0),
                        new TestEventSender.EventWithSubtask(new TestOperatorEvent(1), 0),
                        new TestEventSender.EventWithSubtask(new TestOperatorEvent(2), 0),
                        new TestEventSender.EventWithSubtask(new TestOperatorEvent(3), 0)));
    }

    /** Runs the checkpoint/event consistency check with {@code FutureCompletedInstantlyTestCoordinator}. */
    @Test
    public void verifyCheckpointEventOrderWhenCheckpointFutureCompletedImmediately() throws Exception {
        checkpointEventValueAtomicity(context -> new FutureCompletedInstantlyTestCoordinator(context));
    }

    /** Runs the checkpoint/event consistency check with {@code FutureCompletedAfterSendingEventsCoordinator}. */
    @Test
    public void verifyCheckpointEventOrderWhenCheckpointFutureCompletesLate() throws Exception {
        checkpointEventValueAtomicity(context -> new FutureCompletedAfterSendingEventsCoordinator(context));
    }

    /**
     * Checks that the number stored in the checkpoint equals the number of events that reached
     * the sender, and that those events carry the values 0, 1, 2, ... in order.
     *
     * <p>The coordinator created by {@code function} runs on its own thread; this method lets it
     * run briefly, triggers checkpoint 0, lets it run again, closes the holder and then compares
     * the checkpointed counter with the events recorded by the sender.
     *
     * @param function factory creating the coordinator under test from its context
     * @throws Exception if the holder, the coordinator or the checkpoint future fails
     */
    private void checkpointEventValueAtomicity(Function<OperatorCoordinator.Context, OperatorCoordinator> function) throws Exception {
        final ManuallyTriggeredScheduledExecutorService executor = new ManuallyTriggeredScheduledExecutorService();
        final ComponentMainThreadExecutor mainThreadExecutor =
                new ComponentMainThreadExecutorServiceAdapter((ScheduledExecutorService) executor, Thread.currentThread());

        // Keep a handle on the sender: its recorded events are what the assertions inspect.
        final TestEventSender sender = new TestEventSender();
        final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, function, mainThreadExecutor);

        // Let the coordinator thread run for a short, randomized time before the checkpoint.
        Thread.sleep(new Random().nextInt(10) + 20);
        executor.triggerAll();

        final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
        holder.checkpointCoordinator(0L, checkpointFuture);
        executor.triggerAll();

        // Let the coordinator thread keep running for a while after the trigger.
        Thread.sleep(new Random().nextInt(10) + 10);
        holder.close();
        executor.triggerAll();

        Assert.assertTrue(checkpointFuture.isDone());
        final int checkpointedNumber = bytesToInt(checkpointFuture.get());

        Assert.assertEquals(checkpointedNumber, sender.events.size());
        for (int i = 0; i < checkpointedNumber; i++) {
            Assert.assertEquals(i, ((TestOperatorEvent) sender.events.get(i).event).getValue());
        }
    }

    /**
     * Triggers a checkpoint on the holder and immediately completes the coordinator's future
     * with an empty byte array.
     *
     * @param operatorCoordinatorHolder holder wrapping a {@code TestingOperatorCoordinator}
     * @param j checkpoint id to trigger
     * @return the future that was passed to the holder for this checkpoint
     */
    private CompletableFuture<byte[]> triggerAndCompleteCheckpoint(OperatorCoordinatorHolder operatorCoordinatorHolder, long j) throws Exception {
        final CompletableFuture<byte[]> holderFuture = new CompletableFuture<>();
        operatorCoordinatorHolder.checkpointCoordinator(j, holderFuture);

        final CompletableFuture<byte[]> coordinatorFuture =
                getCoordinator(operatorCoordinatorHolder).getLastTriggeredCheckpoint();
        coordinatorFuture.complete(new byte[0]);

        return holderFuture;
    }

    /**
     * Encodes an int as four bytes using a {@link ByteBuffer} in its default byte order.
     *
     * @param i value to encode
     * @return a new 4-byte array holding the encoded value
     */
    static byte[] intToBytes(int i) {
        final ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
        buffer.putInt(i);
        return buffer.array();
    }

    /**
     * Decodes an int from the first four bytes of the array using a {@link ByteBuffer} in its
     * default byte order.
     *
     * @param bArr array holding at least four bytes
     * @return the decoded value
     */
    static int bytesToInt(byte[] bArr) {
        final ByteBuffer buffer = ByteBuffer.wrap(bArr);
        return buffer.getInt();
    }

    /**
     * Returns the holder's coordinator cast to {@code TestingOperatorCoordinator}.
     *
     * @param operatorCoordinatorHolder holder whose coordinator is requested
     * @return the coordinator held by the holder
     * @throws ClassCastException if the coordinator is of a different type
     */
    private static TestingOperatorCoordinator getCoordinator(OperatorCoordinatorHolder operatorCoordinatorHolder) {
        return TestingOperatorCoordinator.class.cast(operatorCoordinatorHolder.coordinator());
    }

    /**
     * Creates and starts a holder using the executor returned by
     * {@code ComponentMainThreadExecutorServiceAdapter.forMainThread()}.
     *
     * @param biFunction event sender handed to the holder
     * @param function factory creating the coordinator from its context
     * @return the started holder
     */
    private OperatorCoordinatorHolder createCoordinatorHolder(BiFunction<SerializedValue<OperatorEvent>, Integer, CompletableFuture<Acknowledge>> biFunction, Function<OperatorCoordinator.Context, OperatorCoordinator> function) throws Exception {
        final ComponentMainThreadExecutor mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();
        return createCoordinatorHolder(biFunction, function, mainThreadExecutor);
    }

    /**
     * Creates a holder for a fresh operator id, initializes it with this test's global failure
     * handler and the given executor, and starts it.
     *
     * @param biFunction event sender handed to the holder
     * @param function factory creating the coordinator from its context
     * @param componentMainThreadExecutor executor passed to the holder's lazy initialization
     * @return the started holder
     */
    private OperatorCoordinatorHolder createCoordinatorHolder(BiFunction<SerializedValue<OperatorEvent>, Integer, CompletableFuture<Acknowledge>> biFunction, final Function<OperatorCoordinator.Context, OperatorCoordinator> function, ComponentMainThreadExecutor componentMainThreadExecutor) throws Exception {
        final OperatorID operatorID = new OperatorID();

        // Provider that reports the id above and delegates coordinator creation to the factory.
        final OperatorCoordinator.Provider provider = new OperatorCoordinator.Provider() {
            public OperatorID getOperatorId() {
                return operatorID;
            }

            public OperatorCoordinator create(OperatorCoordinator.Context context) {
                return function.apply(context);
            }
        };

        final OperatorCoordinatorHolder holder =
                OperatorCoordinatorHolder.create(operatorID, provider, biFunction, "test-coordinator-name", 3, 1775);
        holder.lazyInitialize(this.globalFailureHandler, componentMainThreadExecutor);
        holder.start();
        return holder;
    }
}
