package org.apache.beam.runners.spark.translation;

import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.spark.translation.SparkCombineFn;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/runners/spark/translation/SparkCombineFnTest.class */
public class SparkCombineFnTest {
    /** Default pipeline options in serializable form; handed to every SparkCombineFn built by the tests. */
    final SerializablePipelineOptions opts = new SerializablePipelineOptions(PipelineOptionsFactory.create());
    /** Combine function under test; assigned in {@link #setUp()} before each test method runs. */
    CombineWithContext.CombineFnWithContext<Integer, Long, Long> combineFn;

    /** Wraps the summing combine function from {@link #getSumFn()} into its with-context form before each test. */
    @Before
    public void setUp() {
        Combine.CombineFn<Integer, Long, Long> sumFn = getSumFn();
        combineFn = CombineFnUtil.toFnWithContext(sumFn);
    }

    /**
     * Feeds the values 1, 2 and 3 for a single key through a keyed combiner in the global window
     * and expects exactly one output whose value is 6.
     */
    @Test
    public void testGlobalWindowCombineFn() throws Exception {
        SparkCombineFn keyedFn =
                SparkCombineFn.keyed(combineFn, opts, Collections.emptyMap(), WindowingStrategy.globalDefault());

        WindowedValue one = input("key", 1, Instant.now());
        WindowedValue two = input("key", 2, Instant.now());
        WindowedValue three = input("key", 3, Instant.now());

        // Two partial accumulators: the first receives 1 and then 2, the second only 3.
        SparkCombineFn.WindowedAccumulator left = keyedFn.createCombiner(one);
        SparkCombineFn.WindowedAccumulator right = keyedFn.createCombiner(three);
        keyedFn.mergeValue(left, two);

        Iterable outputs = keyedFn.extractOutput(keyedFn.mergeCombiners(left, right));
        WindowedValue only = (WindowedValue) Iterables.getOnlyElement(outputs);
        long sum = ((Long) only.getValue()).longValue();
        Assert.assertEquals(6L, sum);
    }

    /**
     * Feeds the un-keyed values 1, 2 and 3 through a global (non-keyed) combiner in the global
     * window and expects exactly one output whose value is 6.
     */
    @Test
    public void testGlobalCombineFn() throws Exception {
        SparkCombineFn globalFn =
                SparkCombineFn.globally(combineFn, opts, Collections.emptyMap(), WindowingStrategy.globalDefault());

        WindowedValue one = inputValue(1, Instant.now());
        WindowedValue two = inputValue(2, Instant.now());
        WindowedValue three = inputValue(3, Instant.now());

        // Two partial accumulators: the first receives 1 and then 2, the second only 3.
        SparkCombineFn.WindowedAccumulator left = globalFn.createCombiner(one);
        SparkCombineFn.WindowedAccumulator right = globalFn.createCombiner(three);
        globalFn.mergeValue(left, two);

        Iterable outputs = globalFn.extractOutput(globalFn.mergeCombiners(left, right));
        WindowedValue only = (WindowedValue) Iterables.getOnlyElement(outputs);
        long sum = ((Long) only.getValue()).longValue();
        Assert.assertEquals(6L, sum);
    }

    /**
     * Combines three elements of one key under session windows with a 1000 ms gap.
     *
     * <p>The elements carry timestamps 5000 ms (value 1), 1000 ms (value 2) and 500 ms (value 3).
     * The test expects two outputs, in this order: sum 5 stamped 1999 ms and sum 1 stamped 5999 ms.
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"}) // SparkCombineFn and its accumulators are used raw here.
    public void testSessionCombineFn() throws Exception {
        WindowingStrategy strategy = WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(1000L)));
        SparkCombineFn keyedFn = SparkCombineFn.keyed(combineFn, opts, Collections.emptyMap(), strategy);

        Instant epoch = Instant.ofEpochMilli(0L);
        WindowedValue<KV<String, Integer>> first =
                input("key", 1, epoch.plus(Duration.millis(5000L)), strategy.getWindowFn());
        WindowedValue<KV<String, Integer>> second =
                input("key", 2, epoch.plus(Duration.millis(1000L)), strategy.getWindowFn());
        WindowedValue<KV<String, Integer>> third =
                input("key", 3, epoch.plus(Duration.millis(500L)), strategy.getWindowFn());

        SparkCombineFn.WindowedAccumulator left = keyedFn.createCombiner(first);
        SparkCombineFn.WindowedAccumulator right = keyedFn.createCombiner(third);
        keyedFn.mergeValue(left, second);

        // The element type must be spelled out: with a raw Iterable the stream below is raw too,
        // the lambda parameter becomes Object, and getValue()/getTimestamp() do not resolve.
        Iterable<WindowedValue<Long>> outputs = keyedFn.extractOutput(keyedFn.mergeCombiners(left, right));
        Assert.assertEquals(2L, Iterables.size(outputs));

        List<String> actual =
                StreamSupport.stream(outputs.spliterator(), false)
                        .map(out -> out.getValue() + ":" + out.getTimestamp().getMillis())
                        .collect(Collectors.toList());
        Assert.assertEquals(Lists.newArrayList("5:1999", "1:5999"), actual);
    }

    /**
     * Combines three elements of one key under sliding windows (size 3000 ms, period 1000 ms) using
     * the {@code NON_MERGING} accumulator type.
     *
     * <p>The elements carry timestamps 5000 ms (value 1), 1500 ms (value 2) and 500 ms (value 3).
     * The test expects seven outputs and compares them, ignoring order, as {@code "sum:timestampMillis"}.
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"}) // SparkCombineFn and its accumulators are used raw here.
    public void testSlidingCombineFnNonMerging() throws Exception {
        WindowingStrategy strategy =
                WindowingStrategy.of(SlidingWindows.of(Duration.millis(3000L)).every(Duration.millis(1000L)));
        SparkCombineFn keyedFn =
                SparkCombineFn.keyed(
                        combineFn,
                        opts,
                        Collections.emptyMap(),
                        strategy,
                        SparkCombineFn.WindowedAccumulator.Type.NON_MERGING);

        Instant epoch = Instant.ofEpochMilli(0L);
        WindowedValue<KV<String, Integer>> first =
                input("key", 1, epoch.plus(Duration.millis(5000L)), strategy.getWindowFn());
        WindowedValue<KV<String, Integer>> second =
                input("key", 2, epoch.plus(Duration.millis(1500L)), strategy.getWindowFn());
        WindowedValue<KV<String, Integer>> third =
                input("key", 3, epoch.plus(Duration.millis(500L)), strategy.getWindowFn());

        SparkCombineFn.WindowedAccumulator left = keyedFn.createCombiner(first);
        SparkCombineFn.WindowedAccumulator right = keyedFn.createCombiner(third);
        keyedFn.mergeValue(left, second);

        // The element type must be spelled out: with a raw Iterable the stream below is raw too,
        // the lambda parameter becomes Object, and getValue()/getTimestamp() do not resolve.
        Iterable<WindowedValue<Long>> outputs = keyedFn.extractOutput(keyedFn.mergeCombiners(left, right));
        Assert.assertEquals(7L, Iterables.size(outputs));

        List<String> actual =
                StreamSupport.stream(outputs.spliterator(), false)
                        .map(out -> out.getValue() + ":" + out.getTimestamp().getMillis())
                        .collect(Collectors.toList());
        assertUnorderedEquals(
                Lists.newArrayList("3:999", "5:1999", "5:2999", "2:3999", "1:5999", "1:6999", "1:7999"),
                actual);
    }

    /**
     * Combines three elements of one key under sliding windows (size 3000 ms, period 1000 ms) using
     * the {@code EXPLODE_WINDOWS} accumulator type.
     *
     * <p>Each input is first exploded into one value per assigned window, the exploded values are
     * grouped by (key, window), and every group is folded through its own accumulator. The outputs
     * are compared, ignoring order, as {@code "sum:timestampMillis"}.
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"}) // SparkCombineFn and its accumulators are used raw here.
    public void testSlidingCombineFnExplode() throws Exception {
        WindowingStrategy strategy =
                WindowingStrategy.of(SlidingWindows.of(Duration.millis(3000L)).every(Duration.millis(1000L)));
        SparkCombineFn keyedFn =
                SparkCombineFn.keyed(
                        combineFn,
                        opts,
                        Collections.emptyMap(),
                        strategy,
                        SparkCombineFn.WindowedAccumulator.Type.EXPLODE_WINDOWS);

        Instant epoch = Instant.ofEpochMilli(0L);
        WindowedValue<KV<String, Integer>> first =
                input("key", 1, epoch.plus(Duration.millis(5000L)), strategy.getWindowFn());
        WindowedValue<KV<String, Integer>> second =
                input("key", 2, epoch.plus(Duration.millis(1500L)), strategy.getWindowFn());
        WindowedValue<KV<String, Integer>> third =
                input("key", 3, epoch.plus(Duration.millis(500L)), strategy.getWindowFn());

        // Typed stream elements are required so the lambdas can call explodeWindows()/getValue();
        // a Stream<Object> (as produced from an Object[] cast) would not compile.
        Map<KV<String, BoundedWindow>, List<WindowedValue<KV<String, Integer>>>> byKeyAndWindow =
                Stream.of(first, second, third)
                        .flatMap(value -> StreamSupport.stream(value.explodeWindows().spliterator(), false))
                        .collect(
                                Collectors.groupingBy(
                                        value ->
                                                KV.of(
                                                        value.getValue().getKey(),
                                                        (BoundedWindow) Iterables.getOnlyElement(value.getWindows()))));

        List<String> actual = new ArrayList<>();
        for (List<WindowedValue<KV<String, Integer>>> group : byKeyAndWindow.values()) {
            // The first value of a group creates the accumulator, the remaining ones are added to it.
            SparkCombineFn.WindowedAccumulator combined = null;
            for (WindowedValue<KV<String, Integer>> value : group) {
                if (combined == null) {
                    combined = keyedFn.createCombiner(value);
                } else {
                    combined.add(value, keyedFn);
                }
            }
            WindowedValue<?> output = (WindowedValue<?>) Iterables.getOnlyElement(combined.extractOutput());
            actual.add(output.getValue() + ":" + output.getTimestamp().getMillis());
        }

        assertUnorderedEquals(
                Lists.newArrayList("3:999", "5:1999", "5:2999", "2:3999", "1:5999", "1:6999", "1:7999"),
                actual);
    }

    /**
     * Exercises {@code SingleWindowWindowedAccumulator} directly in the global window with the
     * {@code EARLIEST} timestamp combiner.
     *
     * <p>Two accumulators each receive one element, both are merged into a third, a last element is
     * added to that third accumulator, and the single extracted output is expected to be 6. All
     * elements are stamped with {@code BoundedWindow.TIMESTAMP_MIN_VALUE}.
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"}) // SparkCombineFn and its accumulators are used raw here.
    public void testGlobalWindowMergeAccumulatorsWithEarliestCombiner() throws Exception {
        SparkCombineFn keyedFn =
                SparkCombineFn.keyed(
                        combineFn,
                        opts,
                        Collections.emptyMap(),
                        WindowingStrategy.globalDefault().withTimestampCombiner(TimestampCombiner.EARLIEST));

        Instant minTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
        WindowedValue<KV<String, Integer>> first = input("key", 1, minTimestamp);
        WindowedValue<KV<String, Integer>> second = input("key", 2, minTimestamp);
        WindowedValue<KV<String, Integer>> third = input("key", 3, minTimestamp);

        WindowedValue<Long> zero = WindowedValue.valueInGlobalWindow(0L);
        SparkCombineFn.SingleWindowWindowedAccumulator acc1 = newValueAccumulator(zero);
        SparkCombineFn.SingleWindowWindowedAccumulator acc2 = newValueAccumulator(zero);
        SparkCombineFn.SingleWindowWindowedAccumulator merged = newValueAccumulator(zero);

        acc1.add(first, keyedFn);
        acc2.add(second, keyedFn);
        merged.merge(acc1, keyedFn);
        merged.merge(acc2, keyedFn);
        merged.add(third, keyedFn);

        WindowedValue<?> only = (WindowedValue<?>) Iterables.getOnlyElement(keyedFn.extractOutput(merged));
        Assert.assertEquals(6L, ((Long) only.getValue()).longValue());
    }

    /**
     * Creates a single-window accumulator that starts from {@code initial} and reads the value part
     * of each {@code KV<String, Integer>} input.
     */
    @SuppressWarnings("rawtypes")
    private static SparkCombineFn.SingleWindowWindowedAccumulator newValueAccumulator(WindowedValue<Long> initial) {
        // The lambda parameter is typed explicitly: the result is held in a raw variable, so nothing
        // else would tell the compiler that the input is a KV (it would be inferred as Object).
        return SparkCombineFn.SingleWindowWindowedAccumulator.create(
                (KV<String, Integer> kv) -> kv.getValue(), initial);
    }

    /**
     * Builds a combine function that sums {@code Integer} inputs into a {@code Long}.
     *
     * <p>The accumulator starts at zero, each input is added to it, partial accumulators are merged
     * by summation, and the output is the accumulator itself.
     *
     * @return a new summing {@code CombineFn}
     */
    private static Combine.CombineFn<Integer, Long, Long> getSumFn() {
        return new Combine.CombineFn<Integer, Long, Long>() {
            @Override
            public Long createAccumulator() {
                return 0L;
            }

            @Override
            public Long addInput(Long accumulator, Integer input) {
                return accumulator + input;
            }

            @Override
            public Long mergeAccumulators(Iterable<Long> accumulators) {
                return StreamSupport.stream(accumulators.spliterator(), false)
                        .mapToLong(Long::longValue)
                        .sum();
            }

            @Override
            public Long extractOutput(Long accumulator) {
                return accumulator;
            }
        };
    }

    /**
     * Builds a keyed test element whose windows are assigned by the window function of the default
     * global windowing strategy.
     *
     * @param k key of the element
     * @param v value of the element
     * @param instant element timestamp
     * @return the key/value pair wrapped as a windowed value
     */
    private <K, V> WindowedValue<KV<K, V>> input(K k, V v, Instant instant) throws Exception {
        WindowFn<?, ?> globalWindowFn = WindowingStrategy.globalDefault().getWindowFn();
        return input(k, v, instant, globalWindowFn);
    }

    /**
     * Builds a keyed test element whose windows are assigned by {@code windowFn}.
     *
     * @param k key of the element
     * @param v value of the element
     * @param instant element timestamp
     * @param windowFn window function used to assign the element's windows
     * @return the key/value pair wrapped as a windowed value
     */
    private <K, V> WindowedValue<KV<K, V>> input(K k, V v, Instant instant, WindowFn<?, ?> windowFn) throws Exception {
        KV<K, V> element = KV.of(k, v);
        return inputValue(element, instant, windowFn);
    }

    /**
     * Builds an un-keyed test element whose windows are assigned by the window function of the
     * default global windowing strategy.
     *
     * @param v element value
     * @param instant element timestamp
     * @return the value wrapped as a windowed value
     */
    private <V> WindowedValue<V> inputValue(V v, Instant instant) throws Exception {
        WindowFn<?, ?> globalWindowFn = WindowingStrategy.globalDefault().getWindowFn();
        return inputValue(v, instant, globalWindowFn);
    }

    /**
     * Builds a test element with pane {@code PaneInfo.NO_FIRING} whose windows are those that
     * {@code windowFn} assigns for the given value and timestamp.
     *
     * @param v element value
     * @param instant element timestamp
     * @param windowFn window function used to assign the element's windows
     * @return the value wrapped as a windowed value
     * @throws Exception if window assignment fails
     */
    private <V> WindowedValue<V> inputValue(V v, Instant instant, WindowFn<?, ?> windowFn) throws Exception {
        // assignContext() needs a WindowFn<V, BoundedWindow>; a WindowFn<?, ?> is not accepted
        // without a cast. The cast is unchecked.
        @SuppressWarnings("unchecked")
        WindowFn<V, BoundedWindow> typedFn = (WindowFn<V, BoundedWindow>) windowFn;
        return WindowedValue.of(
                v, instant, typedFn.assignWindows(assignContext(typedFn, v, instant)), PaneInfo.NO_FIRING);
    }

    /**
     * Creates an {@code AssignContext} bound to {@code windowFn} that reports the given element and
     * timestamp and always reports {@code GlobalWindow.INSTANCE} as the element's current window.
     *
     * @param windowFn window function the context belongs to; must not be {@code null}
     * @param v element returned by {@code element()}
     * @param instant timestamp returned by {@code timestamp()}
     * @return the assign context
     * @throws NullPointerException if {@code windowFn} is {@code null}
     */
    <V> WindowFn<V, BoundedWindow>.AssignContext assignContext(WindowFn<V, BoundedWindow> windowFn, V v, Instant instant) {
        // AssignContext is an inner class of WindowFn, so it is instantiated through the enclosing
        // instance; a null windowFn fails here with a NullPointerException.
        return windowFn.new AssignContext() {
            @Override
            public V element() {
                return v;
            }

            @Override
            public Instant timestamp() {
                return instant;
            }

            @Override
            public BoundedWindow window() {
                return GlobalWindow.INSTANCE;
            }
        };
    }

    /**
     * Asserts that both lists hold the same elements the same number of times, in any order.
     *
     * <p>The lists are compared as multisets: a duplicated or dropped element fails the assertion
     * even when the sets of distinct elements are the same.
     *
     * @param list expected elements
     * @param list2 actual elements
     */
    private <T> void assertUnorderedEquals(List<T> list, List<T> list2) {
        // Cross off one actual element per expected element; whatever is left over is unexpected.
        List<T> unmatched = new ArrayList<>(list2);
        for (T expected : list) {
            if (!unmatched.remove(expected)) {
                Assert.fail(
                        "Expected " + list + " in any order but was " + list2 + "; missing element: " + expected);
            }
        }
        if (!unmatched.isEmpty()) {
            Assert.fail(
                    "Expected " + list + " in any order but was " + list2 + "; unexpected elements: " + unmatched);
        }
    }

    /**
     * Rebuilds a serialized lambda of this class from its {@link SerializedLambda} description.
     *
     * <p>Only an implementation method named {@code getValue} is recognised. When the remaining
     * metadata matches an instance method {@code KV.getValue()} exposed through Spark's
     * {@code Function.call(Object)}, a lambda that calls {@code getValue()} on its argument is returned.
     *
     * <p>NOTE(review): this appears to be decompiler output for the compiler-generated
     * deserialization hook rather than hand-written code. {@code boolean z = -1}, a {@code switch}
     * on a {@code boolean} and the untyped returned lambdas are not valid Java source. Presumably the
     * method should be dropped when the class is rebuilt from source, since the compiler emits its
     * own; confirm before relying on it.
     *
     * @param serializedLambda description of the lambda to restore
     * @return a function that calls {@code getValue()} on its argument
     * @throws IllegalArgumentException if the description matches no known lambda
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Step 1: map the implementation method name to a case selector; -1 means "no match".
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1967798203:
                // The hash alone is not trusted; the name is compared as well.
                if (implMethodName.equals("getValue")) {
                    z = false;
                    break;
                }
                break;
        }
        // Step 2: check the rest of the lambda metadata for the selected case.
        switch (z) {
            case false:
                // The three checks below are textually identical; presumably one per use of the same
                // method reference in this class. The first one that matches returns.
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/values/KV") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Object;")) {
                    return (v0) -> {
                        return v0.getValue();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/values/KV") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Object;")) {
                    return (v0) -> {
                        return v0.getValue();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/values/KV") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Object;")) {
                    return (v0) -> {
                        return v0.getValue();
                    };
                }
                break;
        }
        // Nothing matched: the description does not belong to a lambda of this class.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
