package org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import javax.annotation.Nullable;
import org.apache.beam.runners.spark.structuredstreaming.SparkSessionRule;
import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.SideInputValues;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
import org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.spark.serializer.KryoSerializer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.assertj.core.api.Assertions;
import org.assertj.core.util.Lists;
import org.joda.time.Instant;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.ExternalResource;

/* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/SideInputValuesTest.class */
public class SideInputValuesTest {

    @ClassRule
    public static final SparkSessionRule SESSION = new SparkSessionRule(new KV[0]);

    @ClassRule
    public static final SparkKryo KRYO = new SparkKryo();

    /* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/SideInputValuesTest$SparkKryo.class */
    public static class SparkKryo extends ExternalResource {

        @Nullable
        private Kryo kryo = null;

        protected void after() {
            this.kryo = null;
        }

        <T> T serde(T t) {
            Output output = new Output(128);
            kryo().writeClassAndObject(output, t);
            return (T) kryo().readClassAndObject(new Input(output.getBuffer(), 0, output.position()));
        }

        Kryo kryo() {
            if (this.kryo == null) {
                this.kryo = new KryoSerializer(SideInputValuesTest.SESSION.getSession().sparkContext().conf()).newKryo();
            }
            return (Kryo) Preconditions.checkStateNotNull(this.kryo);
        }
    }

    @Test
    public void globalSideInputValues() {
        SideInputValues.Global global = new SideInputValues.Global("test", StringUtf8Coder.of(), dataset(EncoderHelpers.windowedValueEncoder(EncoderHelpers.encoderOf(String.class), EncoderHelpers.encoderOf(GlobalWindow.class)), WindowedValue.valueInGlobalWindow("a"), WindowedValue.valueInGlobalWindow("b")));
        Assertions.assertThat(global.get(GlobalWindow.INSTANCE)).isEqualTo(ImmutableList.of("a", "b"));
        SideInputValues sideInputValues = (SideInputValues) KRYO.serde(global);
        Assertions.assertThat(sideInputValues).isEqualToIgnoringGivenFields(global, new String[]{"binaryValues"});
        Assertions.assertThat(sideInputValues.get(GlobalWindow.INSTANCE)).isEqualTo(ImmutableList.of("a", "b"));
    }

    @Test
    public void windowedSideInputValues() {
        SideInputValues.ByWindow byWindow = new SideInputValues.ByWindow("test", WindowedValue.getFullCoder(StringUtf8Coder.of(), IntervalWindow.getCoder()), dataset(EncoderHelpers.windowedValueEncoder(EncoderHelpers.encoderOf(String.class), EncoderHelpers.encoderOf(IntervalWindow.class)), valueInWindows("a", intervalWindow(0, 1), intervalWindow(1, 2)), valueInWindows("b", intervalWindow(1, 2), intervalWindow(2, 3))));
        Assertions.assertThat(byWindow.get(intervalWindow(0, 1))).isEqualTo(ImmutableList.of("a"));
        Assertions.assertThat(byWindow.get(intervalWindow(1, 2))).isEqualTo(ImmutableList.of("a", "b"));
        Assertions.assertThat(byWindow.get(intervalWindow(2, 3))).isEqualTo(ImmutableList.of("b"));
        SideInputValues sideInputValues = (SideInputValues) KRYO.serde(byWindow);
        Assertions.assertThat(sideInputValues).isEqualToIgnoringGivenFields(byWindow, new String[]{"binaryValues"});
        Assertions.assertThat(sideInputValues.get(intervalWindow(0, 1))).isEqualTo(ImmutableList.of("a"));
        Assertions.assertThat(sideInputValues.get(intervalWindow(1, 2))).isEqualTo(ImmutableList.of("a", "b"));
        Assertions.assertThat(sideInputValues.get(intervalWindow(2, 3))).isEqualTo(ImmutableList.of("b"));
    }

    private static <T> Dataset<T> dataset(Encoder<T> encoder, T... tArr) {
        return SESSION.getSession().createDataset(ScalaInterop.seqOf(tArr), encoder);
    }

    private static IntervalWindow intervalWindow(int i, int i2) {
        return new IntervalWindow(Instant.ofEpochMilli(i), Instant.ofEpochMilli(i2));
    }

    private static <T> WindowedValue<T> valueInWindows(T t, BoundedWindow... boundedWindowArr) {
        return WindowedValue.of(t, Instant.EPOCH, Lists.list(boundedWindowArr), PaneInfo.NO_FIRING);
    }
}
