/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.api.connector.source.lib;

import java.io.Serializable;
import java.util.List;
import java.util.stream.LongStream;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.RestartStrategyUtils;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

public class NumberSequenceSourceITCase
extends TestLogger {
    private static final int PARALLELISM = 4;
    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setNumberSlotsPerTaskManager(4).build());

    @Test
    public void testParallelSourceExecution() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        DataStreamSource stream = env.fromSource((Source)new NumberSequenceSource(1L, 1000L), WatermarkStrategy.noWatermarks(), "iterator source");
        List result = stream.executeAndCollect(10000);
        Assert.assertThat((Object)result, (Matcher)Matchers.containsInAnyOrder((Object[])LongStream.rangeClosed(1L, 1000L).boxed().toArray()));
    }

    @Test
    public void testCheckpointingWithDelayedAssignment() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        RestartStrategyUtils.configureNoRestartStrategy((StreamExecutionEnvironment)env);
        env.enableCheckpointing(10L, CheckpointingMode.EXACTLY_ONCE);
        SingleOutputStreamOperator stream = env.fromSequence(0L, 100L).map((MapFunction & Serializable)x -> {
            if (x == 0L) {
                Thread.sleep(50L);
            }
            return x;
        });
        List result = stream.executeAndCollect(1000);
        Assert.assertThat((Object)result, (Matcher)Matchers.contains((Object[])LongStream.rangeClosed(0L, 100L).boxed().toArray()));
    }

    @Test
    public void testLessSplitsThanParallelism() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        int n = 2;
        SingleOutputStreamOperator stream = env.fromSequence(0L, (long)n).map((MapFunction & Serializable)l -> l);
        List result = stream.executeAndCollect(100);
        Assert.assertThat((Object)result, (Matcher)Matchers.containsInAnyOrder((Object[])LongStream.rangeClosed(0L, n).boxed().toArray()));
    }
}

