/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.writer.NonRecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordOrEventCollectingResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TestCheckpointStorageWorkerView;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.MoreExecutors;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.MockSubtaskCheckpointCoordinatorBuilder;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.OperatorChainTest;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTest;
import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator;
import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl;
import org.apache.flink.streaming.util.MockStreamTaskBuilder;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.function.BiFunctionWithException;
import org.junit.Assert;
import org.junit.Test;

public class SubtaskCheckpointCoordinatorTest {
    @Test
    public void testInitCheckpoint() throws IOException {
        Assert.assertTrue((boolean)this.initCheckpoint(true, CheckpointType.CHECKPOINT));
        Assert.assertFalse((boolean)this.initCheckpoint(false, CheckpointType.CHECKPOINT));
        Assert.assertFalse((boolean)this.initCheckpoint(false, CheckpointType.SAVEPOINT));
    }

    private boolean initCheckpoint(boolean unalignedCheckpointEnabled, CheckpointType checkpointType) throws IOException {
        class MockWriter
        extends ChannelStateWriter.NoOpChannelStateWriter {
            private boolean started;

            MockWriter() {
            }

            public void start(long checkpointId, CheckpointOptions checkpointOptions) {
                this.started = true;
            }
        }
        MockWriter writer = new MockWriter();
        SubtaskCheckpointCoordinator coordinator = SubtaskCheckpointCoordinatorTest.coordinator(unalignedCheckpointEnabled, (ChannelStateWriter)writer);
        CheckpointStorageLocationReference locationReference = CheckpointStorageLocationReference.getDefault();
        CheckpointOptions options = new CheckpointOptions(checkpointType, locationReference, true, unalignedCheckpointEnabled);
        coordinator.initCheckpoint(1L, options);
        return writer.started;
    }

    @Test
    public void testNotifyCheckpointComplete() throws Exception {
        TestTaskStateManager stateManager = new TestTaskStateManager();
        MockEnvironment mockEnvironment = MockEnvironment.builder().setTaskStateManager((TaskStateManager)stateManager).build();
        SubtaskCheckpointCoordinator subtaskCheckpointCoordinator = new MockSubtaskCheckpointCoordinatorBuilder().setEnvironment((Environment)mockEnvironment).build();
        OperatorChain<?, ?> operatorChain = this.getOperatorChain(mockEnvironment);
        long checkpointId = 42L;
        subtaskCheckpointCoordinator.notifyCheckpointComplete(checkpointId, operatorChain, () -> true);
        Assert.assertEquals((long)checkpointId, (long)stateManager.getNotifiedCompletedCheckpointId());
        long newCheckpointId = checkpointId + 1L;
        subtaskCheckpointCoordinator.notifyCheckpointComplete(newCheckpointId, operatorChain, () -> false);
        Assert.assertEquals((long)newCheckpointId, (long)stateManager.getNotifiedCompletedCheckpointId());
    }

    @Test
    public void testSavepointNotResultingInPriorityEvents() throws Exception {
        MockEnvironment mockEnvironment = MockEnvironment.builder().build();
        SubtaskCheckpointCoordinator coordinator = new MockSubtaskCheckpointCoordinatorBuilder().setUnalignedCheckpointEnabled(true).setEnvironment((Environment)mockEnvironment).build();
        final AtomicReference<Object> broadcastedPriorityEvent = new AtomicReference<Object>(null);
        OperatorChain operatorChain = new OperatorChain(new MockStreamTaskBuilder((Environment)mockEnvironment).build(), (RecordWriterDelegate)new NonRecordWriter()){

            public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
                super.broadcastEvent(event, isPriorityEvent);
                broadcastedPriorityEvent.set(isPriorityEvent);
            }
        };
        coordinator.checkpointState(new CheckpointMetaData(0L, 0L), new CheckpointOptions(CheckpointType.SAVEPOINT, CheckpointStorageLocationReference.getDefault()), new CheckpointMetrics(), operatorChain, () -> false);
        Assert.assertEquals((Object)false, broadcastedPriorityEvent.get());
    }

    @Test
    public void testSkipChannelStateForSavepoints() throws Exception {
        SubtaskCheckpointCoordinator coordinator = new MockSubtaskCheckpointCoordinatorBuilder().setUnalignedCheckpointEnabled(true).setPrepareInputSnapshot((BiFunctionWithException<ChannelStateWriter, Long, CompletableFuture<Void>, IOException>)((BiFunctionWithException)(u1, u2) -> {
            Assert.fail((String)"should not prepare input snapshot for savepoint");
            return null;
        })).build();
        coordinator.checkpointState(new CheckpointMetaData(0L, 0L), new CheckpointOptions(CheckpointType.SAVEPOINT, CheckpointStorageLocationReference.getDefault()), new CheckpointMetrics(), new OperatorChain(new StreamTaskTest.NoOpStreamTask((Environment)new DummyEnvironment()), (RecordWriterDelegate)new NonRecordWriter()), () -> false);
    }

    @Test
    public void testNotifyCheckpointAbortedManyTimes() throws Exception {
        MockEnvironment mockEnvironment = MockEnvironment.builder().build();
        int maxRecordAbortedCheckpoints = 256;
        SubtaskCheckpointCoordinatorImpl subtaskCheckpointCoordinator = (SubtaskCheckpointCoordinatorImpl)new MockSubtaskCheckpointCoordinatorBuilder().setEnvironment((Environment)mockEnvironment).setMaxRecordAbortedCheckpoints(maxRecordAbortedCheckpoints).build();
        OperatorChain<?, ?> operatorChain = this.getOperatorChain(mockEnvironment);
        long notifyAbortedTimes = maxRecordAbortedCheckpoints + 42;
        int i = 1;
        while ((long)i < notifyAbortedTimes) {
            subtaskCheckpointCoordinator.notifyCheckpointAborted((long)i, operatorChain, () -> true);
            Assert.assertEquals((long)Math.min(maxRecordAbortedCheckpoints, i), (long)subtaskCheckpointCoordinator.getAbortedCheckpointSize());
            ++i;
        }
    }

    @Test
    public void testNotifyCheckpointAbortedBeforeAsyncPhase() throws Exception {
        TestTaskStateManager stateManager = new TestTaskStateManager();
        MockEnvironment mockEnvironment = MockEnvironment.builder().setTaskStateManager((TaskStateManager)stateManager).build();
        SubtaskCheckpointCoordinatorImpl subtaskCheckpointCoordinator = (SubtaskCheckpointCoordinatorImpl)new MockSubtaskCheckpointCoordinatorBuilder().setEnvironment((Environment)mockEnvironment).setUnalignedCheckpointEnabled(true).build();
        CheckpointOperator checkpointOperator = new CheckpointOperator(new OperatorSnapshotFutures());
        OperatorChain operatorChain = this.operatorChain(checkpointOperator);
        long checkpointId = 42L;
        subtaskCheckpointCoordinator.notifyCheckpointAborted(checkpointId, operatorChain, () -> true);
        Assert.assertEquals((long)1L, (long)subtaskCheckpointCoordinator.getAbortedCheckpointSize());
        subtaskCheckpointCoordinator.getChannelStateWriter().start(checkpointId, CheckpointOptions.forCheckpointWithDefaultLocation());
        subtaskCheckpointCoordinator.checkpointState(new CheckpointMetaData(checkpointId, System.currentTimeMillis()), CheckpointOptions.forCheckpointWithDefaultLocation(), new CheckpointMetrics(), operatorChain, () -> true);
        Assert.assertFalse((boolean)checkpointOperator.isCheckpointed());
        Assert.assertEquals((long)-1L, (long)stateManager.getReportedCheckpointId());
        Assert.assertEquals((long)0L, (long)subtaskCheckpointCoordinator.getAbortedCheckpointSize());
        Assert.assertEquals((long)0L, (long)subtaskCheckpointCoordinator.getAsyncCheckpointRunnableSize());
    }

    @Test
    public void testBroadcastCancelCheckpointMarkerOnAbortingFromCoordinator() throws Exception {
        OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness(OneInputStreamTask::new, 1, 1, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();
        StreamConfig streamConfig = testHarness.getStreamConfig();
        streamConfig.setStreamOperator((StreamOperator)new MapOperator());
        testHarness.invoke();
        testHarness.waitForTaskRunning();
        MockEnvironment mockEnvironment = MockEnvironment.builder().build();
        SubtaskCheckpointCoordinator subtaskCheckpointCoordinator = new MockSubtaskCheckpointCoordinatorBuilder().setEnvironment((Environment)mockEnvironment).build();
        TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(1, 4096);
        ArrayList recordOrEvents = new ArrayList();
        StreamElementSerializer stringStreamElementSerializer = new StreamElementSerializer((TypeSerializer)StringSerializer.INSTANCE);
        RecordOrEventCollectingResultPartitionWriter resultPartitionWriter = new RecordOrEventCollectingResultPartitionWriter(recordOrEvents, (BufferProvider)bufferProvider, (TypeSerializer)stringStreamElementSerializer);
        mockEnvironment.addOutputs(Collections.singletonList(resultPartitionWriter));
        OneInputStreamTask task = testHarness.getTask();
        OperatorChain operatorChain = new OperatorChain(task, StreamTask.createRecordWriterDelegate((StreamConfig)streamConfig, (Environment)mockEnvironment));
        long checkpointId = 42L;
        subtaskCheckpointCoordinator.notifyCheckpointAborted(checkpointId, operatorChain, () -> true);
        subtaskCheckpointCoordinator.checkpointState(new CheckpointMetaData(checkpointId, System.currentTimeMillis()), CheckpointOptions.forCheckpointWithDefaultLocation(), new CheckpointMetrics(), operatorChain, () -> true);
        Assert.assertEquals((long)1L, (long)recordOrEvents.size());
        Object recordOrEvent = recordOrEvents.get(0);
        Assert.assertTrue((boolean)(recordOrEvent instanceof CancelCheckpointMarker));
        Assert.assertEquals((long)checkpointId, (long)((CancelCheckpointMarker)recordOrEvent).getCheckpointId());
        testHarness.endInput();
        testHarness.waitForTaskCompletion();
    }

    @Test
    public void testNotifyCheckpointAbortedDuringAsyncPhase() throws Exception {
        MockEnvironment mockEnvironment = MockEnvironment.builder().build();
        SubtaskCheckpointCoordinatorImpl subtaskCheckpointCoordinator = (SubtaskCheckpointCoordinatorImpl)new MockSubtaskCheckpointCoordinatorBuilder().setEnvironment((Environment)mockEnvironment).setExecutor(Executors.newSingleThreadExecutor()).setUnalignedCheckpointEnabled(true).build();
        BlockingRunnableFuture rawKeyedStateHandleFuture = new BlockingRunnableFuture();
        OperatorSnapshotFutures operatorSnapshotResult = new OperatorSnapshotFutures((RunnableFuture)DoneFuture.of((Object)SnapshotResult.empty()), (RunnableFuture)rawKeyedStateHandleFuture, (RunnableFuture)DoneFuture.of((Object)SnapshotResult.empty()), (RunnableFuture)DoneFuture.of((Object)SnapshotResult.empty()), (Future)DoneFuture.of((Object)SnapshotResult.empty()), (Future)DoneFuture.of((Object)SnapshotResult.empty()));
        OperatorChain operatorChain = this.operatorChain(new CheckpointOperator(operatorSnapshotResult));
        long checkpointId = 42L;
        subtaskCheckpointCoordinator.getChannelStateWriter().start(checkpointId, CheckpointOptions.forCheckpointWithDefaultLocation());
        subtaskCheckpointCoordinator.checkpointState(new CheckpointMetaData(checkpointId, System.currentTimeMillis()), CheckpointOptions.forCheckpointWithDefaultLocation(), new CheckpointMetrics(), operatorChain, () -> true);
        rawKeyedStateHandleFuture.awaitRun();
        Assert.assertEquals((long)1L, (long)subtaskCheckpointCoordinator.getAsyncCheckpointRunnableSize());
        Assert.assertFalse((boolean)rawKeyedStateHandleFuture.isCancelled());
        subtaskCheckpointCoordinator.notifyCheckpointAborted(checkpointId, operatorChain, () -> true);
        Assert.assertTrue((boolean)rawKeyedStateHandleFuture.isCancelled());
        Assert.assertEquals((long)0L, (long)subtaskCheckpointCoordinator.getAsyncCheckpointRunnableSize());
    }

    @Test
    public void testNotifyCheckpointAbortedAfterAsyncPhase() throws Exception {
        TestTaskStateManager stateManager = new TestTaskStateManager();
        MockEnvironment mockEnvironment = MockEnvironment.builder().setTaskStateManager((TaskStateManager)stateManager).build();
        SubtaskCheckpointCoordinatorImpl subtaskCheckpointCoordinator = (SubtaskCheckpointCoordinatorImpl)new MockSubtaskCheckpointCoordinatorBuilder().setEnvironment((Environment)mockEnvironment).build();
        OperatorChain<?, ?> operatorChain = this.getOperatorChain(mockEnvironment);
        long checkpointId = 42L;
        subtaskCheckpointCoordinator.checkpointState(new CheckpointMetaData(checkpointId, System.currentTimeMillis()), CheckpointOptions.forCheckpointWithDefaultLocation(), new CheckpointMetrics(), operatorChain, () -> true);
        subtaskCheckpointCoordinator.notifyCheckpointAborted(checkpointId, operatorChain, () -> true);
        Assert.assertEquals((long)0L, (long)subtaskCheckpointCoordinator.getAbortedCheckpointSize());
        Assert.assertEquals((long)checkpointId, (long)stateManager.getNotifiedAbortedCheckpointId());
    }

    private OperatorChain<?, ?> getOperatorChain(MockEnvironment mockEnvironment) throws Exception {
        return new OperatorChain((StreamTask)new MockStreamTaskBuilder((Environment)mockEnvironment).build(), (RecordWriterDelegate)new NonRecordWriter());
    }

    private <T> OperatorChain<T, AbstractStreamOperator<T>> operatorChain(OneInputStreamOperator<T, T> ... streamOperators) throws Exception {
        return OperatorChainTest.setupOperatorChain(streamOperators);
    }

    private static SubtaskCheckpointCoordinator coordinator(boolean unalignedCheckpointEnabled, ChannelStateWriter channelStateWriter) throws IOException {
        return new SubtaskCheckpointCoordinatorImpl((CheckpointStorageWorkerView)new TestCheckpointStorageWorkerView(100), "test", StreamTaskActionExecutor.IMMEDIATE, new CloseableRegistry(), (ExecutorService)MoreExecutors.newDirectExecutorService(), (Environment)new DummyEnvironment(), (message, unused) -> Assert.fail((String)message), (unused1, unused2) -> CompletableFuture.completedFuture(null), 0, channelStateWriter);
    }

    private static class CheckpointOperator
    implements OneInputStreamOperator<String, String> {
        private static final long serialVersionUID = 1L;
        private final OperatorSnapshotFutures operatorSnapshotFutures;
        private boolean checkpointed = false;

        CheckpointOperator(OperatorSnapshotFutures operatorSnapshotFutures) {
            this.operatorSnapshotFutures = operatorSnapshotFutures;
        }

        boolean isCheckpointed() {
            return this.checkpointed;
        }

        public void open() throws Exception {
        }

        public void close() throws Exception {
        }

        public void dispose() {
        }

        public void prepareSnapshotPreBarrier(long checkpointId) {
        }

        public OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, CheckpointStreamFactory storageLocation) throws Exception {
            this.checkpointed = true;
            return this.operatorSnapshotFutures;
        }

        public void initializeState(StreamTaskStateInitializer streamTaskStateManager) throws Exception {
        }

        public void setKeyContextElement1(StreamRecord<?> record) {
        }

        public void setKeyContextElement2(StreamRecord<?> record) {
        }

        public MetricGroup getMetricGroup() {
            return null;
        }

        public OperatorID getOperatorID() {
            return new OperatorID();
        }

        public void notifyCheckpointComplete(long checkpointId) {
        }

        public void notifyCheckpointAborted(long checkpointId) {
        }

        public void setCurrentKey(Object key) {
        }

        public Object getCurrentKey() {
            return null;
        }

        public void processElement(StreamRecord<String> element) throws Exception {
        }

        public void processWatermark(Watermark mark) throws Exception {
        }

        public void processLatencyMarker(LatencyMarker latencyMarker) {
        }
    }

    private static final class BlockingRunnableFuture
    implements RunnableFuture<SnapshotResult<KeyedStateHandle>> {
        private final CompletableFuture<SnapshotResult<KeyedStateHandle>> future = new CompletableFuture();
        private final OneShotLatch signalRunLatch = new OneShotLatch();
        private final CountDownLatch countDownLatch = new CountDownLatch(2);
        private final SnapshotResult<KeyedStateHandle> value = SnapshotResult.empty();

        private BlockingRunnableFuture() {
        }

        @Override
        public void run() {
            this.signalRunLatch.trigger();
            this.countDownLatch.countDown();
            try {
                this.countDownLatch.await();
            }
            catch (InterruptedException e) {
                ExceptionUtils.rethrow((Throwable)e);
            }
            this.future.complete(this.value);
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            this.future.cancel(mayInterruptIfRunning);
            return true;
        }

        @Override
        public boolean isCancelled() {
            return this.future.isCancelled();
        }

        @Override
        public boolean isDone() {
            return this.future.isDone();
        }

        @Override
        public SnapshotResult<KeyedStateHandle> get() throws InterruptedException, ExecutionException {
            return this.future.get();
        }

        @Override
        public SnapshotResult<KeyedStateHandle> get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException {
            return this.future.get();
        }

        void awaitRun() throws InterruptedException {
            this.signalRunLatch.await();
        }
    }

    private static class MapOperator
    extends StreamMap<String, String> {
        private static final long serialVersionUID = 1L;

        public MapOperator() {
            super((MapFunction & Serializable)value -> value);
        }

        public void notifyCheckpointAborted(long checkpointId) throws Exception {
        }
    }
}

