package org.apache.beam.runners.spark.structuredstreaming.translation.batch;

import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Streams;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.expressions.Aggregator;
import org.apache.spark.util.MutablePair;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;

@RunWith(Enclosed.class)
/* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorsTest.class */
public class AggregatorsTest {
    private static final Instant NOW = Instant.parse("2000-01-01T00:00Z");

    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorsTest$AbstractSessionsTest.class */
    public static abstract class AbstractSessionsTest<AccT extends Map<IntervalWindow, MutablePair<Instant, Integer>>> {
        static final Duration SESSIONS_GAP = Duration.standardMinutes(15);
        final Aggregator<WindowedValue<Integer>, AccT, Collection<WindowedValue<Integer>>> agg;

        AbstractSessionsTest(WindowFn<?, ?> windowFn) {
            this.agg = AggregatorsTest.windowedAgg(windowFn);
        }

        abstract AccT accOf(KV<IntervalWindow, MutablePair<Instant, Integer>>... kvArr);

        @Test
        public void testReduce() {
            Map map = (Map) this.agg.reduce((Map) this.agg.zero(), sessionValue(10, AggregatorsTest.at(0)));
            MatcherAssert.assertThat(map, AggregatorsTest.equalsToMap(KV.of(sessionWindow(0), AggregatorsTest.pair(AggregatorsTest.at(0), 10))));
            Map map2 = (Map) this.agg.reduce(map, sessionValue(7, AggregatorsTest.at(20)));
            MatcherAssert.assertThat(map2, AggregatorsTest.equalsToMap(KV.of(sessionWindow(0), AggregatorsTest.pair(AggregatorsTest.at(0), 10)), KV.of(sessionWindow(20), AggregatorsTest.pair(AggregatorsTest.at(20), 7))));
            Map map3 = (Map) this.agg.reduce(map2, sessionValue(6, AggregatorsTest.at(18)));
            MatcherAssert.assertThat(map3, AggregatorsTest.equalsToMap(KV.of(sessionWindow(0), AggregatorsTest.pair(AggregatorsTest.at(0), 10)), KV.of(sessionWindow(18, 35), AggregatorsTest.pair(AggregatorsTest.at(20), 13))));
            Map map4 = (Map) this.agg.reduce(map3, sessionValue(5, AggregatorsTest.at(21)));
            MatcherAssert.assertThat(map4, AggregatorsTest.equalsToMap(KV.of(sessionWindow(0), AggregatorsTest.pair(AggregatorsTest.at(0), 10)), KV.of(sessionWindow(18, 36), AggregatorsTest.pair(AggregatorsTest.at(21), 18))));
            Map map5 = (Map) this.agg.reduce(map4, sessionValue(2, AggregatorsTest.NOW.plus(Duration.standardMinutes(45L))));
            MatcherAssert.assertThat(map5, AggregatorsTest.equalsToMap(KV.of(sessionWindow(0), AggregatorsTest.pair(AggregatorsTest.at(0), 10)), KV.of(sessionWindow(18, 36), AggregatorsTest.pair(AggregatorsTest.at(21), 18)), KV.of(sessionWindow(45), AggregatorsTest.pair(AggregatorsTest.at(45), 2))));
            MatcherAssert.assertThat((Map) this.agg.reduce(map5, sessionValue(1, AggregatorsTest.at(10))), AggregatorsTest.equalsToMap(KV.of(sessionWindow(0, 36), AggregatorsTest.pair(AggregatorsTest.at(21), 29)), KV.of(sessionWindow(45), AggregatorsTest.pair(AggregatorsTest.at(45), 2))));
        }

        @Test
        public void testMerge() {
            MatcherAssert.assertThat((Map) this.agg.merge((Map) this.agg.zero(), (Map) this.agg.zero()), Matchers.equalTo((Map) this.agg.zero()));
            AccT accOf = accOf(KV.of(sessionWindow(0), AggregatorsTest.pair(AggregatorsTest.at(0), 1)));
            MatcherAssert.assertThat((Map) this.agg.merge(accOf, (Map) this.agg.zero()), Matchers.equalTo(accOf));
            MatcherAssert.assertThat((Map) this.agg.merge((Map) this.agg.zero(), accOf), Matchers.equalTo(accOf));
            Map map = (Map) this.agg.merge(accOf, accOf);
            MatcherAssert.assertThat(map, AggregatorsTest.equalsToMap(KV.of(sessionWindow(0), AggregatorsTest.pair(AggregatorsTest.at(0), 2))));
            Map map2 = (Map) this.agg.merge(map, accOf(KV.of(sessionWindow(20), AggregatorsTest.pair(AggregatorsTest.at(20), 2))));
            MatcherAssert.assertThat(map2, AggregatorsTest.equalsToMap(KV.of(sessionWindow(0), AggregatorsTest.pair(AggregatorsTest.at(0), 2)), KV.of(sessionWindow(20), AggregatorsTest.pair(AggregatorsTest.at(20), 2))));
            Map map3 = (Map) this.agg.merge(accOf(KV.of(sessionWindow(40), AggregatorsTest.pair(AggregatorsTest.at(40), 3))), map2);
            MatcherAssert.assertThat(map3, AggregatorsTest.equalsToMap(KV.of(sessionWindow(0), AggregatorsTest.pair(AggregatorsTest.at(0), 2)), KV.of(sessionWindow(20), AggregatorsTest.pair(AggregatorsTest.at(20), 2)), KV.of(sessionWindow(40), AggregatorsTest.pair(AggregatorsTest.at(40), 3))));
            Map map4 = (Map) this.agg.merge(map3, accOf(KV.of(sessionWindow(10), AggregatorsTest.pair(AggregatorsTest.at(10), 4))));
            MatcherAssert.assertThat(map4, AggregatorsTest.equalsToMap(KV.of(sessionWindow(0, 35), AggregatorsTest.pair(AggregatorsTest.at(20), 8)), KV.of(sessionWindow(40), AggregatorsTest.pair(AggregatorsTest.at(40), 3))));
            MatcherAssert.assertThat((Map) this.agg.merge(accOf(KV.of(sessionWindow(5, 45), AggregatorsTest.pair(AggregatorsTest.at(30), 5))), map4), AggregatorsTest.equalsToMap(KV.of(sessionWindow(0, 55), AggregatorsTest.pair(AggregatorsTest.at(40), 16))));
        }

        private WindowedValue<Integer> sessionValue(Integer num, Instant instant) {
            return WindowedValue.of(num, instant, new IntervalWindow(instant, SESSIONS_GAP), PaneInfo.NO_FIRING);
        }

        private IntervalWindow sessionWindow(int i) {
            return new IntervalWindow(AggregatorsTest.at(i), SESSIONS_GAP);
        }

        private static IntervalWindow sessionWindow(int i, int i2) {
            return AggregatorsTest.intervalWindow(i, i2);
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorsTest$MergingWindowedAggregatorTest.class */
    public static class MergingWindowedAggregatorTest extends AbstractSessionsTest<Map<IntervalWindow, MutablePair<Instant, Integer>>> {

        /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorsTest$MergingWindowedAggregatorTest$CustomSessions.class */
        private static class CustomSessions<T> extends WindowFn<T, IntervalWindow> {
            private final Sessions sessions;

            private CustomSessions() {
                this.sessions = Sessions.withGapDuration(AbstractSessionsTest.SESSIONS_GAP);
            }

            public Collection<IntervalWindow> assignWindows(WindowFn<T, IntervalWindow>.AssignContext assignContext) {
                return this.sessions.assignWindows(assignContext);
            }

            public void mergeWindows(WindowFn<T, IntervalWindow>.MergeContext mergeContext) throws Exception {
                this.sessions.mergeWindows(mergeContext);
            }

            public boolean isCompatible(WindowFn<?, ?> windowFn) {
                return this.sessions.isCompatible(windowFn);
            }

            public Coder<IntervalWindow> windowCoder() {
                return this.sessions.windowCoder();
            }

            public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() {
                return this.sessions.getDefaultWindowMappingFn();
            }
        }

        public MergingWindowedAggregatorTest() {
            super(new CustomSessions());
        }

        @Override // org.apache.beam.runners.spark.structuredstreaming.translation.batch.AggregatorsTest.AbstractSessionsTest
        Map<IntervalWindow, MutablePair<Instant, Integer>> accOf(KV<IntervalWindow, MutablePair<Instant, Integer>>... kvArr) {
            return AggregatorsTest.mapOf(kvArr);
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorsTest$NonMergingWindowedAggregatorTest.class */
    public static class NonMergingWindowedAggregatorTest {
        private SlidingWindows sliding = SlidingWindows.of(Duration.standardMinutes(15)).every(Duration.standardMinutes(5));
        private Aggregator<WindowedValue<Integer>, Map<IntervalWindow, MutablePair<Instant, Integer>>, Collection<WindowedValue<Integer>>> agg = AggregatorsTest.windowedAgg(this.sliding);

        @Test
        public void testReduce() {
            Map map = (Map) this.agg.reduce((Map) this.agg.zero(), windowedValue(1, AggregatorsTest.at(10)));
            MatcherAssert.assertThat(map, AggregatorsTest.equalsToMap(KV.of(AggregatorsTest.intervalWindow(0, 15), AggregatorsTest.pair(AggregatorsTest.at(10), 1)), KV.of(AggregatorsTest.intervalWindow(5, 20), AggregatorsTest.pair(AggregatorsTest.at(10), 1)), KV.of(AggregatorsTest.intervalWindow(10, 25), AggregatorsTest.pair(AggregatorsTest.at(10), 1))));
            MatcherAssert.assertThat((Map) this.agg.reduce(map, windowedValue(2, AggregatorsTest.at(16))), AggregatorsTest.equalsToMap(KV.of(AggregatorsTest.intervalWindow(0, 15), AggregatorsTest.pair(AggregatorsTest.at(10), 1)), KV.of(AggregatorsTest.intervalWindow(5, 20), AggregatorsTest.pair(AggregatorsTest.at(16), 3)), KV.of(AggregatorsTest.intervalWindow(10, 25), AggregatorsTest.pair(AggregatorsTest.at(16), 3)), KV.of(AggregatorsTest.intervalWindow(15, 30), AggregatorsTest.pair(AggregatorsTest.at(16), 2))));
        }

        @Test
        public void testMerge() {
            MatcherAssert.assertThat((Map) this.agg.merge((Map) this.agg.zero(), (Map) this.agg.zero()), Matchers.equalTo((Map) this.agg.zero()));
            Map mapOf = AggregatorsTest.mapOf(KV.of(AggregatorsTest.intervalWindow(0, 15), AggregatorsTest.pair(AggregatorsTest.at(0), 1)));
            MatcherAssert.assertThat((Map) this.agg.merge(mapOf, (Map) this.agg.zero()), Matchers.equalTo(mapOf));
            MatcherAssert.assertThat((Map) this.agg.merge((Map) this.agg.zero(), mapOf), Matchers.equalTo(mapOf));
            Map map = (Map) this.agg.merge(mapOf, mapOf);
            MatcherAssert.assertThat(map, AggregatorsTest.equalsToMap(KV.of(AggregatorsTest.intervalWindow(0, 15), AggregatorsTest.pair(AggregatorsTest.at(0), 2))));
            Map map2 = (Map) this.agg.merge(map, AggregatorsTest.mapOf(KV.of(AggregatorsTest.intervalWindow(5, 20), AggregatorsTest.pair(AggregatorsTest.at(5), 3))));
            MatcherAssert.assertThat(map2, AggregatorsTest.equalsToMap(KV.of(AggregatorsTest.intervalWindow(0, 15), AggregatorsTest.pair(AggregatorsTest.at(0), 2)), KV.of(AggregatorsTest.intervalWindow(5, 20), AggregatorsTest.pair(AggregatorsTest.at(5), 3))));
            MatcherAssert.assertThat((Map) this.agg.merge(AggregatorsTest.mapOf(KV.of(AggregatorsTest.intervalWindow(10, 25), AggregatorsTest.pair(AggregatorsTest.at(10), 4))), map2), AggregatorsTest.equalsToMap(KV.of(AggregatorsTest.intervalWindow(0, 15), AggregatorsTest.pair(AggregatorsTest.at(0), 2)), KV.of(AggregatorsTest.intervalWindow(5, 20), AggregatorsTest.pair(AggregatorsTest.at(5), 3)), KV.of(AggregatorsTest.intervalWindow(10, 25), AggregatorsTest.pair(AggregatorsTest.at(10), 4))));
        }

        private WindowedValue<Integer> windowedValue(Integer num, Instant instant) {
            return WindowedValue.of(num, instant, this.sliding.assignWindows(instant), PaneInfo.NO_FIRING);
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorsTest$SessionsAggregatorTest.class */
    public static class SessionsAggregatorTest extends AbstractSessionsTest<TreeMap<IntervalWindow, MutablePair<Instant, Integer>>> {
        public SessionsAggregatorTest() {
            super(Sessions.withGapDuration(SESSIONS_GAP));
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.beam.runners.spark.structuredstreaming.translation.batch.AggregatorsTest.AbstractSessionsTest
        TreeMap<IntervalWindow, MutablePair<Instant, Integer>> accOf(KV<IntervalWindow, MutablePair<Instant, Integer>>... kvArr) {
            return new TreeMap<>(AggregatorsTest.mapOf(kvArr));
        }

        @Override // org.apache.beam.runners.spark.structuredstreaming.translation.batch.AggregatorsTest.AbstractSessionsTest
        /* bridge */ /* synthetic */ TreeMap<IntervalWindow, MutablePair<Instant, Integer>> accOf(KV[] kvArr) {
            return accOf((KV<IntervalWindow, MutablePair<Instant, Integer>>[]) kvArr);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorsTest$SimpleSum.class */
    public static class SimpleSum extends Combine.CombineFn<Integer, Integer, Integer> {
        private SimpleSum() {
        }

        /* renamed from: createAccumulator, reason: merged with bridge method [inline-methods] */
        public Integer m16createAccumulator() {
            return 0;
        }

        public Integer addInput(Integer num, Integer num2) {
            return Integer.valueOf(num.intValue() + num2.intValue());
        }

        public Integer mergeAccumulators(Iterable<Integer> iterable) {
            return (Integer) Streams.stream(iterable.iterator()).reduce((num, num2) -> {
                return Integer.valueOf(num.intValue() + num2.intValue());
            }).orElseGet(() -> {
                return 0;
            });
        }

        public Integer extractOutput(Integer num) {
            return num;
        }

        /* renamed from: mergeAccumulators, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ Object m15mergeAccumulators(Iterable iterable) {
            return mergeAccumulators((Iterable<Integer>) iterable);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static IntervalWindow intervalWindow(int i, int i2) {
        return new IntervalWindow(at(i), at(i2));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Instant at(int i) {
        return NOW.plus(Duration.standardMinutes(i));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Matcher<Map<IntervalWindow, MutablePair<Instant, Integer>>> equalsToMap(KV<IntervalWindow, MutablePair<Instant, Integer>>... kvArr) {
        return Matchers.equalTo(mapOf(kvArr));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Map<IntervalWindow, MutablePair<Instant, Integer>> mapOf(KV<IntervalWindow, MutablePair<Instant, Integer>>... kvArr) {
        return (Map) Arrays.asList(kvArr).stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static MutablePair<Instant, Integer> pair(Instant instant, int i) {
        return new MutablePair<>(instant, Integer.valueOf(i));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <AccT> Aggregator<WindowedValue<Integer>, AccT, Collection<WindowedValue<Integer>>> windowedAgg(WindowFn<?, ?> windowFn) {
        Encoder encoderOf = EncoderHelpers.encoderOf(Integer.class);
        Encoder encoderFor = EncoderHelpers.encoderFor(IntervalWindow.getCoder());
        Encoder windowedValueEncoder = EncoderHelpers.windowedValueEncoder(encoderOf, encoderFor);
        return Aggregators.windowedValue(new SimpleSum(), (v0) -> {
            return v0.getValue();
        }, WindowingStrategy.of(windowFn).withTimestampCombiner(TimestampCombiner.LATEST), encoderFor, encoderOf, windowedValueEncoder);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1967798203:
                if (implMethodName.equals("getValue")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/runners/spark/structuredstreaming/translation/utils/ScalaInterop$Fun1") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/util/WindowedValue") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Object;")) {
                    return (v0) -> {
                        return v0.getValue();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
