/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.scheduling;

import java.time.Duration;
import java.util.Iterator;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.test.junit5.InjectClusterClient;
import org.apache.flink.test.junit5.InjectMiniCluster;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.test.scheduling.UpdateJobResourceRequirementsITCase;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.util.TestLoggerExtension;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ExtendWith(value={TestLoggerExtension.class})
class RescaleOnCheckpointITCase {
    private static final Logger LOG = LoggerFactory.getLogger(RescaleOnCheckpointITCase.class);
    private static final int NUMBER_OF_SLOTS = 4;
    private static final int BEFORE_RESCALE_PARALLELISM = 4;
    private static final int AFTER_RESCALE_PARALLELISM = 3;
    private static final Duration REQUIREMENT_UPDATE_TO_CHECKPOINT_GAP = Duration.ofSeconds(2L);
    @RegisterExtension
    private static final MiniClusterExtension MINI_CLUSTER_EXTENSION = new MiniClusterExtension(new MiniClusterResourceConfiguration.Builder().setConfiguration(RescaleOnCheckpointITCase.createConfiguration()).setNumberSlotsPerTaskManager(4).build());

    RescaleOnCheckpointITCase() {
    }

    private static Configuration createConfiguration() {
        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.SCHEDULER, (Object)JobManagerOptions.SchedulerType.Adaptive);
        configuration.set(WebOptions.REFRESH_INTERVAL, (Object)Duration.ofMillis(50L));
        configuration.set(JobManagerOptions.SLOT_IDLE_TIMEOUT, (Object)Duration.ofMillis(50L));
        configuration.set(CheckpointingOptions.CHECKPOINTING_INTERVAL, (Object)TestingUtils.infiniteDuration());
        configuration.set(JobManagerOptions.SCHEDULER_RESCALE_TRIGGER_MAX_DELAY, (Object)TestingUtils.infiniteDuration());
        configuration.set(JobManagerOptions.SCHEDULER_EXECUTING_COOLDOWN_AFTER_RESCALING, (Object)Duration.ZERO);
        return configuration;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testRescaleOnCheckpoint(@InjectMiniCluster MiniCluster miniCluster, @InjectClusterClient RestClusterClient<?> restClusterClient) throws Exception {
        Configuration config = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)config);
        env.setParallelism(4);
        env.fromSequence(0L, Integer.MAX_VALUE).sinkTo((Sink)new DiscardingSink());
        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        Iterator jobVertexIterator = jobGraph.getVertices().iterator();
        ((AbstractBooleanAssert)Assertions.assertThat((boolean)jobVertexIterator.hasNext()).as("There needs to be at least one JobVertex.", new Object[0])).isTrue();
        JobVertexID jobVertexId = ((JobVertex)jobVertexIterator.next()).getID();
        JobResourceRequirements jobResourceRequirements = JobResourceRequirements.newBuilder().setParallelismForJobVertex(jobVertexId, 1, 3).build();
        ((AbstractBooleanAssert)Assertions.assertThat((boolean)jobVertexIterator.hasNext()).as("This test expects to have only one JobVertex.", new Object[0])).isFalse();
        restClusterClient.submitJob(jobGraph).join();
        JobID jobId = jobGraph.getJobID();
        try {
            LOG.info("Waiting for job {} to reach parallelism of {} for vertex {}.", new Object[]{jobId, 4, jobVertexId});
            UpdateJobResourceRequirementsITCase.waitForRunningTasks(restClusterClient, jobId, 4);
            LOG.info("Job {} reached parallelism of {} for vertex {}. Updating the vertex parallelism next to {}.", new Object[]{jobId, 4, jobVertexId, 3});
            restClusterClient.updateJobResourceRequirements(jobId, jobResourceRequirements).join();
            Thread.sleep(REQUIREMENT_UPDATE_TO_CHECKPOINT_GAP.toMillis());
            LOG.info("Checking that job {} hasn't changed its parallelism even after some delay, yet.", (Object)jobId);
            UpdateJobResourceRequirementsITCase.waitForRunningTasks(restClusterClient, jobId, 4);
            miniCluster.triggerCheckpoint(jobId);
            LOG.info("Waiting for job {} to reach parallelism of {} for vertex {}.", new Object[]{jobId, 3, jobVertexId});
            UpdateJobResourceRequirementsITCase.waitForRunningTasks(restClusterClient, jobId, 3);
            boolean expectedFreeSlotCount = true;
            LOG.info("Waiting for {} slot(s) to become available due to the scale down.", (Object)1);
            UpdateJobResourceRequirementsITCase.waitForAvailableSlots(restClusterClient, 1);
            LOG.info("{} free slot(s) detected. Finishing test.", (Object)1);
        }
        finally {
            restClusterClient.cancel(jobGraph.getJobID()).join();
        }
    }
}

