/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.SplittableParDo;
import org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator;
import org.apache.beam.runners.flink.FlinkStreamingTransformTranslators;
import org.apache.beam.runners.flink.FlinkStreamingTranslationContext;
import org.apache.beam.runners.flink.SourceTransformationCompat;
import org.apache.beam.runners.flink.streaming.StreamSources;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.PValues;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.KeyForBottom;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

public class FlinkStreamingTransformTranslatorsTest {
    @Test
    public void readSourceTranslatorBoundedWithMaxParallelism() {
        int maxParallelism = 6;
        int parallelism = 2;
        SplittableParDo.PrimitiveBoundedRead transform = new SplittableParDo.PrimitiveBoundedRead(Read.from((BoundedSource)new TestBoundedSource(6)));
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        env.setMaxParallelism(6);
        Object sourceTransform = this.applyReadSourceTransform((PTransform<?, ?>)transform, PCollection.IsBounded.BOUNDED, env);
        FlinkStreamingTransformTranslators.UnboundedSourceWrapperNoValueWithRecordId source = (FlinkStreamingTransformTranslators.UnboundedSourceWrapperNoValueWithRecordId)SourceTransformationCompat.getOperator(sourceTransform).getUserFunction();
        Assert.assertEquals((long)6L, (long)source.getUnderlyingSource().getSplitSources().size());
    }

    @Test
    public void readSourceTranslatorBoundedWithoutMaxParallelism() {
        int parallelism = 2;
        SplittableParDo.PrimitiveBoundedRead transform = new SplittableParDo.PrimitiveBoundedRead(Read.from((BoundedSource)new TestBoundedSource(2)));
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        Object sourceTransform = this.applyReadSourceTransform((PTransform<?, ?>)transform, PCollection.IsBounded.BOUNDED, env);
        FlinkStreamingTransformTranslators.UnboundedSourceWrapperNoValueWithRecordId source = (FlinkStreamingTransformTranslators.UnboundedSourceWrapperNoValueWithRecordId)SourceTransformationCompat.getOperator(sourceTransform).getUserFunction();
        Assert.assertEquals((long)2L, (long)source.getUnderlyingSource().getSplitSources().size());
    }

    @Test
    public void readSourceTranslatorUnboundedWithMaxParallelism() {
        int maxParallelism = 6;
        int parallelism = 2;
        SplittableParDo.PrimitiveUnboundedRead transform = new SplittableParDo.PrimitiveUnboundedRead(Read.from((UnboundedSource)new TestUnboundedSource()));
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        env.setMaxParallelism(6);
        OneInputTransformation sourceTransform = (OneInputTransformation)this.applyReadSourceTransform((PTransform<?, ?>)transform, PCollection.IsBounded.UNBOUNDED, env);
        UnboundedSourceWrapper source = (UnboundedSourceWrapper)SourceTransformationCompat.getOperator(StreamSources.getOnlyInput(sourceTransform)).getUserFunction();
        Assert.assertEquals((long)6L, (long)source.getSplitSources().size());
    }

    @Test
    public void readSourceTranslatorUnboundedWithoutMaxParallelism() {
        int parallelism = 2;
        SplittableParDo.PrimitiveUnboundedRead transform = new SplittableParDo.PrimitiveUnboundedRead(Read.from((UnboundedSource)new TestUnboundedSource()));
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        OneInputTransformation sourceTransform = (OneInputTransformation)this.applyReadSourceTransform((PTransform<?, ?>)transform, PCollection.IsBounded.UNBOUNDED, env);
        UnboundedSourceWrapper source = (UnboundedSourceWrapper)SourceTransformationCompat.getOperator(StreamSources.getOnlyInput(sourceTransform)).getUserFunction();
        Assert.assertEquals((long)2L, (long)source.getSplitSources().size());
    }

    private @UnknownKeyFor @NonNull @Initialized Object applyReadSourceTransform(/*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized PTransform<@UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?> transform, // Could not load outer class - annotation placement on inner may be incorrect
    @UnknownKeyFor @NonNull @Initialized PCollection.IsBounded isBounded, @UnknownKeyFor @NonNull @Initialized StreamExecutionEnvironment env) {
        FlinkStreamingPipelineTranslator.StreamTransformTranslator<PTransform<?, ?>> translator = this.getReadSourceTranslator();
        FlinkStreamingTranslationContext ctx = new FlinkStreamingTranslationContext(env, PipelineOptionsFactory.create());
        Pipeline pipeline = Pipeline.create();
        PCollection pc = PCollection.createPrimitiveOutputInternal((Pipeline)pipeline, (WindowingStrategy)WindowingStrategy.globalDefault(), (PCollection.IsBounded)isBounded, (Coder)StringUtf8Coder.of());
        pc.setName("output");
        HashMap<TupleTag, PCollection> outputs = new HashMap<TupleTag, PCollection>();
        outputs.put(new TupleTag(), pc);
        AppliedPTransform appliedTransform = AppliedPTransform.of((String)"test-transform", Collections.emptyMap(), (Map)PValues.fullyExpand(outputs), transform, (Pipeline)Pipeline.create());
        ctx.setCurrentTransform(appliedTransform);
        translator.translateNode(transform, ctx);
        return ctx.getInputDataStream((PValue)pc).getTransformation();
    }

    private /*
     * Issues handling annotations - annotations may be inaccurate
     */
    // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized FlinkStreamingPipelineTranslator.StreamTransformTranslator<@UnknownKeyFor @NonNull @Initialized PTransform<@UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?>> getReadSourceTranslator() {
        PTransformTranslation.RawPTransform t = (PTransformTranslation.RawPTransform)Mockito.mock(PTransformTranslation.RawPTransform.class);
        Mockito.when((Object)t.getUrn()).thenReturn((Object)"beam:transform:read:v1");
        return FlinkStreamingTransformTranslators.getTranslator((PTransform)t);
    }

    private static class TestUnboundedSource
    extends UnboundedSource<String, UnboundedSource.CheckpointMark> {
        private TestUnboundedSource() {
        }

        public @UnknownKeyFor @NonNull @Initialized List<@KeyForBottom @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized UnboundedSource<@UnknownKeyFor @NonNull @Initialized String, // Could not load outer class - annotation placement on inner may be incorrect
        @UnknownKeyFor @NonNull @Initialized UnboundedSource.CheckpointMark>> split(@UnknownKeyFor @NonNull @Initialized int desiredNumSplits, @UnknownKeyFor @NonNull @Initialized PipelineOptions options) throws @UnknownKeyFor @NonNull @Initialized Exception {
            ArrayList<TestUnboundedSource> splits = new ArrayList<TestUnboundedSource>();
            for (int i = 0; i < desiredNumSplits; ++i) {
                splits.add(this);
            }
            return splits;
        }

        public // Could not load outer class - annotation placement on inner may be incorrect
        @UnknownKeyFor @NonNull @Initialized UnboundedSource.UnboundedReader<@UnknownKeyFor @NonNull @Initialized String> createReader(@UnknownKeyFor @NonNull @Initialized PipelineOptions options, // Could not load outer class - annotation placement on inner may be incorrect
        @Nullable @UnknownKeyFor @Initialized UnboundedSource.CheckpointMark checkpointMark) throws @UnknownKeyFor @NonNull @Initialized IOException {
            return null;
        }

        public @UnknownKeyFor @NonNull @Initialized Coder<// Could not load outer class - annotation placement on inner may be incorrect
        @UnknownKeyFor @NonNull @Initialized UnboundedSource.CheckpointMark> getCheckpointMarkCoder() {
            return null;
        }
    }

    private static class TestBoundedSource
    extends BoundedSource<String> {
        private final @UnknownKeyFor @NonNull @Initialized int bytes;

        private TestBoundedSource(@UnknownKeyFor @NonNull @Initialized int bytes) {
            this.bytes = bytes;
        }

        public @UnknownKeyFor @NonNull @Initialized List<@KeyForBottom @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized BoundedSource<@UnknownKeyFor @NonNull @Initialized String>> split(@UnknownKeyFor @NonNull @Initialized long desiredBundleSizeBytes, @UnknownKeyFor @NonNull @Initialized PipelineOptions options) throws @UnknownKeyFor @NonNull @Initialized Exception {
            ArrayList<TestBoundedSource> splits = new ArrayList<TestBoundedSource>();
            for (long remaining = (long)this.bytes; remaining > 0L; remaining -= desiredBundleSizeBytes) {
                splits.add(this);
            }
            return splits;
        }

        public @UnknownKeyFor @NonNull @Initialized long getEstimatedSizeBytes(@UnknownKeyFor @NonNull @Initialized PipelineOptions options) throws @UnknownKeyFor @NonNull @Initialized Exception {
            return this.bytes;
        }

        public // Could not load outer class - annotation placement on inner may be incorrect
        @UnknownKeyFor @NonNull @Initialized BoundedSource.BoundedReader<@UnknownKeyFor @NonNull @Initialized String> createReader(@UnknownKeyFor @NonNull @Initialized PipelineOptions options) throws @UnknownKeyFor @NonNull @Initialized IOException {
            return null;
        }

        public @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @NonNull @Initialized String> getOutputCoder() {
            return StringUtf8Coder.of();
        }
    }
}

