package org.apache.beam.runners.spark.stateful;

import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.beam.repackaged.beam_runners_spark.com.google.common.base.Ascii;
import org.apache.beam.runners.core.LateDataUtils;
import org.apache.beam.runners.core.OutputWindowedValue;
import org.apache.beam.runners.core.ReduceFnRunner;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.UnsupportedSideInputReader;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.construction.TriggerTranslation;
import org.apache.beam.runners.core.metrics.CounterCell;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.translation.TranslationUtils;
import org.apache.beam.runners.spark.util.ByteArray;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Joiner;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.AbstractIterator;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.FluentIterable;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Table;
import org.apache.spark.api.java.JavaSparkContext$;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.dstream.DStream;
import org.apache.spark.streaming.dstream.PairDStreamFunctions;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.Iterator;
import scala.collection.JavaConversions;
import scala.collection.Seq;
import scala.math.Ordering;
import scala.runtime.AbstractFunction1;

/* loaded from: input_file:org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.class */
public class SparkGroupAlsoByWindowViaWindowSet implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(SparkGroupAlsoByWindowViaWindowSet.class);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet$OutputWindowedValueHolder.class */
    public static class OutputWindowedValueHolder<K, V> implements OutputWindowedValue<KV<K, Iterable<V>>> {
        private final List<WindowedValue<KV<K, Iterable<V>>>> windowedValues;

        private OutputWindowedValueHolder() {
            this.windowedValues = new ArrayList();
        }

        public void outputWindowedValue(KV<K, Iterable<V>> kv, Instant instant, Collection<? extends BoundedWindow> collection, PaneInfo paneInfo) {
            this.windowedValues.add(WindowedValue.of(kv, instant, collection, paneInfo));
        }

        /* JADX INFO: Access modifiers changed from: private */
        public List<WindowedValue<KV<K, Iterable<V>>>> getWindowedValues() {
            return this.windowedValues;
        }

        public <AdditionalOutputT> void outputWindowedValue(TupleTag<AdditionalOutputT> tupleTag, AdditionalOutputT additionaloutputt, Instant instant, Collection<? extends BoundedWindow> collection, PaneInfo paneInfo) {
            throw new UnsupportedOperationException("Tagged outputs are not allowed in GroupAlsoByWindow.");
        }

        public /* bridge */ /* synthetic */ void outputWindowedValue(Object obj, Instant instant, Collection collection, PaneInfo paneInfo) {
            outputWindowedValue((KV) obj, instant, (Collection<? extends BoundedWindow>) collection, paneInfo);
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet$StateAndTimers.class */
    public static class StateAndTimers implements Serializable {
        private final Table<String, String, byte[]> state;
        private final Collection<byte[]> serTimers;

        private StateAndTimers(Table<String, String, byte[]> table, Collection<byte[]> collection) {
            this.state = table;
            this.serTimers = collection;
        }

        public Table<String, String, byte[]> getState() {
            return this.state;
        }

        Collection<byte[]> getTimers() {
            return this.serTimers;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet$UpdateStateByKeyFunction.class */
    public static class UpdateStateByKeyFunction<K, InputT, W extends BoundedWindow> extends AbstractFunction1<Iterator<Tuple3<ByteArray, Seq<byte[]>, Option<Tuple2<StateAndTimers, List<byte[]>>>>>, Iterator<Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>>>> implements Serializable {
        private final WindowedValue.FullWindowedValueCoder<InputT> wvCoder;
        private final Coder<K> keyCoder;
        private final List<Integer> sourceIds;
        private final TimerInternals.TimerDataCoder timerDataCoder;
        private final WindowingStrategy<?, W> windowingStrategy;
        private final SerializablePipelineOptions options;
        private final IterableCoder<WindowedValue<InputT>> itrWvCoder;
        private final String logPrefix;
        private final Coder<WindowedValue<KV<K, Iterable<InputT>>>> wvKvIterCoder;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet$UpdateStateByKeyFunction$UpdateStateByKeyOutputIterator.class */
        public class UpdateStateByKeyOutputIterator extends AbstractIterator<Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>>> {
            private final Iterator<Tuple3<ByteArray, Seq<byte[]>, Option<Tuple2<StateAndTimers, List<byte[]>>>>> input;
            private final SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, W> reduceFn;
            private final CounterCell droppedDueToLateness;

            private SparkStateInternals<K> processPreviousState(Option<Tuple2<StateAndTimers, List<byte[]>>> option, K k, SparkTimerInternals sparkTimerInternals) {
                SparkStateInternals<K> forKeyAndState;
                if (option.isEmpty()) {
                    forKeyAndState = SparkStateInternals.forKey(k);
                } else {
                    StateAndTimers stateAndTimers = (StateAndTimers) ((Tuple2) option.get())._1();
                    forKeyAndState = SparkStateInternals.forKeyAndState(k, stateAndTimers.getState());
                    sparkTimerInternals.addTimers(SparkTimerInternals.deserializeTimers(stateAndTimers.getTimers(), UpdateStateByKeyFunction.this.timerDataCoder));
                }
                return forKeyAndState;
            }

            UpdateStateByKeyOutputIterator(Iterator<Tuple3<ByteArray, Seq<byte[]>, Option<Tuple2<StateAndTimers, List<byte[]>>>>> iterator, SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, W> systemReduceFn, CounterCell counterCell) {
                this.input = iterator;
                this.reduceFn = systemReduceFn;
                this.droppedDueToLateness = counterCell;
            }

            private Collection<TimerInternals.TimerData> filterTimersEligibleForProcessing(Collection<TimerInternals.TimerData> collection, Instant instant) {
                return FluentIterable.from(collection).filter(timerData -> {
                    return !timerData.getDomain().equals(TimeDomain.EVENT_TIME) || instant.isAfter(timerData.getTimestamp());
                }).toSet();
            }

            /* JADX INFO: Access modifiers changed from: protected */
            /* JADX WARN: Multi-variable type inference failed */
            /* renamed from: computeNext, reason: merged with bridge method [inline-methods] */
            public Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>> m265computeNext() {
                while (this.input.hasNext()) {
                    Tuple3 tuple3 = (Tuple3) this.input.next();
                    ByteArray byteArray = (ByteArray) tuple3._1();
                    Seq seq = (Seq) tuple3._2();
                    Option option = (Option) tuple3._3();
                    Object fromByteArray = CoderHelpers.fromByteArray(byteArray.getValue(), UpdateStateByKeyFunction.this.keyCoder);
                    Map<Integer, GlobalWatermarkHolder.SparkWatermarks> map = GlobalWatermarkHolder.get(SparkGroupAlsoByWindowViaWindowSet.getBatchDuration(UpdateStateByKeyFunction.this.options));
                    SparkTimerInternals forStreamFromSources = SparkTimerInternals.forStreamFromSources(UpdateStateByKeyFunction.this.sourceIds, map);
                    SparkStateInternals processPreviousState = processPreviousState(option, fromByteArray, forStreamFromSources);
                    ExecutableTriggerStateMachine create = ExecutableTriggerStateMachine.create(TriggerStateMachines.stateMachineForTrigger(TriggerTranslation.toProto(UpdateStateByKeyFunction.this.windowingStrategy.getTrigger())));
                    OutputWindowedValueHolder outputWindowedValueHolder = new OutputWindowedValueHolder();
                    ReduceFnRunner reduceFnRunner = new ReduceFnRunner(fromByteArray, UpdateStateByKeyFunction.this.windowingStrategy, create, processPreviousState, forStreamFromSources, outputWindowedValueHolder, new UnsupportedSideInputReader("GroupAlsoByWindow"), this.reduceFn, UpdateStateByKeyFunction.this.options.get());
                    try {
                        if (!seq.isEmpty()) {
                            try {
                                KV kv = (KV) CoderHelpers.fromByteArray((byte[]) seq.head(), KvCoder.of(VarLongCoder.of(), UpdateStateByKeyFunction.this.itrWvCoder));
                                SparkGroupAlsoByWindowViaWindowSet.LOG.debug(UpdateStateByKeyFunction.this.logPrefix + ": processing RDD with timestamp: {}, watermarks: {}", (Long) kv.getKey(), map);
                                Iterable iterable = (Iterable) kv.getValue();
                                SparkGroupAlsoByWindowViaWindowSet.LOG.trace(UpdateStateByKeyFunction.this.logPrefix + ": input elements: {}", iterable);
                                ArrayList newArrayList = Lists.newArrayList(LateDataUtils.dropExpiredWindows(fromByteArray, iterable, forStreamFromSources, UpdateStateByKeyFunction.this.windowingStrategy, this.droppedDueToLateness));
                                SparkGroupAlsoByWindowViaWindowSet.LOG.trace(UpdateStateByKeyFunction.this.logPrefix + ": non expired input elements: {}", newArrayList);
                                reduceFnRunner.processElements(newArrayList);
                            } catch (Exception e) {
                                throw new RuntimeException("Failed to process element with ReduceFnRunner", e);
                            }
                        } else if (processPreviousState.getState().isEmpty()) {
                            continue;
                        }
                        SparkGroupAlsoByWindowViaWindowSet.LOG.debug(UpdateStateByKeyFunction.this.logPrefix + ": timerInternals before advance are {}", forStreamFromSources.toString());
                        forStreamFromSources.advanceWatermark();
                        Collection<TimerInternals.TimerData> filterTimersEligibleForProcessing = filterTimersEligibleForProcessing(forStreamFromSources.getTimers(), forStreamFromSources.currentInputWatermarkTime());
                        SparkGroupAlsoByWindowViaWindowSet.LOG.debug(UpdateStateByKeyFunction.this.logPrefix + ": timers eligible for processing are {}", filterTimersEligibleForProcessing);
                        reduceFnRunner.onTimers(filterTimersEligibleForProcessing);
                        reduceFnRunner.persist();
                        List windowedValues = outputWindowedValueHolder.getWindowedValues();
                        if (!windowedValues.isEmpty() || !processPreviousState.getState().isEmpty()) {
                            StateAndTimers stateAndTimers = new StateAndTimers(processPreviousState.getState(), SparkTimerInternals.serializeTimers(forStreamFromSources.getTimers(), UpdateStateByKeyFunction.this.timerDataCoder));
                            SparkGroupAlsoByWindowViaWindowSet.LOG.trace(UpdateStateByKeyFunction.this.logPrefix + ": output elements are {}", Joiner.on(", ").join(windowedValues));
                            return new Tuple2<>(byteArray, new Tuple2(stateAndTimers, CoderHelpers.toByteArrays(windowedValues, UpdateStateByKeyFunction.this.wvKvIterCoder)));
                        }
                    } catch (Exception e2) {
                        throw new RuntimeException("Failed to process ReduceFnRunner onTimer.", e2);
                    }
                }
                return (Tuple2) endOfData();
            }
        }

        UpdateStateByKeyFunction(List<Integer> list, WindowingStrategy<?, W> windowingStrategy, WindowedValue.FullWindowedValueCoder<InputT> fullWindowedValueCoder, Coder<K> coder, SerializablePipelineOptions serializablePipelineOptions, String str) {
            this.wvCoder = fullWindowedValueCoder;
            this.keyCoder = coder;
            this.sourceIds = list;
            this.timerDataCoder = SparkGroupAlsoByWindowViaWindowSet.timerDataCoderOf(windowingStrategy);
            this.windowingStrategy = windowingStrategy;
            this.options = serializablePipelineOptions;
            this.itrWvCoder = IterableCoder.of(fullWindowedValueCoder);
            this.logPrefix = str;
            this.wvKvIterCoder = SparkGroupAlsoByWindowViaWindowSet.windowedValueKeyValueCoderOf(coder, fullWindowedValueCoder.getValueCoder(), fullWindowedValueCoder.getWindowCoder());
        }

        public Iterator<Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>>> apply(Iterator<Tuple3<ByteArray, Seq<byte[]>, Option<Tuple2<StateAndTimers, List<byte[]>>>>> iterator) {
            SystemReduceFn buffering = SystemReduceFn.buffering(this.wvCoder.getValueCoder());
            MetricsContainerImpl metricsContainerImpl = new MetricsContainerImpl("cellProvider");
            CounterCell counter = metricsContainerImpl.getCounter(MetricName.named(SparkGroupAlsoByWindowViaWindowSet.class, "DroppedDueToClosedWindow"));
            CounterCell counter2 = metricsContainerImpl.getCounter(MetricName.named(SparkGroupAlsoByWindowViaWindowSet.class, "DroppedDueToLateness"));
            long longValue = counter2.getCumulative().longValue();
            if (longValue > 0) {
                SparkGroupAlsoByWindowViaWindowSet.LOG.info(String.format("Dropped %d elements due to lateness.", Long.valueOf(longValue)));
                counter2.inc(-counter2.getCumulative().longValue());
            }
            long longValue2 = counter.getCumulative().longValue();
            if (longValue2 > 0) {
                SparkGroupAlsoByWindowViaWindowSet.LOG.info(String.format("Dropped %d elements due to closed window.", Long.valueOf(longValue2)));
                counter.inc(-counter.getCumulative().longValue());
            }
            return JavaConversions.asScalaIterator(new UpdateStateByKeyOutputIterator(iterator, buffering, counter2));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <K, InputT> WindowedValue.FullWindowedValueCoder<KV<K, Iterable<InputT>>> windowedValueKeyValueCoderOf(Coder<K> coder, Coder<InputT> coder2, Coder<? extends BoundedWindow> coder3) {
        return WindowedValue.FullWindowedValueCoder.of(KvCoder.of(coder, IterableCoder.of(coder2)), coder3);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <W extends BoundedWindow> TimerInternals.TimerDataCoder timerDataCoderOf(WindowingStrategy<?, W> windowingStrategy) {
        return TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder());
    }

    private static void checkpointIfNeeded(DStream<Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>>> dStream, SerializablePipelineOptions serializablePipelineOptions) {
        Long batchDuration = getBatchDuration(serializablePipelineOptions);
        if (batchDuration.longValue() > 0) {
            dStream.checkpoint(new Duration(batchDuration.longValue()));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Long getBatchDuration(SerializablePipelineOptions serializablePipelineOptions) {
        return ((SparkPipelineOptions) serializablePipelineOptions.get().as(SparkPipelineOptions.class)).getCheckpointDurationMillis();
    }

    private static <K, InputT> JavaDStream<WindowedValue<KV<K, Iterable<InputT>>>> stripStateValues(DStream<Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>>> dStream, final Coder<K> coder, final WindowedValue.FullWindowedValueCoder<InputT> fullWindowedValueCoder) {
        return JavaPairDStream.fromPairDStream(dStream, JavaSparkContext$.MODULE$.fakeClassTag(), JavaSparkContext$.MODULE$.fakeClassTag()).filter(tuple2 -> {
            return Boolean.valueOf(!((List) ((Tuple2) tuple2._2())._2()).isEmpty());
        }).flatMap(new FlatMapFunction<Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>>, WindowedValue<KV<K, Iterable<InputT>>>>() { // from class: org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet.1
            private final WindowedValue.FullWindowedValueCoder<KV<K, Iterable<InputT>>> windowedValueKeyValueCoder;

            {
                this.windowedValueKeyValueCoder = SparkGroupAlsoByWindowViaWindowSet.windowedValueKeyValueCoderOf(coder, fullWindowedValueCoder.getValueCoder(), fullWindowedValueCoder.getWindowCoder());
            }

            public java.util.Iterator<WindowedValue<KV<K, Iterable<InputT>>>> call(Tuple2<ByteArray, Tuple2<StateAndTimers, List<byte[]>>> tuple22) throws Exception {
                return CoderHelpers.fromByteArrays((Collection) ((Tuple2) tuple22._2())._2(), this.windowedValueKeyValueCoder).iterator();
            }
        });
    }

    private static <K, InputT> PairDStreamFunctions<ByteArray, byte[]> buildPairDStream(JavaDStream<WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>>> javaDStream, Coder<K> coder, Coder<WindowedValue<InputT>> coder2) {
        return DStream.toPairDStreamFunctions(javaDStream.transformToPair((javaRDD, time) -> {
            return javaRDD.mapPartitions(TranslationUtils.functionToFlatMapFunction((v0) -> {
                return v0.getValue();
            }), true).mapPartitionsToPair(TranslationUtils.toPairFlatMapFunction(), true).mapValues(iterable -> {
                return KV.of(Long.valueOf(time.milliseconds()), iterable);
            }).mapPartitionsToPair(TranslationUtils.pairFunctionToPairFlatMapFunction(CoderHelpers.toByteFunction(coder, KvCoder.of(VarLongCoder.of(), IterableCoder.of(coder2)))), true);
        }).dstream(), JavaSparkContext$.MODULE$.fakeClassTag(), JavaSparkContext$.MODULE$.fakeClassTag(), (Ordering) null);
    }

    public static <K, InputT, W extends BoundedWindow> JavaDStream<WindowedValue<KV<K, Iterable<InputT>>>> groupAlsoByWindow(JavaDStream<WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>>> javaDStream, Coder<K> coder, Coder<WindowedValue<InputT>> coder2, WindowingStrategy<?, W> windowingStrategy, SerializablePipelineOptions serializablePipelineOptions, List<Integer> list, String str) {
        PairDStreamFunctions<ByteArray, byte[]> buildPairDStream = buildPairDStream(javaDStream, coder, coder2);
        DStream updateStateByKey = buildPairDStream.updateStateByKey(new UpdateStateByKeyFunction(list, windowingStrategy, (WindowedValue.FullWindowedValueCoder) coder2, coder, serializablePipelineOptions, str), buildPairDStream.defaultPartitioner(buildPairDStream.defaultPartitioner$default$1()), true, JavaSparkContext$.MODULE$.fakeClassTag());
        checkpointIfNeeded(updateStateByKey, serializablePipelineOptions);
        return stripStateValues(updateStateByKey, coder, (WindowedValue.FullWindowedValueCoder) coder2);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -2040604702:
                if (implMethodName.equals("lambda$buildPairDStream$424b4112$1")) {
                    z = 3;
                    break;
                }
                break;
            case -1155522431:
                if (implMethodName.equals("lambda$buildPairDStream$490a17d1$1")) {
                    z = true;
                    break;
                }
                break;
            case 1934365300:
                if (implMethodName.equals("lambda$stripStateValues$cc3a2dfa$1")) {
                    z = 2;
                    break;
                }
                break;
            case 1967798203:
                if (implMethodName.equals("getValue")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/util/WindowedValue") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Object;")) {
                    return (v0) -> {
                        return v0.getValue();
                    };
                }
                break;
            case Ascii.SOH /* 1 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/spark/streaming/Time;Ljava/lang/Iterable;)Lorg/apache/beam/sdk/values/KV;")) {
                    Time time = (Time) serializedLambda.getCapturedArg(0);
                    return iterable -> {
                        return KV.of(Long.valueOf(time.milliseconds()), iterable);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet") && serializedLambda.getImplMethodSignature().equals("(Lscala/Tuple2;)Ljava/lang/Boolean;")) {
                    return tuple2 -> {
                        return Boolean.valueOf(!((List) ((Tuple2) tuple2._2())._2()).isEmpty());
                    };
                }
                break;
            case Ascii.ETX /* 3 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function2") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/coders/Coder;Lorg/apache/beam/sdk/coders/Coder;Lorg/apache/spark/api/java/JavaRDD;Lorg/apache/spark/streaming/Time;)Lorg/apache/spark/api/java/JavaPairRDD;")) {
                    Coder coder = (Coder) serializedLambda.getCapturedArg(0);
                    Coder coder2 = (Coder) serializedLambda.getCapturedArg(1);
                    return (javaRDD, time2) -> {
                        return javaRDD.mapPartitions(TranslationUtils.functionToFlatMapFunction((v0) -> {
                            return v0.getValue();
                        }), true).mapPartitionsToPair(TranslationUtils.toPairFlatMapFunction(), true).mapValues(iterable2 -> {
                            return KV.of(Long.valueOf(time2.milliseconds()), iterable2);
                        }).mapPartitionsToPair(TranslationUtils.pairFunctionToPairFlatMapFunction(CoderHelpers.toByteFunction(coder, KvCoder.of(VarLongCoder.of(), IterableCoder.of(coder2)))), true);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
