/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.lifecycle;

import java.net.URI;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.lifecycle.TestJobExecutor;
import org.apache.flink.runtime.operators.lifecycle.TestJobWithDescription;
import org.apache.flink.runtime.operators.lifecycle.event.CheckpointCompletedEvent;
import org.apache.flink.runtime.operators.lifecycle.event.TestEvent;
import org.apache.flink.runtime.operators.lifecycle.event.WatermarkReceivedEvent;
import org.apache.flink.runtime.operators.lifecycle.graph.TestJobBuilders;
import org.apache.flink.runtime.operators.lifecycle.validation.DrainingValidator;
import org.apache.flink.runtime.operators.lifecycle.validation.FinishingValidator;
import org.apache.flink.runtime.operators.lifecycle.validation.SameCheckpointValidator;
import org.apache.flink.runtime.operators.lifecycle.validation.TestJobDataFlowValidator;
import org.apache.flink.runtime.operators.lifecycle.validation.TestOperatorLifecycleValidator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.CheckpointStorageUtils;
import org.apache.flink.test.util.AbstractTestBaseJUnit4;
import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.util.function.ThrowingConsumer;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Parameterized integration test that runs a test job, stops it with a savepoint, and then
 * validates the recorded operator lifecycle events and the data flow.
 *
 * <p>Each run is parameterized by whether the job is stopped with drain ({@link #withDrain})
 * and by the graph builder used to create the job ({@link #graphBuilder}); see
 * {@link #parameters()} for the combinations.
 */
@RunWith(Parameterized.class)
public class StopWithSavepointITCase
extends AbstractTestBaseJUnit4 {
    /** Per-test folder handed to {@code stopWithSavepoint} in {@link #test()}. */
    @Rule
    public final TemporaryFolder temporaryFolder = new TemporaryFolder();
    /** Shared-object registry passed to the graph builder when the test job is built. */
    @Rule
    public final SharedObjects sharedObjects = SharedObjects.create();
    /** Fails any single parameterized run that takes longer than ten minutes. */
    @Rule
    public Timeout timeoutRule = new Timeout(10L, TimeUnit.MINUTES);
    /** First test parameter: whether to stop the job with drain. */
    @Parameterized.Parameter(0)
    public boolean withDrain;
    /** Second test parameter: the builder that creates the job under test. */
    @Parameterized.Parameter(1)
    public TestJobBuilders.TestingGraphBuilder graphBuilder;

    /**
     * Builds the job, runs it until a {@link WatermarkReceivedEvent} is observed, stops it with
     * a savepoint, and validates the result.
     *
     * <p>The lifecycle check always includes a {@link SameCheckpointValidator} built from the
     * highest completed checkpoint ID; with drain it additionally includes a
     * {@link DrainingValidator} and a {@link FinishingValidator}.
     *
     * @throws Exception if building, executing, stopping, or validating the job fails
     */
    @Test
    public void test() throws Exception {
        // No cluster configuration changes are needed for this test.
        ThrowingConsumer<Configuration, Exception> noConfigChanges = cfg -> {};
        // TEMPORARY_FOLDER and MINI_CLUSTER_RESOURCE are not declared in this class; they are
        // inherited from the AbstractTestBaseJUnit4 hierarchy.
        ThrowingConsumer<StreamExecutionEnvironment, Exception> useFileSystemCheckpointStorage = env -> {
            URI checkpointDir = TEMPORARY_FOLDER.newFolder().toURI();
            CheckpointStorageUtils.configureFileSystemCheckpointStorage(env, checkpointDir);
        };
        TestJobWithDescription testJob =
                graphBuilder.build(sharedObjects, noConfigChanges, useFileSystemCheckpointStorage);

        TestJobExecutor.execute(testJob, MINI_CLUSTER_RESOURCE)
                .waitForEvent(WatermarkReceivedEvent.class)
                .stopWithSavepoint(temporaryFolder, withDrain);

        long highestCheckpoint = getHighestCheckpoint(testJob.eventQueue.getAll());
        SameCheckpointValidator sameCheckpointValidator = new SameCheckpointValidator(highestCheckpoint);
        if (withDrain) {
            TestOperatorLifecycleValidator.checkOperatorsLifecycle(
                    testJob, sameCheckpointValidator, new DrainingValidator(), new FinishingValidator());
        } else {
            TestOperatorLifecycleValidator.checkOperatorsLifecycle(testJob, sameCheckpointValidator);
        }
        TestJobDataFlowValidator.checkDataFlow(testJob, withDrain);
    }

    /**
     * Supplies the parameter combinations: both the simple and the complex graph builder, each
     * with and without drain.
     *
     * @return an array whose elements are {@code {withDrain, graphBuilder}} pairs
     */
    @Parameterized.Parameters(name="withDrain: {0}, {1}")
    public static Object[] parameters() {
        return new Object[][]{
            {true, TestJobBuilders.SIMPLE_GRAPH_BUILDER},
            {false, TestJobBuilders.SIMPLE_GRAPH_BUILDER},
            {true, TestJobBuilders.COMPLEX_GRAPH_BUILDER},
            {false, TestJobBuilders.COMPLEX_GRAPH_BUILDER}
        };
    }

    /**
     * Returns the largest checkpoint ID among the {@link CheckpointCompletedEvent}s in the given
     * events.
     *
     * @param events the recorded test events to scan
     * @return the maximum {@code checkpointID} of all {@link CheckpointCompletedEvent}s
     * @throws NoSuchElementException if {@code events} contains no
     *     {@link CheckpointCompletedEvent}
     */
    private static long getHighestCheckpoint(List<TestEvent> events) {
        return events.stream()
                .filter(CheckpointCompletedEvent.class::isInstance)
                .map(CheckpointCompletedEvent.class::cast)
                .mapToLong(completed -> completed.checkpointID)
                .max()
                // Same exception type as an unchecked getAsLong(), but with a message that
                // says what was missing instead of the bare "No value present".
                .orElseThrow(() -> new NoSuchElementException(
                        "No CheckpointCompletedEvent found among " + events.size()
                                + " recorded events; cannot determine the highest completed checkpoint ID"));
    }
}

