package org.apache.beam.runners.flink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.SplittableParDo;
import org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.resourcehints.ResourceHints;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValues;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.class */
public class FlinkStreamingTransformTranslatorsTest {

    /* loaded from: input_file:org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest$TestBoundedSource.class */
    private static class TestBoundedSource extends BoundedSource<String> {
        private final int bytes;

        private TestBoundedSource(int i) {
            this.bytes = i;
        }

        public List<? extends BoundedSource<String>> split(long j, PipelineOptions pipelineOptions) throws Exception {
            ArrayList arrayList = new ArrayList();
            long j2 = this.bytes;
            while (j2 > 0) {
                j2 -= j;
                arrayList.add(this);
            }
            return arrayList;
        }

        public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) throws Exception {
            return this.bytes;
        }

        public BoundedSource.BoundedReader<String> createReader(PipelineOptions pipelineOptions) throws IOException {
            return null;
        }

        public Coder<String> getOutputCoder() {
            return StringUtf8Coder.of();
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest$TestUnboundedSource.class */
    private static class TestUnboundedSource extends UnboundedSource<String, UnboundedSource.CheckpointMark> {
        private TestUnboundedSource() {
        }

        public List<? extends UnboundedSource<String, UnboundedSource.CheckpointMark>> split(int i, PipelineOptions pipelineOptions) throws Exception {
            ArrayList arrayList = new ArrayList();
            for (int i2 = 0; i2 < i; i2++) {
                arrayList.add(this);
            }
            return arrayList;
        }

        public UnboundedSource.UnboundedReader<String> createReader(PipelineOptions pipelineOptions, UnboundedSource.CheckpointMark checkpointMark) throws IOException {
            return null;
        }

        public Coder<UnboundedSource.CheckpointMark> getCheckpointMarkCoder() {
            return null;
        }
    }

    @Test
    public void readSourceTranslatorBoundedWithMaxParallelism() {
        SplittableParDo.PrimitiveBoundedRead primitiveBoundedRead = new SplittableParDo.PrimitiveBoundedRead(Read.from(new TestBoundedSource(6)));
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(2);
        executionEnvironment.setMaxParallelism(6);
        Assert.assertEquals(6L, ((SourceTransformation) applyReadSourceTransform(primitiveBoundedRead, PCollection.IsBounded.BOUNDED, executionEnvironment)).getSource().getNumSplits());
    }

    @Test
    public void readSourceTranslatorBoundedWithoutMaxParallelism() {
        SplittableParDo.PrimitiveBoundedRead primitiveBoundedRead = new SplittableParDo.PrimitiveBoundedRead(Read.from(new TestBoundedSource(2)));
        StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(2);
        Assert.assertEquals(2L, ((SourceTransformation) applyReadSourceTransform(primitiveBoundedRead, PCollection.IsBounded.BOUNDED, r0)).getSource().getNumSplits());
    }

    @Test
    public void readSourceTranslatorUnboundedWithMaxParallelism() {
        SplittableParDo.PrimitiveUnboundedRead primitiveUnboundedRead = new SplittableParDo.PrimitiveUnboundedRead(Read.from(new TestUnboundedSource()));
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(2);
        executionEnvironment.setMaxParallelism(6);
        Assert.assertEquals(6L, ((SourceTransformation) Iterables.getOnlyElement(((OneInputTransformation) applyReadSourceTransform(primitiveUnboundedRead, PCollection.IsBounded.UNBOUNDED, executionEnvironment)).getInputs())).getSource().getNumSplits());
    }

    @Test
    public void readSourceTranslatorUnboundedWithoutMaxParallelism() {
        SplittableParDo.PrimitiveUnboundedRead primitiveUnboundedRead = new SplittableParDo.PrimitiveUnboundedRead(Read.from(new TestUnboundedSource()));
        StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(2);
        Assert.assertEquals(2L, ((SourceTransformation) Iterables.getOnlyElement(((OneInputTransformation) applyReadSourceTransform(primitiveUnboundedRead, PCollection.IsBounded.UNBOUNDED, r0)).getInputs())).getSource().getNumSplits());
    }

    private Object applyReadSourceTransform(PTransform<?, ?> pTransform, PCollection.IsBounded isBounded, StreamExecutionEnvironment streamExecutionEnvironment) {
        FlinkStreamingPipelineTranslator.StreamTransformTranslator<PTransform<?, ?>> readSourceTranslator = getReadSourceTranslator();
        FlinkStreamingTranslationContext flinkStreamingTranslationContext = new FlinkStreamingTranslationContext(streamExecutionEnvironment, PipelineOptionsFactory.create(), true);
        PCollection createPrimitiveOutputInternal = PCollection.createPrimitiveOutputInternal(Pipeline.create(), WindowingStrategy.globalDefault(), isBounded, StringUtf8Coder.of());
        createPrimitiveOutputInternal.setName("output");
        HashMap hashMap = new HashMap();
        hashMap.put(new TupleTag(), createPrimitiveOutputInternal);
        flinkStreamingTranslationContext.setCurrentTransform(AppliedPTransform.of("test-transform", Collections.emptyMap(), PValues.fullyExpand(hashMap), pTransform, ResourceHints.create(), Pipeline.create()));
        readSourceTranslator.translateNode(pTransform, flinkStreamingTranslationContext);
        return flinkStreamingTranslationContext.getInputDataStream(createPrimitiveOutputInternal).getTransformation();
    }

    private FlinkStreamingPipelineTranslator.StreamTransformTranslator<PTransform<?, ?>> getReadSourceTranslator() {
        PTransformTranslation.RawPTransform rawPTransform = (PTransformTranslation.RawPTransform) Mockito.mock(PTransformTranslation.RawPTransform.class);
        Mockito.when(rawPTransform.getUrn()).thenReturn("beam:transform:read:v1");
        return FlinkStreamingTransformTranslators.getTranslator(rawPTransform);
    }
}
