package org.apache.beam.runners.spark.structuredstreaming.translation.batch;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.spark.structuredstreaming.SparkSessionRule;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Tests that run per-key combining transforms ({@link Sum#integersPerKey()}, {@link Distinct} and
 * {@link Count#perElement()}) on a {@link TestPipeline} configured from the shared
 * {@link SparkSessionRule}, with and without windowing.
 *
 * <p>Each test builds a small in-memory input with {@link Create}, applies the transform under
 * test, asserts the output with {@link PAssert} and then runs the pipeline.
 */
@RunWith(JUnit4.class)
public class CombinePerKeyTest implements Serializable {

    /** Session rule shared by all tests in this class; created without extra configuration. */
    @ClassRule
    public static final SparkSessionRule SESSION = new SparkSessionRule();

    /** Pipeline under test; transient because the enclosing test class is {@link Serializable}. */
    @Rule
    public transient TestPipeline pipeline = TestPipeline.fromOptions(SESSION.createPipelineOptions());

    /** Sums values per key without windowing: key 1 -> 1+3+5 = 9, key 2 -> 2+4+6 = 12. */
    @Test
    public void testCombinePerKey() {
        List<KV<Integer, Integer>> elems = new ArrayList<>();
        elems.add(KV.of(1, 1));
        elems.add(KV.of(1, 3));
        elems.add(KV.of(1, 5));
        elems.add(KV.of(2, 2));
        elems.add(KV.of(2, 4));
        elems.add(KV.of(2, 6));

        PAssert.that(pipeline.apply(Create.of(elems)).apply(Sum.integersPerKey()))
                .containsInAnyOrder(KV.of(1, 9), KV.of(2, 12));
        pipeline.run();
    }

    /** Applies {@link Distinct} to an input with duplicates and expects each value exactly once. */
    @Test
    public void testDistinctViaCombinePerKey() {
        List<Integer> elems = Lists.newArrayList(1, 2, 3, 3, 4, 4, 4, 4, 5, 5);

        PAssert.that(pipeline.apply(Create.of(elems)).apply(Distinct.create()))
                .containsInAnyOrder(1, 2, 3, 4, 5);
        pipeline.run();
    }

    /**
     * Sums per key within fixed 10 ms windows.
     *
     * <p>Timestamps 1..3 and 11..12 are expected to land in two different windows, giving
     * key 1 -> {1+3 = 4, 5} and key 2 -> {2, 4+6 = 10}.
     */
    @Test
    public void testCombinePerKeyPreservesWindowing() {
        PAssert.that(
                        pipeline
                                .apply(
                                        Create.timestamped(
                                                TimestampedValue.of(KV.of(1, 1), new Instant(1L)),
                                                TimestampedValue.of(KV.of(1, 3), new Instant(2L)),
                                                TimestampedValue.of(KV.of(1, 5), new Instant(11L)),
                                                TimestampedValue.of(KV.of(2, 2), new Instant(3L)),
                                                TimestampedValue.of(KV.of(2, 4), new Instant(11L)),
                                                TimestampedValue.of(KV.of(2, 6), new Instant(12L))))
                                .apply(Window.into(FixedWindows.of(Duration.millis(10L))))
                                .apply(Sum.integersPerKey()))
                .containsInAnyOrder(KV.of(1, 4), KV.of(1, 5), KV.of(2, 2), KV.of(2, 10));
        pipeline.run();
    }

    /**
     * Sums per key within sliding windows of size 3 ms that start every 1 ms.
     *
     * <p>The input holds values summing to 3 at t=1, 7 at t=2 and 11 at t=3, all under key 1.
     * Each element is expected to fall into three overlapping windows, so the five non-empty
     * windows yield 3, 3+7 = 10, 3+7+11 = 21, 7+11 = 18 and 11.
     */
    @Test
    public void testCombinePerKeyWithSlidingWindows() {
        PAssert.that(
                        pipeline
                                .apply(
                                        Create.timestamped(
                                                TimestampedValue.of(KV.of(1, 1), new Instant(1L)),
                                                TimestampedValue.of(KV.of(1, 3), new Instant(2L)),
                                                TimestampedValue.of(KV.of(1, 5), new Instant(3L)),
                                                TimestampedValue.of(KV.of(1, 2), new Instant(1L)),
                                                TimestampedValue.of(KV.of(1, 4), new Instant(2L)),
                                                TimestampedValue.of(KV.of(1, 6), new Instant(3L))))
                                .apply(
                                        Window.into(
                                                SlidingWindows.of(Duration.millis(3L))
                                                        .every(Duration.millis(1L))))
                                .apply(Sum.integersPerKey()))
                .containsInAnyOrder(
                        KV.of(1, 3), KV.of(1, 10), KV.of(1, 21), KV.of(1, 18), KV.of(1, 11));
        pipeline.run();
    }

    /**
     * Sums per key within session windows using a 5 ms gap.
     *
     * <p>For key 1 the elements at t=5, 7 and 11 are expected to merge into a single session
     * (1+3+5 = 9). For key 2 the element at t=5 stays in its own session (2), while the elements
     * at t=11 and t=12 merge (4+6 = 10).
     */
    @Test
    public void testCombineByKeyWithMergingWindows() {
        PAssert.that(
                        pipeline
                                .apply(
                                        Create.timestamped(
                                                TimestampedValue.of(KV.of(1, 1), new Instant(5L)),
                                                TimestampedValue.of(KV.of(1, 3), new Instant(7L)),
                                                TimestampedValue.of(KV.of(1, 5), new Instant(11L)),
                                                TimestampedValue.of(KV.of(2, 2), new Instant(5L)),
                                                TimestampedValue.of(KV.of(2, 4), new Instant(11L)),
                                                TimestampedValue.of(KV.of(2, 6), new Instant(12L))))
                                .apply(Window.into(Sessions.withGapDuration(Duration.millis(5L))))
                                .apply(Sum.integersPerKey()))
                .containsInAnyOrder(KV.of(1, 9), KV.of(2, 2), KV.of(2, 10));
        pipeline.run();
    }

    /**
     * Counts occurrences per element within sliding windows of size 2 ms that start every 1 ms.
     *
     * <p>"a" occurs at t=1 and t=2, "b" at t=3 and t=4. Each element is expected to fall into two
     * overlapping windows, so every value produces the counts 1, 2 and 1 across its three
     * non-empty windows.
     */
    @Test
    public void testCountPerElementWithSlidingWindows() {
        PAssert.that(
                        pipeline
                                .apply(
                                        Create.timestamped(
                                                TimestampedValue.of("a", new Instant(1L)),
                                                TimestampedValue.of("a", new Instant(2L)),
                                                TimestampedValue.of("b", new Instant(3L)),
                                                TimestampedValue.of("b", new Instant(4L))))
                                .apply(
                                        Window.into(
                                                SlidingWindows.of(Duration.millis(2L))
                                                        .every(Duration.millis(1L))))
                                .apply(Count.perElement()))
                .containsInAnyOrder(
                        KV.of("a", 1L),
                        KV.of("a", 2L),
                        KV.of("a", 1L),
                        KV.of("b", 1L),
                        KV.of("b", 2L),
                        KV.of("b", 1L));
        pipeline.run();
    }
}
