package org.apache.beam.runners.flink;

import java.io.File;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.test.util.TestBaseUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/runners/flink/ReadSourceStreamingTest.class */
/**
 * Runs a small bounded {@link GenerateSequence} pipeline through {@link FlinkTestPipeline}, once
 * in streaming mode and once in batch mode, and checks the text written via {@link TextIO}.
 *
 * <p>Each test only launches the pipeline; the produced output is compared against
 * {@link #EXPECTED_RESULT} in the {@code @After} hook {@link #postSubmit()}.
 */
public class ReadSourceStreamingTest extends AbstractTestBase {
    /** Lines expected in the output: the decimal strings for 0 through 9. */
    private static final String[] EXPECTED_RESULT = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"};

    /** URI string of the temp location registered in {@link #preSubmit()}; read back in {@link #postSubmit()}. */
    protected String resultDir;

    /** Absolute path of {@code file.txt} beneath the temp location; handed to {@code TextIO.write().to(...)}. */
    protected String resultPath;

    /**
     * Registers a temp location named {@code "result"} and derives both the location that is
     * verified afterwards and the file path the pipeline writes to.
     *
     * @throws Exception if the temp location cannot be created
     */
    @Before
    public void preSubmit() throws Exception {
        File resultParent = createAndRegisterTempFile("result");
        // NOTE(review): the write target is a child of the location that gets verified, presumably
        // because TextIO decorates the output file name with a shard suffix - confirm against TextIO.
        this.resultPath = new File(resultParent, "file.txt").getAbsolutePath();
        this.resultDir = resultParent.toURI().toString();
    }

    /**
     * Compares everything found under {@link #resultDir} with the newline-joined
     * {@link #EXPECTED_RESULT}.
     *
     * @throws Exception if the comparison fails or the results cannot be read
     */
    @After
    public void postSubmit() throws Exception {
        String expectedLines = Joiner.on('\n').join(EXPECTED_RESULT);
        TestBaseUtils.compareResultsByLinesInMemory(expectedLines, this.resultDir);
    }

    /** Executes the pipeline on a streaming test pipeline. */
    @Test
    public void testStreaming() {
        runProgram(this.resultPath, true);
    }

    /** Executes the pipeline on a batch test pipeline. */
    @Test
    public void testBatch() {
        runProgram(this.resultPath, false);
    }

    /**
     * Builds and runs the pipeline: generate the longs in {@code [0, 10)}, turn each one into its
     * string form, and write the strings with {@link TextIO}.
     *
     * @param outputPath target passed to {@code TextIO.write().to(...)}
     * @param streaming {@code true} to use {@link FlinkTestPipeline#createForStreaming()},
     *     {@code false} to use {@link FlinkTestPipeline#createForBatch()}
     */
    private static void runProgram(String outputPath, boolean streaming) {
        FlinkTestPipeline pipeline;
        if (streaming) {
            pipeline = FlinkTestPipeline.createForStreaming();
        } else {
            pipeline = FlinkTestPipeline.createForBatch();
        }

        // Emits the string form of every incoming Long.
        DoFn<Long, String> toText = new DoFn<Long, String>() {
            @DoFn.ProcessElement
            public void processElement(DoFn<Long, String>.ProcessContext context) throws Exception {
                context.output(context.element().toString());
            }
        };

        pipeline
                .apply(GenerateSequence.from(0L).to(10L))
                .apply(ParDo.of(toText))
                .apply(TextIO.write().to(outputPath));
        pipeline.run();
    }
}
