package org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoSerializable;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.beam.runners.spark.structuredstreaming.translation.EvaluationContext;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.CoderHelpers;
import org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import scala.Function1;

@Internal
/* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/SideInputValues.class */
public interface SideInputValues<T> extends Serializable, KryoSerializable {

    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/SideInputValues$BaseSideInputValues.class */
    public static abstract class BaseSideInputValues<BinaryT, ValuesT, T> implements SideInputValues<T> {
        private Coder<BinaryT> coder;

        @Nullable
        private byte[][] binaryValues;
        private transient ValuesT values;

        private BaseSideInputValues(Coder<BinaryT> coder, @Nullable byte[][] bArr) {
            this.values = null;
            this.coder = coder;
            this.binaryValues = bArr;
        }

        abstract ValuesT deserialize(byte[][] bArr, Coder<BinaryT> coder);

        final ValuesT getValues() {
            if (this.values == null) {
                this.values = deserialize((byte[][]) Preconditions.checkStateNotNull(this.binaryValues), this.coder);
            }
            return this.values;
        }

        public void write(Kryo kryo, Output output) {
            kryo.writeClassAndObject(output, this.coder);
            kryo.writeObject(output, Preconditions.checkStateNotNull(this.binaryValues));
        }

        public void read(Kryo kryo, Input input) {
            this.coder = (Coder) kryo.readClassAndObject(input);
            this.values = deserialize((byte[][]) Preconditions.checkStateNotNull((byte[][]) kryo.readObject(input, byte[][].class)), this.coder);
        }
    }

    /**
     * {@link SideInputValues} that groups the values by the windows they belong to.
     *
     * <p>Each element is encoded as a full {@link WindowedValue}, so its windows are still
     * available when the binary values are decoded in {@link #deserialize(byte[][], Coder)}.
     *
     * <p>The lambda created in {@code bytes} is serializable; the matching
     * {@code $deserializeLambda$} method is generated by the Java compiler and must not be written
     * by hand.
     *
     * @param <T> type of the side input values
     */
    public static class ByWindow<T> extends BaseSideInputValues<WindowedValue<T>, Map<BoundedWindow, List<T>>, T> {
        /**
         * Encodes every element of the dataset with the given coder and collects the result.
         *
         * @param str name passed on to {@code EvaluationContext.collect}
         * @param coder coder for the windowed values
         * @param dataset dataset holding the side input
         */
        @VisibleForTesting
        ByWindow(String str, Coder<WindowedValue<T>> coder, Dataset<WindowedValue<T>> dataset) {
            super(coder, (byte[][]) EvaluationContext.collect(str, binaryDataset(dataset, coder)));
        }

        /**
         * Returns the values recorded for the given window.
         *
         * @param boundedWindow window to look up
         * @return the values of that window, or {@code null} if none were recorded for it
         */
        @Override
        @Nullable
        public List<T> get(BoundedWindow boundedWindow) {
            return getValues().get(boundedWindow);
        }

        /**
         * Decodes all windowed values and adds each value to the list of every window it is
         * assigned to.
         *
         * @param bArr encoded windowed values, one {@code byte[]} per element
         * @param coder coder to decode the windowed values with
         * @return the decoded values keyed by window
         */
        @Override
        public Map<BoundedWindow, List<T>> deserialize(byte[][] bArr, Coder<WindowedValue<T>> coder) {
            Map<BoundedWindow, List<T>> valuesByWindow = new HashMap<>();
            for (byte[] encoded : bArr) {
                WindowedValue<T> windowedValue = CoderHelpers.fromByteArray(encoded, coder);
                // A value assigned to several windows is listed under each of them.
                for (BoundedWindow window : windowedValue.getWindows()) {
                    valuesByWindow.computeIfAbsent(window, w -> new ArrayList<>()).add(windowedValue.getValue());
                }
            }
            return valuesByWindow;
        }

        /** Maps the dataset to a binary dataset holding the encoded windowed values. */
        private static <T> Dataset<byte[]> binaryDataset(Dataset<WindowedValue<T>> dataset, Coder<WindowedValue<T>> coder) {
            return dataset.map(bytes(coder), Encoders.BINARY());
        }

        /** Returns a function that encodes a windowed value (value and windows) to bytes. */
        private static <T> Function1<WindowedValue<T>, byte[]> bytes(Coder<WindowedValue<T>> coder) {
            return ScalaInterop.fun1(windowedValue -> CoderHelpers.toByteArray(windowedValue, coder));
        }
    }

    /**
     * {@link SideInputValues} for side inputs in the global window.
     *
     * <p>Only the value of each {@link WindowedValue} is encoded; window information is not kept.
     * Consequently {@link #get(BoundedWindow)} accepts nothing but a {@link GlobalWindow}.
     *
     * <p>The lambda created in {@code bytes} is serializable; the matching
     * {@code $deserializeLambda$} method is generated by the Java compiler and must not be written
     * by hand.
     *
     * @param <T> type of the side input values
     */
    public static class Global<T> extends BaseSideInputValues<T, List<T>, T> {
        /**
         * Encodes the value of every element of the dataset with the given coder and collects the
         * result.
         *
         * @param str name passed on to {@code EvaluationContext.collect}
         * @param coder coder for the (unwindowed) values
         * @param dataset dataset holding the side input
         */
        @VisibleForTesting
        Global(String str, Coder<T> coder, Dataset<WindowedValue<T>> dataset) {
            super(coder, (byte[][]) EvaluationContext.collect(str, binaryDataset(dataset, coder)));
        }

        /**
         * Returns all values of the side input.
         *
         * @param boundedWindow window to look up; must be a {@link GlobalWindow}
         * @return all decoded values
         * @throws IllegalArgumentException if {@code boundedWindow} is not a {@link GlobalWindow}
         */
        @Override
        @Nullable
        public List<T> get(BoundedWindow boundedWindow) {
            org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(boundedWindow instanceof GlobalWindow, "Expected GlobalWindow");
            return getValues();
        }

        /**
         * Decodes all values into a list, preserving the order of the binary values.
         *
         * @param bArr encoded values, one {@code byte[]} per element
         * @param coder coder to decode the values with
         * @return the decoded values
         */
        @Override
        public List<T> deserialize(byte[][] bArr, Coder<T> coder) {
            List<T> decoded = new ArrayList<>(bArr.length);
            for (byte[] encoded : bArr) {
                decoded.add(CoderHelpers.fromByteArray(encoded, coder));
            }
            return decoded;
        }

        /** Maps the dataset to a binary dataset holding the encoded values. */
        private static <T> Dataset<byte[]> binaryDataset(Dataset<WindowedValue<T>> dataset, Coder<T> coder) {
            return dataset.map(bytes(coder), Encoders.BINARY());
        }

        /** Returns a function that encodes only the value of a windowed value to bytes. */
        private static <T> Function1<WindowedValue<T>, byte[]> bytes(Coder<T> coder) {
            return ScalaInterop.fun1(windowedValue -> CoderHelpers.toByteArray(windowedValue.getValue(), coder));
        }
    }

    /**
     * Function that builds the {@link SideInputValues} of a side input from the dataset holding
     * its windowed values.
     *
     * @param <T> type of the side input values
     */
    @FunctionalInterface
    interface Loader<T> extends Function<Dataset<WindowedValue<T>>, SideInputValues<T>> {}

    /**
     * Returns the side input values available for the given window.
     *
     * @param boundedWindow window to look values up for
     * @return the values for {@code boundedWindow}; may be {@code null}
     */
    @Nullable
    List<T> get(BoundedWindow boundedWindow);

    /**
     * Creates the {@link Loader} matching the windowing of the given collection.
     *
     * <p>Collections windowed by {@link GlobalWindows} are loaded into a {@link Global}, all others
     * into a {@link ByWindow} using the full windowed value coder. Name and coder of the collection
     * are only looked up once the returned loader is applied.
     *
     * @param pCollection collection used as side input
     * @return loader that builds the side input values from a dataset
     */
    static <T> Loader<T> loader(PCollection<T> pCollection) {
        WindowFn<?, ?> fn = pCollection.getWindowingStrategy().getWindowFn();
        if (fn instanceof GlobalWindows) {
            return ds -> new Global<>(pCollection.getName(), pCollection.getCoder(), ds);
        }
        return ds -> new ByWindow<>(pCollection.getName(), WindowedValue.getFullCoder(pCollection.getCoder(), fn.windowCoder()), ds);
    }
}
