package org.apache.beam.runners.flink.adapter;

import java.lang.invoke.SerializedLambda;
import java.util.Map;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.WithTimestamps;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/runners/flink/adapter/BeamFlinkDataStreamAdapterTest.class */
public class BeamFlinkDataStreamAdapterTest {
    /* JADX INFO: Access modifiers changed from: private */
    public static PTransform<PCollection<? extends String>, PCollection<String>> withPrefix(final String str) {
        return ParDo.of(new DoFn<String, String>() { // from class: org.apache.beam.runners.flink.adapter.BeamFlinkDataStreamAdapterTest.1
            @DoFn.ProcessElement
            public void processElement(@DoFn.Element String str2, DoFn.OutputReceiver<String> outputReceiver) {
                outputReceiver.output(str + str2);
            }
        });
    }

    @Test
    public void testApplySimpleTransform() throws Exception {
        MatcherAssert.assertThat(ImmutableList.copyOf(new BeamFlinkDataStreamAdapter().applyBeamPTransform(StreamExecutionEnvironment.createLocalEnvironment().fromCollection(ImmutableList.of("a", "b", "c")), withPrefix("x")).executeAndCollect()), Matchers.containsInAnyOrder(new String[]{"xa", "xb", "xc"}));
    }

    @Test
    public void testApplyCompositeTransform() throws Exception {
        MatcherAssert.assertThat(ImmutableList.copyOf(new BeamFlinkDataStreamAdapter().applyBeamPTransform(StreamExecutionEnvironment.createLocalEnvironment().fromCollection(ImmutableList.of("a", "b", "c")), new PTransform<PCollection<String>, PCollection<String>>() { // from class: org.apache.beam.runners.flink.adapter.BeamFlinkDataStreamAdapterTest.2
            public PCollection<String> expand(PCollection<String> pCollection) {
                return pCollection.apply(BeamFlinkDataStreamAdapterTest.withPrefix("x")).apply(BeamFlinkDataStreamAdapterTest.withPrefix("y"));
            }
        }).executeAndCollect()), Matchers.containsInAnyOrder(new String[]{"yxa", "yxb", "yxc"}));
    }

    @Test
    public void testApplyMultiInputTransform() throws Exception {
        LocalStreamEnvironment createLocalEnvironment = StreamExecutionEnvironment.createLocalEnvironment();
        MatcherAssert.assertThat(ImmutableList.copyOf(new BeamFlinkDataStreamAdapter().applyBeamPTransform(ImmutableMap.of("x", createLocalEnvironment.fromCollection(ImmutableList.of("a", "b", "c")), "y", createLocalEnvironment.fromCollection(ImmutableList.of("d", "e", "f"))), new PTransform<PCollectionTuple, PCollection<String>>() { // from class: org.apache.beam.runners.flink.adapter.BeamFlinkDataStreamAdapterTest.3
            public PCollection<String> expand(PCollectionTuple pCollectionTuple) {
                return PCollectionList.of(pCollectionTuple.get("x").apply(BeamFlinkDataStreamAdapterTest.withPrefix("x"))).and(pCollectionTuple.get("y").apply(BeamFlinkDataStreamAdapterTest.withPrefix("y"))).apply(Flatten.pCollections());
            }
        }).executeAndCollect()), Matchers.containsInAnyOrder(new String[]{"xa", "xb", "xc", "yd", "ye", "yf"}));
    }

    @Test
    public void testApplyMultiOutputTransform() throws Exception {
        Map applyMultiOutputBeamPTransform = new BeamFlinkDataStreamAdapter().applyMultiOutputBeamPTransform(StreamExecutionEnvironment.createLocalEnvironment().fromCollection(ImmutableList.of("a", "b", "c")), new PTransform<PCollection<String>, PCollectionTuple>() { // from class: org.apache.beam.runners.flink.adapter.BeamFlinkDataStreamAdapterTest.4
            public PCollectionTuple expand(PCollection<String> pCollection) {
                return PCollectionTuple.of("x", pCollection.apply(BeamFlinkDataStreamAdapterTest.withPrefix("x"))).and("y", pCollection.apply(BeamFlinkDataStreamAdapterTest.withPrefix("y")));
            }
        });
        MatcherAssert.assertThat(ImmutableList.copyOf(((DataStream) applyMultiOutputBeamPTransform.get("x")).executeAndCollect()), Matchers.containsInAnyOrder(new Object[]{"xa", "xb", "xc"}));
        MatcherAssert.assertThat(ImmutableList.copyOf(((DataStream) applyMultiOutputBeamPTransform.get("y")).executeAndCollect()), Matchers.containsInAnyOrder(new Object[]{"ya", "yb", "yc"}));
    }

    @Test
    public void testApplyGroupingTransform() throws Exception {
        MatcherAssert.assertThat(ImmutableList.copyOf(new BeamFlinkDataStreamAdapter().applyBeamPTransform(StreamExecutionEnvironment.createLocalEnvironment().fromCollection(ImmutableList.of("a", "a", "b")), new PTransform<PCollection<String>, PCollection<KV<String, Long>>>() { // from class: org.apache.beam.runners.flink.adapter.BeamFlinkDataStreamAdapterTest.5
            public PCollection<KV<String, Long>> expand(PCollection<String> pCollection) {
                return pCollection.apply(Window.into(FixedWindows.of(Duration.millis(10L)))).apply(Count.perElement());
            }
        }).executeAndCollect()), Matchers.containsInAnyOrder(new KV[]{KV.of("a", 2L), KV.of("b", 1L)}));
    }

    @Test
    public void testApplyPreservesInputTimestamps() throws Exception {
        LocalStreamEnvironment createLocalEnvironment = StreamExecutionEnvironment.createLocalEnvironment();
        createLocalEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        MatcherAssert.assertThat(ImmutableList.copyOf(new BeamFlinkDataStreamAdapter().applyBeamPTransform(createLocalEnvironment.fromCollection(ImmutableList.of(1L, 2L, 12L)).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(java.time.Duration.ofMillis(100L)).withTimestampAssigner((l, j) -> {
            return l.longValue();
        })), new PTransform<PCollection<Long>, PCollection<Long>>() { // from class: org.apache.beam.runners.flink.adapter.BeamFlinkDataStreamAdapterTest.6
            public PCollection<Long> expand(PCollection<Long> pCollection) {
                return pCollection.apply(Window.into(FixedWindows.of(Duration.millis(10L)))).apply(Sum.longsGlobally().withoutDefaults());
            }
        }).executeAndCollect()), Matchers.containsInAnyOrder(new Long[]{3L, 12L}));
    }

    @Test
    public void testApplyPreservesOutputTimestamps() throws Exception {
        LocalStreamEnvironment createLocalEnvironment = StreamExecutionEnvironment.createLocalEnvironment();
        createLocalEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        MatcherAssert.assertThat(ImmutableList.copyOf(new BeamFlinkDataStreamAdapter().applyBeamPTransform(createLocalEnvironment.fromCollection(ImmutableList.of(1L, 2L, 12L)), new PTransform<PCollection<Long>, PCollection<Long>>() { // from class: org.apache.beam.runners.flink.adapter.BeamFlinkDataStreamAdapterTest.7
            public PCollection<Long> expand(PCollection<Long> pCollection) {
                return pCollection.apply(WithTimestamps.of(l -> {
                    return Instant.ofEpochMilli(l.longValue());
                }));
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                String implMethodName = serializedLambda.getImplMethodName();
                boolean z = -1;
                switch (implMethodName.hashCode()) {
                    case 368561265:
                        if (implMethodName.equals("lambda$expand$13f85998$1")) {
                            z = false;
                            break;
                        }
                        break;
                }
                switch (z) {
                    case false:
                        if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/flink/adapter/BeamFlinkDataStreamAdapterTest$7") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Long;)Lorg/joda/time/Instant;")) {
                            return l -> {
                                return Instant.ofEpochMilli(l.longValue());
                            };
                        }
                        break;
                }
                throw new IllegalArgumentException("Invalid lambda deserialization");
            }
        }).windowAll(TumblingEventTimeWindows.of(Time.milliseconds(10L))).reduce((l, l2) -> {
            return Long.valueOf(l.longValue() + l2.longValue());
        }).executeAndCollect()), Matchers.containsInAnyOrder(new Long[]{3L, 12L}));
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -2120459839:
                if (implMethodName.equals("lambda$testApplyPreservesInputTimestamps$bd6bbd98$1")) {
                    z = true;
                    break;
                }
                break;
            case -831489668:
                if (implMethodName.equals("lambda$testApplyPreservesOutputTimestamps$8248e4bf$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/ReduceFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("reduce") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/flink/adapter/BeamFlinkDataStreamAdapterTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Long;Ljava/lang/Long;)Ljava/lang/Long;")) {
                    return (l, l2) -> {
                        return Long.valueOf(l.longValue() + l2.longValue());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/eventtime/SerializableTimestampAssigner") && serializedLambda.getFunctionalInterfaceMethodName().equals("extractTimestamp") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;J)J") && serializedLambda.getImplClass().equals("org/apache/beam/runners/flink/adapter/BeamFlinkDataStreamAdapterTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Long;J)J")) {
                    return (l3, j) -> {
                        return l3.longValue();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
