package org.apache.flink.connector.base.source.reader;

import java.io.File;
import java.io.IOException;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Comparator;
import javax.annotation.Nullable;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.class */
public class CoordinatedSourceRescaleITCase extends TestLogger {
    public static final String CREATED_CHECKPOINT = "successfully created checkpoint";
    public static final String RESTORED_CHECKPOINT = "successfully restored checkpoint";

    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setNumberSlotsPerTaskManager(7).build());

    @Rule
    public final TemporaryFolder temp = new TemporaryFolder();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase$FailingMapFunction.class */
    public static class FailingMapFunction extends RichMapFunction<Long, Long> implements CheckpointListener {
        private static final long serialVersionUID = 699621912578369378L;
        private final boolean generateCheckpoint;
        private boolean processedRecord;

        FailingMapFunction(boolean z) {
            this.generateCheckpoint = z;
        }

        public Long map(Long l) throws Exception {
            this.processedRecord = true;
            if (this.generateCheckpoint || l.longValue() % 100 != 42) {
                return l;
            }
            throw new Exception(CoordinatedSourceRescaleITCase.RESTORED_CHECKPOINT);
        }

        public void notifyCheckpointComplete(long j) throws Exception {
            if (this.generateCheckpoint && this.processedRecord && j > 5) {
                throw new Exception(CoordinatedSourceRescaleITCase.CREATED_CHECKPOINT);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase$SleepySink.class */
    public static class SleepySink implements SinkFunction<Long> {
        private static final long serialVersionUID = -3542950841846119765L;

        private SleepySink() {
        }

        public void invoke(Long l, SinkFunction.Context context) throws Exception {
            if (l.longValue() % 1000 == 0) {
                Thread.sleep(1L);
            }
        }
    }

    @Test
    public void testDownscaling() throws Exception {
        File newFolder = this.temp.newFolder();
        resumeCheckpoint(newFolder, generateCheckpoint(newFolder, 7), 3);
    }

    @Test
    public void testUpscaling() throws Exception {
        File newFolder = this.temp.newFolder();
        resumeCheckpoint(newFolder, generateCheckpoint(newFolder, 3), 7);
    }

    private File generateCheckpoint(File file, int i) throws IOException {
        StreamExecutionEnvironment createEnv = createEnv(file, null, i);
        Assertions.assertThatThrownBy(() -> {
            createEnv.execute("create checkpoint");
        }).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(CREATED_CHECKPOINT)});
        return (File) Files.find(file.toPath(), 2, this::isCompletedCheckpoint, new FileVisitOption[0]).max(Comparator.comparing((v0) -> {
            return v0.toString();
        })).map((v0) -> {
            return v0.toFile();
        }).orElseThrow(() -> {
            return new IllegalStateException("Cannot generate checkpoint");
        });
    }

    private boolean isCompletedCheckpoint(Path path, BasicFileAttributes basicFileAttributes) {
        return basicFileAttributes.isDirectory() && path.getFileName().toString().startsWith("chk-") && Files.exists(path.resolve("_metadata"), new LinkOption[0]);
    }

    private void resumeCheckpoint(File file, File file2, int i) {
        StreamExecutionEnvironment createEnv = createEnv(file, file2, i);
        Assertions.assertThatThrownBy(() -> {
            createEnv.execute("resume checkpoint");
        }).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(RESTORED_CHECKPOINT)});
    }

    private StreamExecutionEnvironment createEnv(File file, @Nullable File file2, int i) {
        Configuration configuration = new Configuration();
        configuration.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, file.toURI().toString());
        configuration.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse("4kb"));
        if (file2 != null) {
            configuration.set(SavepointConfigOptions.SAVEPOINT_PATH, file2.toURI().toString());
        }
        configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, i);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        executionEnvironment.setParallelism(i);
        executionEnvironment.enableCheckpointing(100L);
        executionEnvironment.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
        executionEnvironment.fromSequence(0L, Long.MAX_VALUE).map(new FailingMapFunction(file2 == null)).addSink(new SleepySink());
        return executionEnvironment;
    }
}
