package org.apache.flink.test.javaApiOperators;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.ReplicatingInputFormat;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.ParallelIteratorInputFormat;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.util.NumberSequenceIterator;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/test/javaApiOperators/ReplicatingDataSourceITCase.class */
public class ReplicatingDataSourceITCase extends MultipleProgramsTestBase {
    private String resultPath;
    private String expectedResult;

    @Rule
    public TemporaryFolder tempFolder;

    /* loaded from: input_file:org/apache/flink/test/javaApiOperators/ReplicatingDataSourceITCase$ToTuple.class */
    public static class ToTuple implements MapFunction<Long, Tuple1<Long>> {
        public Tuple1<Long> map(Long l) throws Exception {
            return new Tuple1<>(l);
        }
    }

    public ReplicatingDataSourceITCase(MultipleProgramsTestBase.TestExecutionMode testExecutionMode) {
        super(testExecutionMode);
        this.tempFolder = new TemporaryFolder();
    }

    @Before
    public void before() throws Exception {
        this.resultPath = this.tempFolder.newFile().toURI().toString();
    }

    @After
    public void after() throws Exception {
        compareResultsByLinesInMemory(this.expectedResult, this.resultPath);
    }

    @Test
    public void testReplicatedSourceToJoin() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.createInput(new ReplicatingInputFormat(new ParallelIteratorInputFormat(new NumberSequenceIterator(0L, 1000L))), BasicTypeInfo.LONG_TYPE_INFO).map(new ToTuple()).join(executionEnvironment.generateSequence(0L, 1000L).map(new ToTuple())).where(new int[]{0}).equalTo(new int[]{0}).projectFirst(new int[]{0}).sum(0).writeAsText(this.resultPath);
        executionEnvironment.execute();
        this.expectedResult = "(500500)";
    }

    @Test
    public void testReplicatedSourceToCross() throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.createInput(new ReplicatingInputFormat(new ParallelIteratorInputFormat(new NumberSequenceIterator(0L, 1000L))), BasicTypeInfo.LONG_TYPE_INFO).map(new ToTuple()).cross(executionEnvironment.generateSequence(0L, 1000L).map(new ToTuple())).filter(new FilterFunction<Tuple2<Tuple1<Long>, Tuple1<Long>>>() { // from class: org.apache.flink.test.javaApiOperators.ReplicatingDataSourceITCase.2
            public boolean filter(Tuple2<Tuple1<Long>, Tuple1<Long>> tuple2) throws Exception {
                return ((Long) ((Tuple1) tuple2.f0).f0).equals(((Tuple1) tuple2.f1).f0);
            }
        }).map(new MapFunction<Tuple2<Tuple1<Long>, Tuple1<Long>>, Tuple1<Long>>() { // from class: org.apache.flink.test.javaApiOperators.ReplicatingDataSourceITCase.1
            public Tuple1<Long> map(Tuple2<Tuple1<Long>, Tuple1<Long>> tuple2) throws Exception {
                return (Tuple1) tuple2.f0;
            }
        }).sum(0).writeAsText(this.resultPath);
        executionEnvironment.execute();
        this.expectedResult = "(500500)";
    }
}
