/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.UUID;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.ShardedKey;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

public class FlinkStreamingPipelineTranslatorTest {
    @Test
    public void testAutoBalanceShardKeyResolvesMaxParallelism() {
        int parallelism = 3;
        MatcherAssert.assertThat((Object)new FlinkStreamingPipelineTranslator.FlinkAutoBalancedShardKeyShardingFunction(parallelism, -1, (Coder)StringUtf8Coder.of()).getMaxParallelism(), (Matcher)Matchers.equalTo((Object)KeyGroupRangeAssignment.computeDefaultMaxParallelism((int)parallelism)));
        MatcherAssert.assertThat((Object)new FlinkStreamingPipelineTranslator.FlinkAutoBalancedShardKeyShardingFunction(parallelism, 0, (Coder)StringUtf8Coder.of()).getMaxParallelism(), (Matcher)Matchers.equalTo((Object)KeyGroupRangeAssignment.computeDefaultMaxParallelism((int)parallelism)));
    }

    @Test
    public void testAutoBalanceShardKeyCacheIsNotSerialized() throws @UnknownKeyFor @NonNull @Initialized Exception {
        FlinkStreamingPipelineTranslator.FlinkAutoBalancedShardKeyShardingFunction fn = new FlinkStreamingPipelineTranslator.FlinkAutoBalancedShardKeyShardingFunction(2, 2, (Coder)StringUtf8Coder.of());
        Assert.assertNull((Object)fn.getCache());
        fn.assignShardKey((Object)"target/destination1", (Object)"one", 10);
        fn.assignShardKey((Object)"target/destination2", (Object)"two", 10);
        MatcherAssert.assertThat((Object)fn.getCache().size(), (Matcher)Matchers.equalTo((Object)2));
        MatcherAssert.assertThat((Object)((FlinkStreamingPipelineTranslator.FlinkAutoBalancedShardKeyShardingFunction)SerializableUtils.clone((Serializable)fn)).getCache(), (Matcher)Matchers.nullValue());
    }

    @Test
    public void testAutoBalanceShardKeyCacheIsStable() throws @UnknownKeyFor @NonNull @Initialized Exception {
        ShardedKey shardKey;
        int numShards = 50;
        FlinkStreamingPipelineTranslator.FlinkAutoBalancedShardKeyShardingFunction fn = new FlinkStreamingPipelineTranslator.FlinkAutoBalancedShardKeyShardingFunction(numShards / 2, numShards * 2, (Coder)StringUtf8Coder.of());
        ArrayList inputs = Lists.newArrayList();
        for (int i = 0; i < numShards * 100; ++i) {
            inputs.add(KV.of((Object)"target/destination/1", (Object)UUID.randomUUID().toString()));
            inputs.add(KV.of((Object)"target/destination/2", (Object)UUID.randomUUID().toString()));
            inputs.add(KV.of((Object)"target/destination/3", (Object)UUID.randomUUID().toString()));
        }
        HashMap<KV, ShardedKey> generatedKeys = new HashMap<KV, ShardedKey>();
        for (KV input : inputs) {
            shardKey = fn.assignShardKey((Object)((String)input.getKey()), (Object)((String)input.getValue()), numShards);
            generatedKeys.put(KV.of((Object)((String)input.getKey()), (Object)shardKey.getShardNumber()), shardKey);
        }
        fn = new FlinkStreamingPipelineTranslator.FlinkAutoBalancedShardKeyShardingFunction(numShards / 2, numShards * 2, (Coder)StringUtf8Coder.of());
        Collections.shuffle(inputs);
        for (KV input : inputs) {
            shardKey = fn.assignShardKey((Object)((String)input.getKey()), (Object)((String)input.getValue()), numShards);
            ShardedKey expectedShardKey = (ShardedKey)generatedKeys.get(KV.of((Object)((String)input.getKey()), (Object)shardKey.getShardNumber()));
            if (expectedShardKey == null) continue;
            MatcherAssert.assertThat((Object)shardKey, (Matcher)Matchers.equalTo((Object)expectedShardKey));
        }
    }

    @Test
    public void testAutoBalanceShardKeyCacheMaxSize() throws @UnknownKeyFor @NonNull @Initialized Exception {
        FlinkStreamingPipelineTranslator.FlinkAutoBalancedShardKeyShardingFunction fn = new FlinkStreamingPipelineTranslator.FlinkAutoBalancedShardKeyShardingFunction(2, 2, (Coder)StringUtf8Coder.of());
        for (int i = 0; i < 200; ++i) {
            fn.assignShardKey((Object)UUID.randomUUID().toString(), (Object)"one", 2);
        }
        MatcherAssert.assertThat((Object)fn.getCache().size(), (Matcher)Matchers.equalTo((Object)100));
    }

    @Test
    public void testStatefulParDoAfterCombineChaining() {
        JobGraph stablePartitioning = this.getStatefulParDoAfterCombineChainingJobGraph(true);
        JobGraph unstablePartitioning = this.getStatefulParDoAfterCombineChainingJobGraph(false);
        Assert.assertEquals((long)1L, (long)(Iterables.size((Iterable)unstablePartitioning.getVertices()) - Iterables.size((Iterable)stablePartitioning.getVertices())));
    }

    private @UnknownKeyFor @NonNull @Initialized JobGraph getStatefulParDoAfterCombineChainingJobGraph(@UnknownKeyFor @NonNull @Initialized boolean stablePartitioning) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkStreamingPipelineTranslator translator = new FlinkStreamingPipelineTranslator(env, PipelineOptionsFactory.create());
        PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
        pipelineOptions.setRunner(FlinkRunner.class);
        Pipeline pipeline = Pipeline.create((PipelineOptions)pipelineOptions);
        PCollection aggregate = (PCollection)((PCollection)pipeline.apply((PTransform)Create.of((Object)"foo", (Object[])new String[]{"bar"}).withCoder((Coder)StringUtf8Coder.of()))).apply(Count.perElement());
        if (!stablePartitioning) {
            aggregate = (PCollection)aggregate.apply((PTransform)ParDo.of(new StatelessIdentityDoFn()));
        }
        aggregate.apply((PTransform)ParDo.of(new StatefulNoopDoFn()));
        translator.translate(pipeline);
        return env.getStreamGraph().getJobGraph();
    }

    @Test
    public void testStatefulParDoAfterGroupByKeyChaining() {
        JobGraph stablePartitioning = this.getStatefulParDoAfterGroupByKeyChainingJobGraph(true);
        JobGraph unstablePartitioning = this.getStatefulParDoAfterGroupByKeyChainingJobGraph(false);
        Assert.assertEquals((long)1L, (long)(Iterables.size((Iterable)unstablePartitioning.getVertices()) - Iterables.size((Iterable)stablePartitioning.getVertices())));
    }

    private @UnknownKeyFor @NonNull @Initialized JobGraph getStatefulParDoAfterGroupByKeyChainingJobGraph(@UnknownKeyFor @NonNull @Initialized boolean stablePartitioning) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkStreamingPipelineTranslator translator = new FlinkStreamingPipelineTranslator(env, PipelineOptionsFactory.create());
        PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
        pipelineOptions.setRunner(FlinkRunner.class);
        Pipeline pipeline = Pipeline.create((PipelineOptions)pipelineOptions);
        PCollection aggregate = (PCollection)((PCollection)pipeline.apply((PTransform)Create.of((Object)KV.of((Object)"foo", (Object)1L), (Object[])new KV[]{KV.of((Object)"bar", (Object)1L)}).withCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)VarLongCoder.of())))).apply((PTransform)GroupByKey.create());
        if (!stablePartitioning) {
            aggregate = (PCollection)aggregate.apply((PTransform)ParDo.of(new StatelessIdentityDoFn()));
        }
        aggregate.apply((PTransform)ParDo.of(new StatefulNoopDoFn()));
        translator.translate(pipeline);
        return env.getStreamGraph().getJobGraph();
    }

    private static class StatefulNoopDoFn<@UnknownKeyFor KeyT, @UnknownKeyFor ValueT>
    extends DoFn<KV<KeyT, ValueT>, Void> {
        @DoFn.TimerId(value="my-timer")
        private final @UnknownKeyFor @NonNull @Initialized TimerSpec myTimer = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);

        private StatefulNoopDoFn() {
        }

        @DoFn.ProcessElement
        public void processElement() {
        }

        @DoFn.OnTimer(value="my-timer")
        public void onMyTimer() {
        }
    }

    private static class StatelessIdentityDoFn<@UnknownKeyFor KeyT, @UnknownKeyFor ValueT>
    extends DoFn<KV<KeyT, ValueT>, KV<KeyT, ValueT>> {
        private StatelessIdentityDoFn() {
        }

        @DoFn.ProcessElement
        public void processElement(/*
         * Issues handling annotations - annotations may be inaccurate
         */
        // Could not load outer class - annotation placement on inner may be incorrect
        @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized ProcessContext ctx) {
            ctx.output((Object)((KV)ctx.element()));
        }
    }
}

