package org.apache.flink.runtime.operators.coordination;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.ListAccumulator;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.class */
public class CoordinatorEventsExactlyOnceITCase extends TestLogger {
    /**
     * Task-configuration option under which {@code buildJobVertex} stores the vertex name; the
     * {@code EventCollectingTask} reads it back and uses it as the name of its result accumulator.
     */
    private static final ConfigOption<String> ACC_NAME = ConfigOptions.key("acc").stringType().noDefaultValue();
    /** Name of the first job vertex; {@code test()} also looks up its accumulator under this name. */
    private static final String OPERATOR_1_NAME = "operator-1";
    /** Name of the second job vertex; {@code test()} also looks up its accumulator under this name. */
    private static final String OPERATOR_2_NAME = "operator-2";
    /** Cluster shared by the test class: assigned in {@code startMiniCluster()}, closed in {@code shutdownMiniCluster()}. */
    private static MiniCluster miniCluster;

    /**
     * Payload-free marker event. The coordinator sends it in place of the last number, and the
     * collecting task leaves its event loop as soon as it takes one from its queue.
     */
    public static final class EndEvent implements OperatorEvent {
        private EndEvent() {}
    }

    /**
     * Task that collects the integers it receives as operator events.
     *
     * <p>Operator events and checkpoint triggers are not handled where they arrive; they are put
     * into a queue and processed one at a time by the loop in {@link #invoke()}. On a checkpoint
     * trigger the integers collected so far are acknowledged as operator state, and on start any
     * restored state is loaded first. When an {@link EndEvent} arrives, the collected integers are
     * published through a {@link ListAccumulator} registered under the configured accumulator name.
     */
    public static final class EventCollectingTask extends AbstractInvokable {
        /** Operator ID derived from the job vertex ID; used for events and for state lookup. */
        private final OperatorID operatorID;
        /** Name under which the result accumulator is registered (read from {@code ACC_NAME}). */
        private final String accumulatorName;
        /** Queue of pending work items: operator events and {@link CheckpointMetaData} triggers. */
        private final LinkedBlockingQueue<Object> actions;
        /** Cleared by {@link #cancel()}; checked by the loop in {@link #invoke()}. */
        private volatile boolean running;

        /**
         * Creates the task and reads its operator ID and accumulator name from the environment.
         *
         * @param environment the task environment supplied by the runtime
         */
        public EventCollectingTask(Environment environment) {
            super(environment);
            this.running = true;
            this.operatorID = OperatorID.fromJobVertexID(environment.getJobVertexId());
            this.accumulatorName = (String) environment.getTaskConfiguration().get(CoordinatorEventsExactlyOnceITCase.ACC_NAME);
            this.actions = new LinkedBlockingQueue<>();
        }

        /**
         * Restores state, announces itself to the coordinator with a {@link StartEvent}, then
         * processes queued actions until an {@link EndEvent} arrives or the task is cancelled.
         *
         * @throws Exception if an unrecognized action is found in the queue, or if restoring
         *     state, sending the start event or acknowledging a checkpoint fails
         */
        public void invoke() throws Exception {
            final List<Integer> collectedInts = new ArrayList<>();
            restoreState(collectedInts);

            // Tell the coordinator that this task instance is up and ready to receive events.
            getEnvironment()
                    .getOperatorCoordinatorEventGateway()
                    .sendOperatorEventToCoordinator(this.operatorID, new SerializedValue<>(new StartEvent()));

            // Single-consumer loop: everything that mutates collectedInts happens on this thread.
            while (this.running) {
                final Object next = this.actions.take();
                if (next instanceof EndEvent) {
                    break;
                }
                if (next instanceof IntegerEvent) {
                    collectedInts.add(((IntegerEvent) next).value);
                } else if (next instanceof CheckpointMetaData) {
                    takeCheckpoint(((CheckpointMetaData) next).getCheckpointId(), collectedInts);
                } else {
                    throw new Exception("Unrecognized: " + next);
                }
            }

            // Only publish a result when the loop ended through an EndEvent, not through cancel().
            if (this.running) {
                final ListAccumulator<Integer> accumulator = new ListAccumulator<>();
                collectedInts.forEach(accumulator::add);
                getEnvironment().getAccumulatorRegistry().getUserMap().put(this.accumulatorName, accumulator);
            }
        }

        /**
         * Clears the running flag. The loop in {@link #invoke()} sees the change the next time it
         * evaluates its condition, i.e. after the pending {@code take()} returns or is interrupted.
         */
        public void cancel() throws Exception {
            this.running = false;
        }

        /**
         * Enqueues the checkpoint trigger for the loop in {@link #invoke()} and reports success
         * immediately.
         *
         * @param checkpointMetaData meta data of the checkpoint to take
         * @param checkpointOptions ignored by this task
         * @return an already completed future holding {@code true}
         */
        public Future<Boolean> triggerCheckpointAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
            this.actions.add(checkpointMetaData);
            return CompletableFuture.completedFuture(true);
        }

        /** No-op; returns an already completed future. */
        public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
            return CompletableFuture.completedFuture(null);
        }

        /** No-op; returns an already completed future. */
        public Future<Void> notifyCheckpointAbortAsync(long checkpointId) {
            return CompletableFuture.completedFuture(null);
        }

        /**
         * Deserializes an incoming operator event with the user-code class loader and enqueues it.
         *
         * @param operatorID ignored; all events are put into the same queue
         * @param serializedValue the serialized event
         * @throws FlinkException wrapping any {@link IOException} or {@link ClassNotFoundException}
         *     raised during deserialization
         */
        public void dispatchOperatorEvent(OperatorID operatorID, SerializedValue<OperatorEvent> serializedValue) throws FlinkException {
            try {
                this.actions.add((OperatorEvent) serializedValue.deserializeValue(getUserCodeClassLoader()));
            } catch (IOException | ClassNotFoundException e) {
                throw new FlinkException(e);
            }
        }

        /** Serializes the collected integers and acknowledges them as state of the given checkpoint. */
        private void takeCheckpoint(long checkpointId, List<Integer> collectedInts) throws Exception {
            getEnvironment().acknowledgeCheckpoint(checkpointId, new CheckpointMetrics(), CoordinatorEventsExactlyOnceITCase.createSnapshot(CoordinatorEventsExactlyOnceITCase.stateToHandle(collectedInts), this.operatorID));
        }

        /** Appends the integers of a restored snapshot, if there is one, to {@code target}. */
        private void restoreState(List<Integer> target) throws Exception {
            final StreamStateHandle restored = CoordinatorEventsExactlyOnceITCase.readSnapshot(getEnvironment().getTaskStateManager(), this.operatorID);
            if (restored != null) {
                target.addAll(CoordinatorEventsExactlyOnceITCase.handleToState(restored));
            }
        }
    }

    /**
     * Test coordinator that sends a numbered sequence of events through a subtask gateway and
     * fails the job exactly once at a randomly chosen message.
     *
     * <p>Work is done in "single actions": each one handles pending checkpoint futures, sends the
     * next event ({@link IntegerEvent}s for {@code 0 .. maxNumber - 1}, then one {@link EndEvent})
     * and checks whether the one-time failure is due. After each action the next one is scheduled
     * with a fixed delay. All non-final fields are only read and written from actions submitted
     * through {@link #runInMailbox(Runnable)}, i.e. on the single mailbox thread.
     */
    private static final class EventSendingCoordinator implements OperatorCoordinator {
        /** Coordinator context; used here only to fail the job. */
        private final OperatorCoordinator.Context context;
        /** Single-threaded executor on which all state-changing actions run. */
        private final ExecutorService mailboxExecutor;
        /** Single-threaded scheduler that re-submits the next single action after {@link #delay}. */
        private final ScheduledExecutorService scheduledExecutor;
        /** Delay between two single actions, in milliseconds. */
        private final int delay;
        /** Count of integers to send; the event with this index is the {@link EndEvent}. */
        private final int maxNumber;
        /** Index of the next event to send; reset from checkpoint data in {@code resetToCheckpoint}. */
        private int nextNumber;
        /** Checkpoint future handed in by {@code checkpointCoordinator}, not yet picked up by an action. */
        private CompletableFuture<byte[]> requestedCheckpoint;
        /** Checkpoint future picked up by the previous action; completed by the next one. */
        private CompletableFuture<byte[]> nextToComplete;
        /** Value of {@link #nextNumber} at (or after) which the one-time job failure is triggered. */
        private final int failAtMessage;
        /** Set once the job failure has been triggered, so that it happens only once. */
        private boolean failedBefore;
        /** Latches registered by {@code subtaskFailed}; guarded by {@code synchronized} on the deque itself. */
        private final ArrayDeque<CountDownLatch> recoveredTaskRunning;
        /** Gateway to the subtask, set in {@code subtaskReady} and cleared after {@code subtaskFailed}. */
        private OperatorCoordinator.SubtaskGateway subtaskGateway;
        /** Whether single actions currently do any work and re-schedule themselves. */
        private boolean workLoopRunning;

        /**
         * @param context the coordinator context
         * @param str name used in the thread names of both executors
         * @param i number of integers to send (stored as {@code maxNumber}); must be at least 3
         * @param i2 delay between actions in milliseconds; must be positive
         */
        private EventSendingCoordinator(OperatorCoordinator.Context context, String str, int i, int i2) {
            this.recoveredTaskRunning = new ArrayDeque<>();
            Preconditions.checkArgument(i2 > 0);
            // i >= 3 keeps i / 3 positive, which Random.nextInt(bound) below requires.
            Preconditions.checkArgument(i >= 3);
            this.context = context;
            this.maxNumber = i;
            this.delay = i2;
            this.mailboxExecutor = Executors.newSingleThreadExecutor(new DispatcherThreadFactory(Thread.currentThread().getThreadGroup(), "Coordinator Mailbox for " + str));
            this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor(new DispatcherThreadFactory(Thread.currentThread().getThreadGroup(), "Coordinator Periodic Actions for " + str));
            // Random failure point in [i / 3, 2 * (i / 3)).
            this.failAtMessage = (i / 3) + new Random().nextInt(i / 3);
        }

        /** No-op; the work loop is started by the first {@link StartEvent} instead. */
        public void start() throws Exception {
        }

        /**
         * Shuts down the scheduler and then the mailbox via {@code shutdownNow()} and asserts that
         * each terminates within 10 minutes.
         */
        public void close() throws Exception {
            this.scheduledExecutor.shutdownNow();
            Assert.assertTrue(this.scheduledExecutor.awaitTermination(10L, TimeUnit.MINUTES));
            this.mailboxExecutor.shutdownNow();
            Assert.assertTrue(this.mailboxExecutor.awaitTermination(10L, TimeUnit.MINUTES));
        }

        /**
         * Handles the {@link StartEvent} of subtask 0: releases every latch registered by
         * {@code subtaskFailed} and enqueues a mailbox action that starts the work loop.
         *
         * @param i index of the sending subtask; only 0 is accepted
         * @param operatorEvent the received event; only {@link StartEvent} is accepted
         * @throws Exception if the subtask index or the event type is not the expected one
         */
        public void handleEventFromOperator(int i, OperatorEvent operatorEvent) throws Exception {
            if (i != 0 || !(operatorEvent instanceof StartEvent)) {
                throw new Exception(String.format("Don't recognize event '%s' from task %d.", operatorEvent, Integer.valueOf(i)));
            }
            // Unblock mailbox actions that subtaskFailed() parked until the task is running again.
            synchronized (this.recoveredTaskRunning) {
                Iterator<CountDownLatch> it = this.recoveredTaskRunning.iterator();
                while (it.hasNext()) {
                    it.next().countDown();
                }
                this.recoveredTaskRunning.clear();
            }
            runInMailbox(() -> {
                // The loop must be stopped and a gateway must have been set by subtaskReady().
                Preconditions.checkState(!this.workLoopRunning);
                Preconditions.checkState(this.subtaskGateway != null);
                this.workLoopRunning = true;
                scheduleSingleAction();
            });
        }

        /**
         * Registers a latch and enqueues two mailbox actions: the first blocks the mailbox thread
         * until the latch is released by the next {@link StartEvent} and then executes one more
         * single action; the second stops the work loop and drops the gateway.
         *
         * <p>NOTE(review): this ordering looks like a deliberate simulation of a late mailbox
         * action racing with task recovery (the extra action runs with the old gateway after the
         * task restarted) - confirm before changing it.
         */
        public void subtaskFailed(int i, @Nullable Throwable th) {
            CountDownLatch countDownLatch = new CountDownLatch(1);
            synchronized (this.recoveredTaskRunning) {
                this.recoveredTaskRunning.addLast(countDownLatch);
            }
            runInMailbox(() -> {
                try {
                    countDownLatch.await();
                } catch (Exception e) {
                    // Ignored: the action continues whether or not the wait was interrupted
                    // (presumably intentional, e.g. for interruption by close() - confirm).
                }
                executeSingleAction();
            });
            // Runs after the late action above: the regular stop of the work loop.
            runInMailbox(() -> {
                this.workLoopRunning = false;
                this.subtaskGateway = null;
            });
        }

        /** No-op. */
        public void subtaskReset(int i, long j) {
        }

        /** Stores the gateway for the (re)started subtask; requires that the work loop is stopped. */
        public void subtaskReady(int i, OperatorCoordinator.SubtaskGateway subtaskGateway) {
            runInMailbox(() -> {
                Preconditions.checkState(!this.workLoopRunning);
                this.subtaskGateway = subtaskGateway;
            });
        }

        /**
         * Resets {@code nextNumber} in the mailbox: to 0 when no checkpoint data is given, otherwise
         * to the int decoded from the 4 checkpoint bytes.
         */
        public void resetToCheckpoint(long j, @Nullable byte[] bArr) throws Exception {
            runInMailbox(() -> {
                this.nextNumber = bArr == null ? 0 : CoordinatorEventsExactlyOnceITCase.bytesToInt(bArr);
            });
        }

        /**
         * Records the checkpoint future in the mailbox; it is completed later by
         * {@code handleCheckpoint()} rather than here.
         */
        public void checkpointCoordinator(long j, CompletableFuture<byte[]> completableFuture) throws Exception {
            runInMailbox(() -> {
                this.requestedCheckpoint = completableFuture;
            });
        }

        /** No-op. */
        public void notifyCheckpointComplete(long j) {
        }

        /**
         * Submits the action to the mailbox executor. Any throwable from the action is printed to
         * stderr and then rethrown on the mailbox thread.
         */
        void runInMailbox(Runnable runnable) {
            this.mailboxExecutor.execute(() -> {
                try {
                    runnable.run();
                } catch (Throwable th) {
                    th.printStackTrace();
                    ExceptionUtils.rethrow(th);
                }
            });
        }

        /**
         * Schedules one single action to be enqueued in the mailbox after {@code delay}
         * milliseconds. A rejection is tolerated only when the scheduler is already shut down.
         */
        void scheduleSingleAction() {
            try {
                this.scheduledExecutor.schedule(() -> {
                    runInMailbox(this::executeSingleAction);
                }, this.delay, TimeUnit.MILLISECONDS);
            } catch (RejectedExecutionException e) {
                if (!this.scheduledExecutor.isShutdown()) {
                    throw e;
                }
            }
        }

        /**
         * Runs one iteration of the work loop if it is running and schedules the next one. Any
         * throwable raised by the iteration is printed and terminates the JVM with exit code -1.
         */
        private void executeSingleAction() {
            if (this.workLoopRunning) {
                try {
                    handleCheckpoint();
                    sendNextEvent();
                    checkWhetherToTriggerFailure();
                } catch (Throwable th) {
                    th.printStackTrace();
                    System.exit(-1);
                }
                scheduleSingleAction();
            }
        }

        /**
         * Completes the checkpoint future adopted by the previous action with the current
         * {@code nextNumber} (capped at {@code maxNumber}), then adopts a newly requested one. A
         * requested checkpoint is therefore completed one action after it was picked up.
         */
        private void handleCheckpoint() {
            if (this.nextToComplete != null) {
                this.nextToComplete.complete(CoordinatorEventsExactlyOnceITCase.intToBytes(Math.min(this.nextNumber, this.maxNumber)));
                this.nextToComplete = null;
            }
            if (this.requestedCheckpoint != null) {
                this.nextToComplete = this.requestedCheckpoint;
                this.requestedCheckpoint = null;
            }
        }

        /**
         * Sends the event for {@code nextNumber} and advances it: an {@link IntegerEvent} below
         * {@code maxNumber}, the {@link EndEvent} at {@code maxNumber}, nothing beyond that.
         */
        private void sendNextEvent() {
            if (this.nextNumber > this.maxNumber) {
                return;
            }
            if (this.nextNumber == this.maxNumber) {
                this.subtaskGateway.sendEvent(new EndEvent());
            } else {
                this.subtaskGateway.sendEvent(new IntegerEvent(this.nextNumber));
            }
            this.nextNumber++;
        }

        /** Fails the job once, the first time {@code nextNumber} has reached {@code failAtMessage}. */
        private void checkWhetherToTriggerFailure() {
            if (this.nextNumber < this.failAtMessage || this.failedBefore) {
                return;
            }
            this.failedBefore = true;
            this.context.failJob(new Exception("test failure"));
        }
    }

    /** Operator event carrying one int; the coordinator sends these as its numbered sequence. */
    public static final class IntegerEvent implements OperatorEvent {
        /** The number transported by this event. */
        final int value;

        IntegerEvent(int i) {
            value = i;
        }

        /** Renders the event as {@code "IntegerEvent <value>"}. */
        public String toString() {
            return new StringBuilder("IntegerEvent ").append(value).toString();
        }
    }

    /**
     * Payload-free event that the collecting task sends to its coordinator at the start of
     * {@code invoke()}; the coordinator starts its work loop when it receives one.
     */
    private static final class StartEvent implements OperatorEvent {
        private StartEvent() {}
    }

    /**
     * Creates and starts the shared {@link MiniCluster} with two task managers of one slot each.
     *
     * @throws Exception if the cluster cannot be started
     */
    @BeforeClass
    public static void startMiniCluster() throws Exception {
        final Configuration clusterConfig = new Configuration();
        // REST bind port "0" - presumably lets the OS pick a free port; confirm against RestOptions.
        clusterConfig.setString(RestOptions.BIND_PORT, "0");

        final MiniClusterConfiguration clusterSettings =
                new MiniClusterConfiguration.Builder()
                        .setNumTaskManagers(2)
                        .setNumSlotsPerTaskManager(1)
                        .setConfiguration(clusterConfig)
                        .build();

        miniCluster = new MiniCluster(clusterSettings);
        miniCluster.start();
    }

    /**
     * Closes the shared {@link MiniCluster}.
     *
     * <p>JUnit runs {@code @AfterClass} methods even when {@code @BeforeClass} threw, so the field
     * may still be {@code null} here if the cluster could not be constructed. The guard avoids a
     * {@link NullPointerException} that would be reported in addition to the real setup failure.
     *
     * @throws Exception if closing the cluster fails
     */
    @AfterClass
    public static void shutdownMiniCluster() throws Exception {
        if (miniCluster != null) {
            miniCluster.close();
        }
    }

    /**
     * Runs a job with two coordinated vertices and verifies that each task's accumulator holds
     * exactly the sequence {@code 0 .. n-1} that its coordinator was configured to send.
     *
     * @throws Exception if job execution fails
     */
    @Test
    public void test() throws Exception {
        // First vertex: 200 numbers with 1 ms between actions; second: 5 numbers, 200 ms apart.
        final JobVertex firstVertex = buildJobVertex(OPERATOR_1_NAME, 200, 1);
        final JobVertex secondVertex = buildJobVertex(OPERATOR_2_NAME, 5, 200);

        final JobExecutionResult jobResult =
                miniCluster.executeJobBlocking(
                        JobGraphBuilder.newStreamingJobGraphBuilder()
                                .setJobName("Coordinator Events Job")
                                .addJobVertices(Arrays.asList(firstVertex, secondVertex))
                                .setJobCheckpointingSettings(createCheckpointSettings())
                                .build());

        checkListContainsSequence((List) jobResult.getAccumulatorResult(OPERATOR_1_NAME), 200);
        checkListContainsSequence((List) jobResult.getAccumulatorResult(OPERATOR_2_NAME), 5);
    }

    /**
     * Asserts that {@code list} is exactly the sequence {@code 0, 1, ..., i - 1}.
     *
     * @param list the collected numbers
     * @param i the expected number of elements
     */
    private static void checkListContainsSequence(List<Integer> list, int i) {
        if (list.size() != i) {
            failList(list, i);
        }

        // Each element must equal its own position in the list.
        int expected = 0;
        for (Integer actual : list) {
            if (actual.intValue() != expected) {
                failList(list, i);
            }
            expected++;
        }
    }

    /**
     * Fails the test with a message that shows the expected length, the actual length and the
     * actual contents of the list.
     *
     * @param list the list that did not match
     * @param i the expected number of elements
     */
    private static void failList(List<Integer> list, int i) {
        final String message =
                String.format(
                        "List did not contain expected sequence of %d elements, but was: (%d elements): %s",
                        i, list.size(), list);
        Assert.fail(message);
    }

    /**
     * Builds a job vertex of parallelism 1 that runs an {@link EventCollectingTask} and has an
     * {@link EventSendingCoordinator} attached to the operator ID derived from the vertex ID.
     *
     * @param str vertex name; also stored as the accumulator name and passed to the coordinator
     * @param i number of integers the coordinator sends
     * @param i2 delay between coordinator actions, in milliseconds
     * @return the configured vertex
     * @throws IOException if the coordinator provider cannot be serialized
     */
    private static JobVertex buildJobVertex(final String str, final int i, final int i2) throws IOException {
        final JobVertex vertex = new JobVertex(str);
        final OperatorID coordinatedOperator = OperatorID.fromJobVertexID(vertex.getID());

        // Kept as an anonymous class so the serialized provider type stays the same.
        final OperatorCoordinator.Provider coordinatorProvider = new OperatorCoordinator.Provider() {
            public OperatorID getOperatorId() {
                return coordinatedOperator;
            }

            public OperatorCoordinator create(OperatorCoordinator.Context context) {
                return new EventSendingCoordinator(context, str, i, i2);
            }
        };

        vertex.setParallelism(1);
        vertex.setInvokableClass(EventCollectingTask.class);
        vertex.getConfiguration().setString(ACC_NAME, str);
        vertex.addOperatorCoordinator(new SerializedValue<>(coordinatorProvider));
        return vertex;
    }

    /**
     * Creates checkpointing settings with at most one concurrent checkpoint, a checkpoint interval
     * of 10 and a checkpoint timeout of 100000 (as passed to the builder), and no second argument.
     */
    private static JobCheckpointingSettings createCheckpointSettings() {
        final CheckpointCoordinatorConfiguration coordinatorConfig =
                new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder()
                        .setMaxConcurrentCheckpoints(1)
                        .setCheckpointInterval(10L)
                        .setCheckpointTimeout(100000L)
                        .build();
        return new JobCheckpointingSettings(coordinatorConfig, (SerializedValue) null);
    }

    /**
     * Encodes an int as 4 bytes in little-endian order (least significant byte first).
     *
     * @param i the value to encode
     * @return a new 4-byte array holding the encoded value
     */
    static byte[] intToBytes(int i) {
        return ByteBuffer.allocate(Integer.BYTES)
                .order(ByteOrder.LITTLE_ENDIAN)
                .putInt(i)
                .array();
    }

    /**
     * Decodes a little-endian int from a byte array, asserting that it is exactly 4 bytes long.
     *
     * @param bArr the 4 encoded bytes, least significant byte first
     * @return the decoded value
     */
    static int bytesToInt(byte[] bArr) {
        Assert.assertEquals(4L, bArr.length);
        final ByteBuffer littleEndianView = ByteBuffer.wrap(bArr);
        littleEndianView.order(ByteOrder.LITTLE_ENDIAN);
        // A freshly wrapped buffer is at position 0, so the relative read starts at the first byte.
        return littleEndianView.getInt();
    }

    /**
     * Serializes the list with {@link InstantiationUtil} and wraps the bytes in an in-memory state
     * handle named {@code "state"}.
     *
     * @param list the integers to store
     * @return a handle holding the serialized list
     * @throws IOException if serialization fails
     */
    static ByteStreamStateHandle stateToHandle(List<Integer> list) throws IOException {
        final byte[] serializedList = InstantiationUtil.serializeObject(list);
        return new ByteStreamStateHandle("state", serializedList);
    }

    /**
     * Reads back a list written by {@code stateToHandle}: takes the bytes of the handle, which
     * must be a {@link ByteStreamStateHandle}, and deserializes them with the class loader of
     * {@link EventCollectingTask}.
     *
     * @param streamStateHandle the handle to read
     * @return the deserialized list
     * @throws IOException if deserialization fails
     * @throws ClassNotFoundException if a serialized class cannot be resolved
     */
    static List<Integer> handleToState(StreamStateHandle streamStateHandle) throws IOException, ClassNotFoundException {
        final byte[] serializedList = ((ByteStreamStateHandle) streamStateHandle).getData();
        final ClassLoader taskClassLoader = EventCollectingTask.class.getClassLoader();
        return (List) InstantiationUtil.deserializeObject(serializedList, taskClassLoader);
    }

    /**
     * Wraps a stream state handle into a task state snapshot: the handle becomes the managed
     * operator state of {@code operatorID}, registered under a single state name with one offset
     * (0) and {@code SPLIT_DISTRIBUTE} mode.
     *
     * @param streamStateHandle the handle holding the serialized state
     * @param operatorID the operator the state belongs to
     * @return a snapshot with one entry for {@code operatorID}
     */
    static TaskStateSnapshot createSnapshot(StreamStateHandle streamStateHandle, OperatorID operatorID) {
        final OperatorStateHandle.StateMetaInfo metaInfo =
                new OperatorStateHandle.StateMetaInfo(new long[] {0}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
        final OperatorStreamStateHandle managedState =
                new OperatorStreamStateHandle(
                        Collections.singletonMap("état_et_moi_:_ça_fait_deux", metaInfo), streamStateHandle);
        final OperatorSubtaskState subtaskState =
                OperatorSubtaskState.builder().setManagedOperatorState(managedState).build();
        return new TaskStateSnapshot(Collections.singletonMap(operatorID, subtaskState));
    }

    /**
     * Returns the delegate stream handle of the restored managed operator state of an operator.
     *
     * <p>Only the first entry of the prioritized managed operator state is read, and it must
     * contain exactly one handle ({@code Iterators.getOnlyElement}).
     *
     * @param taskStateManager the state manager of the task
     * @param operatorID the operator whose state is read
     * @return the delegate handle, or {@code null} if the operator state was not restored
     */
    @Nullable
    static StreamStateHandle readSnapshot(TaskStateManager taskStateManager, OperatorID operatorID) {
        if (!taskStateManager.prioritizedOperatorState(operatorID).isRestored()) {
            return null;
        }

        final StateObjectCollection firstAlternative =
                (StateObjectCollection) taskStateManager.prioritizedOperatorState(operatorID).getPrioritizedManagedOperatorState().get(0);
        final OperatorStateHandle onlyHandle =
                (OperatorStateHandle) Iterators.getOnlyElement(firstAlternative.iterator());
        return onlyHandle.getDelegateStateHandle();
    }
}
