package org.apache.flink.test.state;

import java.io.File;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
import org.apache.flink.changelog.fs.FsStateChangelogOptions;
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/test/state/ChangelogRescalingITCase.class */
public class ChangelogRescalingITCase extends TestLogger {
    /** Per-reader emission rate; matches the rate {@code createJobGraph} passes to the throttled source. */
    private static final int EVENTS_PER_SECOND_PER_READER = 100;
    /** Size, in bytes, of the random payload the map step in {@code createJobGraph} attaches to each event. */
    private static final int PAYLOAD_SIZE = 1000;
    /** Length of the sliding processing-time windows built in {@code createJobGraph}. */
    private static final Time WINDOW_SIZE = Time.milliseconds(100);
    /** Slide of the sliding processing-time windows built in {@code createJobGraph}. */
    private static final Time WINDOW_SLIDE = Time.milliseconds(10);
    /** Matches the 5000 ms that {@code test()} waits after submitting the first job, before checkpointing it. */
    private static final int ACCUMULATE_TIME_MILLIS = 5000;

    /**
     * Supplies fresh temporary directories: one is handed to
     * {@code FsStateChangelogStorageFactory.configure} in {@code before()}, and one per job is used
     * as the checkpoint directory in {@code test()}.
     */
    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();
    /** Parallelism of the first job (first test parameter). */
    private final int parallelism1;
    /** Parallelism of the second job, which is restored from the first job's checkpoint (second test parameter). */
    private final int parallelism2;
    /** Cluster resource created in {@code before()} and released (and nulled) in {@code after()}. */
    private MiniClusterWithClientResource cluster;

    /**
     * Simple token limiter used to throttle the test source.
     *
     * <p>A daemon {@link Timer} raises a flag once per refill period. {@link #request()} refills the
     * bucket to {@code tokensToAdd} only when it is empty and the flag is set, so the number of
     * available tokens never exceeds {@code tokensToAdd}.
     *
     * <p>NOTE(review): {@code tokensAvailable} is a plain field, so {@link #request()} looks
     * intended for a single calling thread - confirm against the caller. The timer is never
     * cancelled; it is a daemon thread, so it does not keep the JVM alive.
     */
    private static final class SourceRateLimiter {
        /** Refill period for rates below {@link #FAST_RATE_THRESHOLD} tokens per second. */
        private static final int SLOW_REFILL_PERIOD_MILLIS = 1000;
        /** Refill period for rates of at least {@link #FAST_RATE_THRESHOLD} tokens per second. */
        private static final int FAST_REFILL_PERIOD_MILLIS = 100;
        /** Rates at or above this value are split into ten refills per second. */
        private static final int FAST_RATE_THRESHOLD = 10;

        private final AtomicBoolean newTokensAdded = new AtomicBoolean(false);
        private final int tokensToAdd;
        private int tokensAvailable;

        /**
         * Creates a limiter for the given rate.
         *
         * <p>Rates below 10 refill {@code tokensPerSecond} tokens once per second. Higher rates
         * refill {@code tokensPerSecond / 10} tokens every 100 ms (integer division, so any
         * remainder is dropped).
         *
         * @param tokensPerSecond desired rate; must be positive
         */
        public SourceRateLimiter(int tokensPerSecond) {
            // The period is deliberately independent of PAYLOAD_SIZE: the two values were only
            // numerically equal (1000), not semantically related.
            this(
                    tokensPerSecond < FAST_RATE_THRESHOLD
                            ? SLOW_REFILL_PERIOD_MILLIS
                            : FAST_REFILL_PERIOD_MILLIS,
                    tokensPerSecond < FAST_RATE_THRESHOLD
                            ? tokensPerSecond
                            : tokensPerSecond / FAST_RATE_THRESHOLD);
        }

        /**
         * Creates a limiter that makes {@code tokensPerPeriod} tokens available per period.
         *
         * @param refillPeriodMillis delay before the first refill signal and interval between
         *     signals, in milliseconds; must be positive
         * @param tokensPerPeriod bucket size; must be positive
         * @throws IllegalArgumentException if either argument is not positive (raised by
         *     {@code Preconditions.checkArgument})
         */
        public SourceRateLimiter(int refillPeriodMillis, int tokensPerPeriod) {
            Preconditions.checkArgument(refillPeriodMillis > 0);
            Preconditions.checkArgument(tokensPerPeriod > 0);
            this.tokensToAdd = tokensPerPeriod;
            // Start with a full bucket so the first period is not wasted.
            this.tokensAvailable = tokensPerPeriod;
            TimerTask refillSignal = new TimerTask() {
                @Override
                public void run() {
                    newTokensAdded.set(true);
                }
            };
            new Timer("source-limiter", true)
                    .scheduleAtFixedRate(refillSignal, refillPeriodMillis, refillPeriodMillis);
        }

        /**
         * Tries to take one token.
         *
         * @return {@code true} if a token was consumed, {@code false} if the bucket is empty and no
         *     refill has been signalled since the last refill
         */
        public boolean request() {
            // Refill only an empty bucket; compareAndSet consumes the signal exactly once.
            if (tokensAvailable == 0 && newTokensAdded.compareAndSet(true, false)) {
                tokensAvailable = tokensToAdd;
            }
            if (tokensAvailable <= 0) {
                return false;
            }
            tokensAvailable--;
            return true;
        }
    }

    /**
     * Serializable element type of the test pipeline: a numeric id plus a byte payload.
     *
     * <p>The payload array is stored by reference (no defensive copy) and is never read in this
     * class; only {@code id} is accessed by the enclosing test.
     */
    public static final class TestEvent implements Serializable {
        private final long id;
        private final byte[] payload;

        /**
         * @param id identifier of the event
         * @param payload bytes carried along with the event
         */
        private TestEvent(long id, byte[] payload) {
            this.payload = payload;
            this.id = id;
        }
    }

    /**
     * {@link IteratorSourceReader} that asks a {@link SourceRateLimiter} for a token before every
     * poll and reports {@link InputStatus#NOTHING_AVAILABLE} when no token is granted.
     *
     * @param <E> element type produced by the reader
     * @param <IterT> iterator type backing a split
     * @param <SplitT> split type handled by the reader
     */
    private static class ThrottlingIteratorSourceReader<
                    E, IterT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IterT>>
            extends IteratorSourceReader<E, IterT, SplitT> {
        private final SourceRateLimiter rateLimiter;

        /**
         * @param context reader context forwarded to the superclass
         * @param limiter limiter consulted on every {@link #pollNext(ReaderOutput)} call
         */
        public ThrottlingIteratorSourceReader(SourceReaderContext context, SourceRateLimiter limiter) {
            super(context);
            this.rateLimiter = limiter;
        }

        /**
         * Delegates to the superclass poll only when the limiter grants a token.
         *
         * @param output target for emitted records, passed through to the superclass
         * @return the superclass result, or {@link InputStatus#NOTHING_AVAILABLE} when throttled
         */
        public InputStatus pollNext(ReaderOutput<E> output) {
            if (!rateLimiter.request()) {
                return InputStatus.NOTHING_AVAILABLE;
            }
            return super.pollNext(output);
        }
    }

    /**
     * {@link NumberSequenceSource} whose readers are throttled by a {@link SourceRateLimiter}.
     *
     * <p>Every call to {@link #createReader(SourceReaderContext)} builds a new limiter, so the
     * configured rate applies to each created reader separately.
     */
    public static class ThrottlingNumberSequenceSource extends NumberSequenceSource {
        private final int numbersPerSecond;

        /**
         * @param from first bound forwarded to {@link NumberSequenceSource}
         * @param to second bound forwarded to {@link NumberSequenceSource}
         * @param numbersPerSecond rate handed to each reader's {@link SourceRateLimiter}
         */
        public ThrottlingNumberSequenceSource(long from, long to, int numbersPerSecond) {
            super(from, to);
            this.numbersPerSecond = numbersPerSecond;
        }

        /**
         * Creates a throttled reader with its own rate limiter.
         *
         * @param readerContext context forwarded to the reader
         * @return a reader that emits only when its limiter grants a token
         */
        public SourceReader<Long, NumberSequenceSource.NumberSequenceSplit> createReader(SourceReaderContext readerContext) {
            SourceRateLimiter limiter = new SourceRateLimiter(numbersPerSecond);
            // Diamond instead of a raw type: the type arguments are inferred from the return type,
            // which avoids an unchecked conversion.
            return new ThrottlingIteratorSourceReader<>(readerContext, limiter);
        }
    }

    /**
     * Supplies the parameter sets for the parameterized runner; each entry is a pair of
     * parallelism values passed to the constructor in order.
     *
     * @return the pairs {@code (6, 4)} and {@code (4, 6)}
     */
    @Parameterized.Parameters(name = "Rescale {0} -> {1}")
    public static Object[] parameters() {
        Object[] sixToFour = {6, 4};
        Object[] fourToSix = {4, 6};
        return new Object[] {sixToFour, fourToSix};
    }

    /**
     * @param parallelismBefore parallelism of the first job
     * @param parallelismAfter parallelism of the job restored from the first job's checkpoint
     */
    public ChangelogRescalingITCase(int parallelismBefore, int parallelismAfter) {
        this.parallelism2 = parallelismAfter;
        this.parallelism1 = parallelismBefore;
    }

    /**
     * Starts a mini cluster with enough slots per task manager for the larger of the two
     * parallelism values and with changelog storage configured on a fresh temporary folder.
     *
     * @throws Exception if the temporary folder cannot be created or the cluster fails to start
     */
    @Before
    public void before() throws Exception {
        Configuration clusterConf = new Configuration();
        File changelogDir = temporaryFolder.newFolder();
        // NOTE(review): the meaning of the trailing (1 minute, 10) arguments is defined by
        // FsStateChangelogStorageFactory.configure - confirm there.
        FsStateChangelogStorageFactory.configure(clusterConf, changelogDir, Duration.ofMinutes(1L), 10);

        int slotsPerTaskManager = Math.max(parallelism1, parallelism2);
        MiniClusterResourceConfiguration resourceConf =
                new MiniClusterResourceConfiguration.Builder()
                        .setConfiguration(clusterConf)
                        .setNumberSlotsPerTaskManager(slotsPerTaskManager)
                        .build();
        cluster = new MiniClusterWithClientResource(resourceConf);
        cluster.before();
    }

    /** Shuts the mini cluster down if it was started and clears the reference. */
    @After
    public void after() {
        if (cluster == null) {
            return;
        }
        cluster.after();
        cluster = null;
    }

    /**
     * Runs the job with {@code parallelism1}, waits {@link #ACCUMULATE_TIME_MILLIS}, checkpoints
     * and cancels it, then resubmits with {@code parallelism2} restoring from that checkpoint. The
     * test passes once all tasks of the second job are running and it has been cancelled.
     *
     * @throws Exception if submission, checkpointing, restore or cancellation fails
     */
    @Test
    public void test() throws Exception {
        // Phase 1: run at the initial parallelism.
        JobID firstJob = submit(configureJob(parallelism1, temporaryFolder.newFolder()), graph -> {
        });
        // Use the named constant rather than a duplicated magic number.
        Thread.sleep(ACCUMULATE_TIME_MILLIS);
        String checkpointPath = checkpointAndCancel(firstJob);

        // Phase 2: restore from the retained checkpoint at the new parallelism.
        JobID secondJob = submit(
                configureJob(parallelism2, temporaryFolder.newFolder()),
                graph -> graph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(checkpointPath)));
        CommonTestUtils.waitForAllTaskRunning(cluster.getMiniCluster(), secondJob, true);
        cluster.getClusterClient().cancel(secondJob).get();
    }

    /**
     * Builds the job graph for the given configuration, lets the caller adjust it, and submits it.
     *
     * @param configuration job configuration passed to {@code createJobGraph}
     * @param consumer hook applied to the job graph before submission
     * @return the id returned by the cluster client for the submitted job
     * @throws InterruptedException if interrupted while waiting for the submission result
     * @throws ExecutionException if the submission future completes exceptionally
     */
    private JobID submit(Configuration configuration, Consumer<JobGraph> consumer) throws InterruptedException, ExecutionException {
        JobGraph graph = createJobGraph(configuration);
        // Customisation happens after graph creation and before submission.
        consumer.accept(graph);
        return (JobID) cluster.getClusterClient().submitJob(graph).get();
    }

    /**
     * Builds the test pipeline and returns its job graph.
     *
     * <p>Pipeline: throttled number sequence, keyed by {@link #key(Long)}, mapped to a
     * {@link TestEvent} with a random payload of {@link #PAYLOAD_SIZE} bytes, reinterpreted as a
     * keyed stream on the same key, then sliding processing-time windows with a no-op window
     * function and a discarding sink.
     *
     * @param configuration configuration used to create the execution environment
     * @return the job graph of the assembled pipeline
     */
    private JobGraph createJobGraph(Configuration configuration) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        DataStreamUtils.reinterpretAsKeyedStream(
                        env.fromSource(
                                        new ThrottlingNumberSequenceSource(
                                                0L, Long.MAX_VALUE, EVENTS_PER_SECOND_PER_READER),
                                        WatermarkStrategy.noWatermarks(),
                                        "Sequence Source")
                                .keyBy(ChangelogRescalingITCase::key)
                                .map(value -> {
                                    byte[] payload = new byte[PAYLOAD_SIZE];
                                    ThreadLocalRandom.current().nextBytes(payload);
                                    return new TestEvent(value, payload);
                                }),
                        // The event id equals the source value, so this selector yields the same
                        // key as the keyBy above.
                        event -> key(event.id))
                .window(SlidingProcessingTimeWindows.of(WINDOW_SIZE, WINDOW_SLIDE))
                .process(new ProcessWindowFunction<TestEvent, String, Long, TimeWindow>() {
                    // Implements ProcessWindowFunction#process as a no-op: the windows exist only
                    // to hold state. The decompiled bridge overload was dropped because the
                    // compiler generates it and its erasure clashes with the inherited method.
                    public void process(
                            Long windowKey,
                            ProcessWindowFunction<TestEvent, String, Long, TimeWindow>.Context context,
                            Iterable<TestEvent> elements,
                            Collector<String> out) {
                    }
                })
                // Diamond instead of a raw DiscardingSink.
                .addSink(new DiscardingSink<>());
        return env.getStreamGraph().getJobGraph();
    }

    /**
     * Maps a sequence value to its key: the remainder of the value divided by 1000.
     *
     * <p>The remainder keeps the sign of the input, so negative values yield keys in
     * {@code (-1000, 0]}.
     *
     * @param value sequence value; must not be {@code null}
     * @return {@code value % 1000}
     */
    private static long key(Long value) {
        final long keyCount = 1000;
        return value % keyCount;
    }

    /**
     * Creates the per-job configuration shared by both runs of the test.
     *
     * @param parallelism default parallelism of the job
     * @param checkpointDir directory whose URI is used as the checkpoints directory
     * @return a new configuration containing all job settings
     */
    private Configuration configureJob(int parallelism, File checkpointDir) {
        Configuration conf = new Configuration();

        // Checkpoints are retained on cancellation so test() can restore from the path afterwards.
        conf.set(ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT, CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        conf.set(CoreOptions.DEFAULT_PARALLELISM, parallelism);
        conf.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true);

        // Checkpointing: exactly-once, every 10 ms, stored on the file system under checkpointDir.
        conf.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
        conf.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMillis(10L));
        conf.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
        conf.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());

        // State backend and local recovery.
        conf.set(StateBackendOptions.STATE_BACKEND, "hashmap");
        conf.set(CheckpointingOptions.LOCAL_RECOVERY, false);

        // Changelog tuning.
        conf.set(FsStateChangelogOptions.PREEMPTIVE_PERSIST_THRESHOLD, MemorySize.ofMebiBytes(10L));
        conf.set(StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL, Duration.ofMinutes(3L));

        // General runtime tuning.
        conf.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.ofMebiBytes(1L));
        conf.set(PipelineOptions.OBJECT_REUSE, true);

        // Unaligned checkpoints with a zero aligned-checkpoint timeout, buffer debloating off,
        // and no restart strategy.
        conf.set(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, true);
        conf.set(ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT, Duration.ZERO);
        conf.set(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, false);
        conf.set(RestartStrategyOptions.RESTART_STRATEGY, "none");

        return conf;
    }

    /**
     * Waits for the job to complete a checkpoint, cancels the job, waits for it to terminate, and
     * returns the path of its latest completed checkpoint.
     *
     * @param jobID id of the running job
     * @return path of the latest completed checkpoint
     * @throws NoSuchElementException if no completed checkpoint path is reported
     * @throws Exception if waiting, cancelling or termination fails
     */
    private String checkpointAndCancel(JobID jobID) throws Exception {
        CommonTestUtils.waitForCheckpoint(jobID, cluster.getMiniCluster(), 1);
        cluster.getClusterClient().cancel(jobID).get();
        waitForSuccessfulTermination(jobID);
        // The supplier returns the exception, as Optional.orElseThrow expects, instead of
        // throwing from inside the supplier.
        return (String) CommonTestUtils.getLatestCompletedCheckpointPath(jobID, cluster.getMiniCluster())
                .orElseThrow(() -> new NoSuchElementException("No checkpoint was created yet"));
    }

    /**
     * Blocks until the job reports a globally terminal status, then rethrows any failure recorded
     * in its job result.
     *
     * @param jobID id of the job to wait for
     * @throws RuntimeException wrapping the serialized throwable if the job result carries one
     * @throws Exception if polling the status or requesting the result fails
     */
    private void waitForSuccessfulTermination(JobID jobID) throws Exception {
        CommonTestUtils.waitUntilCondition(() -> {
            JobStatus polled = (JobStatus) cluster.getClusterClient().getJobStatus(jobID).get();
            return polled.isGloballyTerminalState();
        });

        // Status is queried once more after the wait, exactly as before.
        JobStatus finalStatus = (JobStatus) cluster.getClusterClient().getJobStatus(jobID).get();
        if (!finalStatus.isGloballyTerminalState()) {
            return;
        }
        JobResult result = (JobResult) cluster.getClusterClient().requestJobResult(jobID).get();
        result.getSerializedThrowable().ifPresent(failure -> {
            throw new RuntimeException((Throwable) failure);
        });
    }

    /**
     * Decompiled form of the synthetic lambda-deserialization hook for this class.
     *
     * <p>It selects one of the three serializable lambdas / method references used in
     * {@code createJobGraph} by the implementation method name stored in the
     * {@link SerializedLambda}, verifies the remaining descriptor fields, and returns a matching
     * lambda. Any unmatched descriptor ends in an {@link IllegalArgumentException}.
     *
     * <p>NOTE(review): this is decompiler output and is not valid Java source as written.
     * {@code z} is declared {@code boolean} but is assigned {@code -1} and {@code 2}, the second
     * switch has two {@code case true} labels, and the lambdas are returned as {@code Object}
     * without a functional-interface target type. The method carries the {@code synthetic} marker,
     * so it is presumably regenerated by the compiler and should be dropped rather than repaired
     * if this file is ever recompiled - confirm before relying on it.
     *
     * @param serializedLambda descriptor of the lambda being deserialized
     * @return a lambda equivalent to the serialized one
     * @throws IllegalArgumentException if the descriptor matches none of the known lambdas
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Selector for the second switch; intended values are -1 (no match), 0, 1 and 2.
        boolean z = -1;
        // First switch: dispatch on the name's hash code, then confirm with equals().
        switch (implMethodName.hashCode()) {
            case 106079:
                if (implMethodName.equals("key")) {
                    z = 2;
                    break;
                }
                break;
            case 720996016:
                if (implMethodName.equals("lambda$createJobGraph$6d0ba73f$1")) {
                    z = false;
                    break;
                }
                break;
            case 1012430706:
                if (implMethodName.equals("lambda$createJobGraph$b29523fd$1")) {
                    z = true;
                    break;
                }
                break;
        }
        // Second switch: check the full descriptor and rebuild the lambda.
        switch (z) {
            // lambda$createJobGraph$6d0ba73f$1: the MapFunction that turns a Long into a TestEvent.
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/state/ChangelogRescalingITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Long;)Lorg/apache/flink/test/state/ChangelogRescalingITCase$TestEvent;")) {
                    return l -> {
                        byte[] bArr = new byte[PAYLOAD_SIZE];
                        ThreadLocalRandom.current().nextBytes(bArr);
                        return new TestEvent(l.longValue(), bArr);
                    };
                }
                break;
            // lambda$createJobGraph$b29523fd$1: the KeySelector over TestEvent.
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/state/ChangelogRescalingITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/test/state/ChangelogRescalingITCase$TestEvent;)Ljava/lang/Long;")) {
                    return testEvent -> {
                        return Long.valueOf(key(Long.valueOf(testEvent.id)));
                    };
                }
                break;
            // Method reference to key(Long); corresponds to the selector value 2 assigned above.
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/state/ChangelogRescalingITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Long;)J")) {
                    return ChangelogRescalingITCase::key;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
