/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.io.IOException;
import java.io.Serializable;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Random;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExternalizedCheckpointRetention;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.StateRecoveryOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class UnalignedCheckpointRescaleWithMixedExchangesITCase
extends TestLogger {
    private static final int NUM_TASK_MANAGERS = 1;
    private static final int SLOTS_PER_TASK_MANAGER = 10;
    private static final int MAX_SLOTS = 10;
    private static final Random RANDOM = new Random();
    private static MiniClusterWithClientResource cluster;
    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();
    @Parameterized.Parameter
    public ExecuteJobViaEnv executeJobViaEnv;

    @Parameterized.Parameters(name="Test case {index}")
    public static Collection<ExecuteJobViaEnv> parameter() {
        return Lists.newArrayList((Object[])new ExecuteJobViaEnv[]{UnalignedCheckpointRescaleWithMixedExchangesITCase::createMultiOutputDAG, UnalignedCheckpointRescaleWithMixedExchangesITCase::createMultiInputDAG, UnalignedCheckpointRescaleWithMixedExchangesITCase::createRescalePartitionerDAG, UnalignedCheckpointRescaleWithMixedExchangesITCase::createMixedComplexityDAG});
    }

    @Before
    public void setup() throws Exception {
        cluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(new Configuration()).setNumberTaskManagers(1).setNumberSlotsPerTaskManager(10).build());
        cluster.before();
    }

    @After
    public void shutDownExistingCluster() {
        if (cluster != null) {
            cluster.after();
            cluster = null;
        }
    }

    @Test
    public void testRescaleFromUnalignedCheckpoint() throws Exception {
        MiniCluster miniCluster = cluster.getMiniCluster();
        JobClient jobClient1 = this.executeJobViaEnv.executeJob(this.getUnalignedCheckpointEnv(null));
        CommonTestUtils.waitForJobStatus((JobClient)jobClient1, Collections.singletonList(JobStatus.RUNNING));
        CommonTestUtils.waitForAllTaskRunning((MiniCluster)miniCluster, (JobID)jobClient1.getJobID(), (boolean)false);
        String checkpointPath = CommonTestUtils.waitForCheckpointWithInflightBuffers((JobID)jobClient1.getJobID(), (MiniCluster)miniCluster);
        jobClient1.cancel().get();
        JobClient jobClient2 = this.executeJobViaEnv.executeJob(this.getUnalignedCheckpointEnv(checkpointPath));
        CommonTestUtils.waitForJobStatus((JobClient)jobClient2, Collections.singletonList(JobStatus.RUNNING));
        CommonTestUtils.waitForAllTaskRunning((MiniCluster)miniCluster, (JobID)jobClient2.getJobID(), (boolean)false);
        CommonTestUtils.waitForCheckpointWithInflightBuffers((JobID)jobClient2.getJobID(), (MiniCluster)miniCluster);
        jobClient2.cancel().get();
    }

    private StreamExecutionEnvironment getUnalignedCheckpointEnv(@Nullable String recoveryPath) throws IOException {
        Configuration conf = new Configuration();
        conf.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, (Object)Duration.ofSeconds(1L));
        conf.set(CheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT, (Object)Duration.ofSeconds(0L));
        conf.set(CheckpointingOptions.EXTERNALIZED_CHECKPOINT_RETENTION, (Object)ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
        conf.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, (Object)this.temporaryFolder.newFolder().toURI().toString());
        conf.set(CheckpointingOptions.ENABLE_UNALIGNED, (Object)true);
        conf.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, (Object)MemorySize.parse((String)"1 kb"));
        conf.set(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, (Object)50);
        if (recoveryPath != null) {
            conf.set(StateRecoveryOptions.SAVEPOINT_PATH, (Object)recoveryPath);
        }
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)conf);
        env.disableOperatorChaining();
        return env;
    }

    private static JobClient createMultiOutputDAG(StreamExecutionEnvironment env) throws Exception {
        DataGeneratorSource source = new DataGeneratorSource((GeneratorFunction & Serializable)index -> index, Long.MAX_VALUE, RateLimiterStrategy.perSecond((double)5000.0), Types.LONG);
        int sourceParallelism = UnalignedCheckpointRescaleWithMixedExchangesITCase.getRandomParallelism();
        DataStreamSource sourceStream = env.fromSource((Source)source, WatermarkStrategy.noWatermarks(), "Data Generator").setParallelism(sourceParallelism);
        sourceStream.keyBy((KeySelector & Serializable)value -> value).map((MapFunction & Serializable)x -> {
            Thread.sleep(5L);
            return x;
        }).name("Map after keyBy").setParallelism(UnalignedCheckpointRescaleWithMixedExchangesITCase.getRandomParallelism());
        sourceStream.map((MapFunction & Serializable)x -> {
            Thread.sleep(1L);
            return x;
        }).name("Map after forward").setParallelism(sourceParallelism);
        return env.executeAsync();
    }

    private static JobClient createMultiInputDAG(StreamExecutionEnvironment env) throws Exception {
        int source1Parallelism = UnalignedCheckpointRescaleWithMixedExchangesITCase.getRandomParallelism();
        DataGeneratorSource source1 = new DataGeneratorSource((GeneratorFunction & Serializable)index -> index, Long.MAX_VALUE, RateLimiterStrategy.perSecond((double)5000.0), Types.LONG);
        DataStreamSource sourceStream1 = env.fromSource((Source)source1, WatermarkStrategy.noWatermarks(), "Source 1").setParallelism(source1Parallelism);
        int source2Parallelism = UnalignedCheckpointRescaleWithMixedExchangesITCase.getRandomParallelism();
        DataGeneratorSource source2 = new DataGeneratorSource((GeneratorFunction & Serializable)index -> index, Long.MAX_VALUE, RateLimiterStrategy.perSecond((double)5000.0), Types.LONG);
        DataStreamSource sourceStream2 = env.fromSource((Source)source2, WatermarkStrategy.noWatermarks(), "Source 2").setParallelism(source2Parallelism);
        SingleOutputStreamOperator forwardedStream = sourceStream2.map((MapFunction & Serializable)x -> x).setParallelism(source2Parallelism);
        sourceStream1.rebalance().connect(forwardedStream.rebalance()).map((CoMapFunction)new SleepingCoMap()).name("Co-Map").setParallelism(UnalignedCheckpointRescaleWithMixedExchangesITCase.getRandomParallelism());
        return env.executeAsync();
    }

    private static JobClient createRescalePartitionerDAG(StreamExecutionEnvironment env) throws Exception {
        DataGeneratorSource source = new DataGeneratorSource((GeneratorFunction & Serializable)index -> index, Long.MAX_VALUE, RateLimiterStrategy.perSecond((double)5000.0), Types.LONG);
        int sourceParallelism = UnalignedCheckpointRescaleWithMixedExchangesITCase.getRandomParallelism();
        DataStreamSource sourceStream = env.fromSource((Source)source, WatermarkStrategy.noWatermarks(), "Data Generator").setParallelism(sourceParallelism);
        sourceStream.keyBy((KeySelector & Serializable)value -> value).map((MapFunction & Serializable)x -> {
            Thread.sleep(5L);
            return x;
        }).name("Map after keyBy").setParallelism(UnalignedCheckpointRescaleWithMixedExchangesITCase.getRandomParallelism());
        sourceStream.rescale().map((MapFunction & Serializable)x -> {
            Thread.sleep(1L);
            return x;
        }).name("Map after rescale").setParallelism(UnalignedCheckpointRescaleWithMixedExchangesITCase.getRandomParallelism());
        return env.executeAsync();
    }

    private static JobClient createMixedComplexityDAG(StreamExecutionEnvironment env) throws Exception {
        int source1Parallelism = UnalignedCheckpointRescaleWithMixedExchangesITCase.getRandomParallelism();
        DataGeneratorSource source1 = new DataGeneratorSource((GeneratorFunction & Serializable)index -> index, Long.MAX_VALUE, RateLimiterStrategy.perSecond((double)5000.0), Types.LONG);
        DataStreamSource sourceStream1 = env.fromSource((Source)source1, WatermarkStrategy.noWatermarks(), "Source 1").setParallelism(source1Parallelism);
        int source2Parallelism = UnalignedCheckpointRescaleWithMixedExchangesITCase.getRandomParallelism();
        DataGeneratorSource source2 = new DataGeneratorSource((GeneratorFunction & Serializable)index -> index, Long.MAX_VALUE, RateLimiterStrategy.perSecond((double)5000.0), Types.LONG);
        DataStreamSource sourceStream2 = env.fromSource((Source)source2, WatermarkStrategy.noWatermarks(), "Source 2").setParallelism(source2Parallelism);
        SingleOutputStreamOperator forwardedStream = sourceStream2.map((MapFunction & Serializable)x -> x).setParallelism(source2Parallelism);
        SingleOutputStreamOperator multiInputMap = sourceStream1.rebalance().connect(forwardedStream.rebalance()).map((CoMapFunction)new SleepingCoMap()).name("Co-Map").setParallelism(UnalignedCheckpointRescaleWithMixedExchangesITCase.getRandomParallelism());
        multiInputMap.keyBy((KeySelector & Serializable)value -> value).map((MapFunction & Serializable)x -> {
            Thread.sleep(5L);
            return x;
        }).name("Map after keyBy").setParallelism(UnalignedCheckpointRescaleWithMixedExchangesITCase.getRandomParallelism());
        multiInputMap.map((MapFunction & Serializable)x -> {
            Thread.sleep(1L);
            return x;
        }).name("Map after forward").setParallelism(multiInputMap.getParallelism());
        return env.executeAsync();
    }

    private static int getRandomParallelism() {
        return RANDOM.nextInt(10) + 1;
    }

    public static interface ExecuteJobViaEnv {
        public JobClient executeJob(StreamExecutionEnvironment var1) throws Exception;
    }

    private static class SleepingCoMap
    implements CoMapFunction<Long, Long, Long> {
        private SleepingCoMap() {
        }

        public Long map1(Long value) throws Exception {
            Thread.sleep(1L);
            return value;
        }

        public Long map2(Long value) throws Exception {
            Thread.sleep(1L);
            return value;
        }
    }
}

