/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.io.File;
import java.io.Serializable;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendTestUtils;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.testutils.ExceptionallyDoneFuture;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.test.checkpointing.ChangelogRecoveryITCaseBase;
import org.apache.flink.testutils.junit.SharedReference;
import org.apache.flink.util.Preconditions;
import org.junit.Test;

/**
 * Recovery scenarios for a job whose source fails artificially at fixed element indices.
 *
 * <p>Each test builds a job around a {@code ControlledSource} whose {@code beforeElement} hook
 * blocks until some condition on completed checkpoints and/or the state handle IDs reported by
 * {@code getAllStateHandleId} holds. The job is then run through {@code waitAndAssert}.
 *
 * <p>Objects that the source must reach from inside the running job are registered with
 * {@code sharedObjects} and accessed through typed {@link SharedReference}s.
 *
 * <p>NOTE(review): the element indices 1250 / 2500 / 5000 look like fractions of a total element
 * count defined in the base class - confirm there before changing them.
 */
public class ChangelogRecoveryITCase extends ChangelogRecoveryITCaseBase {

    /**
     * Creates the test case for one delegated state backend.
     *
     * @param delegatedStateBackend backend handed unchanged to the base class
     */
    public ChangelogRecoveryITCase(AbstractStateBackend delegatedStateBackend) {
        super(delegatedStateBackend);
    }

    /**
     * Runs with periodic materialization disabled and checks that no state handle ID was
     * observed after the first completed checkpoint.
     *
     * <p>On attempt 0, at element 5000, the source waits for at least one completed checkpoint,
     * records whether {@code getAllStateHandleId} returned anything, and then fails artificially.
     *
     * @throws Exception if the temporary folder cannot be created or the job does not finish as
     *     expected
     */
    @Test
    public void testNonMaterialization() throws Exception {
        File checkpointFolder = TEMPORARY_FOLDER.newFolder();
        final SharedReference<JobID> jobID = sharedObjects.add(generateJobID());
        final SharedReference<MiniCluster> miniCluster =
                sharedObjects.add(cluster.getMiniCluster());
        // Starts as true; flipped to false only if no state handle ID is found.
        final SharedReference<AtomicBoolean> hasMaterialization =
                sharedObjects.add(new AtomicBoolean(true));

        StreamExecutionEnvironment env = getEnv(checkpointFolder, 1000L, 1, 10L, 0);
        env.getConfig().enablePeriodicMaterialize(false);

        ControlledSource source =
                new ControlledSource() {

                    @Override
                    protected void beforeElement(SourceFunction.SourceContext<Integer> ctx)
                            throws Exception {
                        boolean firstAttempt =
                                getRuntimeContext().getTaskInfo().getAttemptNumber() == 0;
                        if (firstAttempt && currentIndex == 5000) {
                            waitWhile(() -> completedCheckpointNum.get() <= 0);
                            boolean foundStateHandles =
                                    !ChangelogRecoveryITCaseBase.getAllStateHandleId(
                                                    jobID.get(), miniCluster.get())
                                            .isEmpty();
                            hasMaterialization.get().compareAndSet(true, foundStateHandles);
                            throwArtificialFailure();
                        }
                    }
                };

        waitAndAssert(buildJobGraph(delegatedStateBackend, env, source, jobID.get()));
        Preconditions.checkState(
                !hasMaterialization.get().get(),
                "State handle IDs were found although periodic materialization is disabled.");
    }

    /**
     * Fails the job twice and requires the set of state handle IDs to change between the two
     * failures.
     *
     * <p>Attempt 0 (element 2500): waits for a completed checkpoint with a non-empty set of state
     * handle IDs, remembers that set and the completed-checkpoint count, then fails.
     *
     * <p>Attempt 1 (element 5000): waits until the completed-checkpoint count exceeds the
     * remembered one and the current set of IDs is non-empty and different from the remembered
     * set, then fails again.
     *
     * @throws Exception if the temporary folder cannot be created or the job does not finish as
     *     expected
     */
    @Test
    public void testMaterialization() throws Exception {
        File checkpointFolder = TEMPORARY_FOLDER.newFolder();
        final SharedReference<JobID> jobID = sharedObjects.add(generateJobID());
        final SharedReference<MiniCluster> miniCluster =
                sharedObjects.add(cluster.getMiniCluster());
        final SharedReference<AtomicInteger> currentCheckpointNum =
                sharedObjects.add(new AtomicInteger());
        final SharedReference<Set<StateHandleID>> currentMaterializationId =
                sharedObjects.add(ConcurrentHashMap.newKeySet());

        StreamExecutionEnvironment env = getEnv(checkpointFolder, 100L, 2, 200L, 0);

        ControlledSource source =
                new ControlledSource() {

                    @Override
                    protected void beforeElement(SourceFunction.SourceContext<Integer> ctx)
                            throws Exception {
                        // Only two artificial failures are injected, so a higher attempt
                        // number means an unexpected extra restart.
                        Preconditions.checkState(
                                getRuntimeContext().getTaskInfo().getAttemptNumber() <= 2,
                                "The job restarted more often than the two injected failures.");

                        int attempt = getRuntimeContext().getTaskInfo().getAttemptNumber();
                        if (attempt == 0 && currentIndex == 2500) {
                            waitWhile(
                                    () -> {
                                        if (completedCheckpointNum.get() <= 0) {
                                            return true;
                                        }
                                        Set<StateHandleID> allMaterializationId =
                                                ChangelogRecoveryITCaseBase.getAllStateHandleId(
                                                        jobID.get(), miniCluster.get());
                                        if (allMaterializationId.isEmpty()) {
                                            return true;
                                        }
                                        currentMaterializationId
                                                .get()
                                                .addAll(allMaterializationId);
                                        // Only the first observation is stored (expected 0).
                                        currentCheckpointNum
                                                .get()
                                                .compareAndSet(0, completedCheckpointNum.get());
                                        return false;
                                    });
                            throwArtificialFailure();
                        } else if (attempt == 1 && currentIndex == 5000) {
                            waitWhile(
                                    () -> {
                                        if (completedCheckpointNum.get()
                                                <= currentCheckpointNum.get().get()) {
                                            return true;
                                        }
                                        Set<StateHandleID> allMaterializationId =
                                                ChangelogRecoveryITCaseBase.getAllStateHandleId(
                                                        jobID.get(), miniCluster.get());
                                        return allMaterializationId.isEmpty()
                                                || currentMaterializationId
                                                        .get()
                                                        .equals(allMaterializationId);
                                    });
                            throwArtificialFailure();
                        }
                    }
                };

        waitAndAssert(buildJobGraph(delegatedStateBackend, env, source, jobID.get()));
    }

    /**
     * Makes the first wrapped snapshot fail and then requires state handle IDs to appear and
     * later change.
     *
     * <p>The delegated backend is wrapped so that the first call of the snapshot function returns
     * a future completed exceptionally with a {@link RuntimeException}. Every later call passes
     * the original future through. The job runs with parallelism 1.
     *
     * <p>The source gates progress at three points: element 1250 (until the failure has been
     * injected), element 2500 (until a non-empty set of state handle IDs exists, which is
     * remembered), and element 5000 (until the set is non-empty and differs from the remembered
     * one).
     *
     * @throws Exception if the temporary folder cannot be created or the job does not finish as
     *     expected
     */
    @Test
    public void testFailedMaterialization() throws Exception {
        File checkpointFolder = TEMPORARY_FOLDER.newFolder();
        final SharedReference<JobID> jobID = sharedObjects.add(generateJobID());
        final SharedReference<MiniCluster> miniCluster =
                sharedObjects.add(cluster.getMiniCluster());
        final SharedReference<AtomicBoolean> hasFailed = sharedObjects.add(new AtomicBoolean());
        final SharedReference<Set<StateHandleID>> currentMaterializationId =
                sharedObjects.add(ConcurrentHashMap.newKeySet());

        StreamExecutionEnvironment env = getEnv(checkpointFolder, 100L, 0, 10L, 1);
        env.setParallelism(1);

        ControlledSource source =
                new ControlledSource() {

                    @Override
                    protected void beforeElement(SourceFunction.SourceContext<Integer> ctx)
                            throws Exception {
                        if (currentIndex == 1250) {
                            waitWhile(() -> !hasFailed.get().get());
                        } else if (currentIndex == 2500) {
                            waitWhile(
                                    () -> {
                                        Set<StateHandleID> allMaterializationId =
                                                ChangelogRecoveryITCaseBase.getAllStateHandleId(
                                                        jobID.get(), miniCluster.get());
                                        if (allMaterializationId.isEmpty()) {
                                            return true;
                                        }
                                        currentMaterializationId
                                                .get()
                                                .addAll(allMaterializationId);
                                        return false;
                                    });
                        } else if (currentIndex == 5000) {
                            waitWhile(
                                    () -> {
                                        Set<StateHandleID> allMaterializationId =
                                                ChangelogRecoveryITCaseBase.getAllStateHandleId(
                                                        jobID.get(), miniCluster.get());
                                        return allMaterializationId.isEmpty()
                                                || currentMaterializationId
                                                        .get()
                                                        .equals(allMaterializationId);
                                    });
                        }
                    }
                };

        waitAndAssert(
                buildJobGraph(
                        StateBackendTestUtils.wrapStateBackendWithSnapshotFunction(
                                delegatedStateBackend,
                                snapshotResultFuture -> {
                                    // Exactly one caller wins the CAS and gets the failure.
                                    if (hasFailed.get().compareAndSet(false, true)) {
                                        return ExceptionallyDoneFuture.of(new RuntimeException());
                                    }
                                    return snapshotResultFuture;
                                }),
                        env,
                        source,
                        jobID.get()));
    }
}

