package org.apache.beam.runners.spark.translation;

import java.util.Arrays;
import java.util.Iterator;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions;
import org.apache.beam.runners.spark.util.ByteArray;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Bytes;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;
import scala.Tuple2;

/** Unit tests for {@link GroupNonMergingWindowsFunctions.GroupByKeyIterator}. */
public class GroupNonMergingWindowsFunctionsTest {

    /**
     * Builds the encoded {@code (key + window bytes, windowed value bytes)} pairs that are fed to
     * the iterator under test.
     *
     * @param <K> key type
     * @param <V> value type
     * @param <W> window type that every created item is assigned to
     */
    public static class ItemFactory<K, V, W extends BoundedWindow> {
        private final Coder<K> keyCoder;
        private final WindowedValue.FullWindowedValueCoder<KV<K, V>> winValCoder;
        private final byte[] windowBytes;
        private final W window;

        /** Creates a factory whose items all live in {@link GlobalWindow#INSTANCE}. */
        static <K, V> ItemFactory<K, V, GlobalWindow> forGlogalWindow(
                Coder<K> coder,
                WindowedValue.FullWindowedValueCoder<KV<K, V>> fullWindowedValueCoder) {
            return new ItemFactory<>(
                    coder, fullWindowedValueCoder, GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE);
        }

        /** Creates a factory whose items all live in the given window {@code w}. */
        static <K, V, W extends BoundedWindow> ItemFactory<K, V, W> forWindow(
                Coder<K> coder,
                WindowedValue.FullWindowedValueCoder<KV<K, V>> fullWindowedValueCoder,
                Coder<W> coder2,
                W w) {
            return new ItemFactory<>(coder, fullWindowedValueCoder, coder2, w);
        }

        /**
         * @param coder coder used to encode keys
         * @param fullWindowedValueCoder coder used to encode the whole windowed key-value pair
         * @param coder2 coder used to encode the window
         * @param w window assigned to every created item
         */
        ItemFactory(
                Coder<K> coder,
                WindowedValue.FullWindowedValueCoder<KV<K, V>> fullWindowedValueCoder,
                Coder<W> coder2,
                W w) {
            this.keyCoder = coder;
            this.winValCoder = fullWindowedValueCoder;
            // The window never changes for this factory, so encode it once up front.
            this.windowBytes = CoderHelpers.toByteArray(w, coder2);
            this.window = w;
        }

        /**
         * Encodes a single key-value pair.
         *
         * @param k key of the element
         * @param v value of the element
         * @return tuple of (encoded key followed by encoded window, encoded windowed value)
         */
        public Tuple2<ByteArray, byte[]> create(K k, V v) {
            // Bytes.concat is varargs; pass both arrays directly instead of building a byte[][].
            ByteArray keyAndWindow =
                    new ByteArray(Bytes.concat(CoderHelpers.toByteArray(k, this.keyCoder), this.windowBytes));
            WindowedValue<KV<K, V>> windowedValue =
                    WindowedValue.of(KV.of(k, v), Instant.now(), this.window, PaneInfo.ON_TIME_AND_ONLY_FIRING);
            return new Tuple2<>(keyAndWindow, CoderHelpers.toByteArray(windowedValue, this.winValCoder));
        }
    }

    /** Checks grouping of the sample data when every element is in the global window. */
    @Test
    public void testGroupByKeyIterator() throws Coder.NonDeterministicException {
        assertGroupsSampleData(createGbkIterator());
    }

    /** Checks grouping of the sample data when every element is in a single interval window. */
    @Test
    public void testGroupByKeyIteratorOnNonGlobalWindows() throws Coder.NonDeterministicException {
        Instant now = Instant.now();
        IntervalWindow window = new IntervalWindow(now, now.plus(1L));
        GroupNonMergingWindowsFunctions.GroupByKeyIterator<String, Integer, IntervalWindow> iteratorUnderTest =
                createGbkIterator(
                        window,
                        IntervalWindow.getCoder(),
                        WindowingStrategy.of(FixedWindows.of(Duration.millis(1L))));
        assertGroupsSampleData(iteratorUnderTest);
    }

    /** Iterating the values of one group a second time is expected to fail. */
    @Test(expected = IllegalStateException.class)
    public void testGbkIteratorValuesCannotBeReiterated() throws Coder.NonDeterministicException {
        Iterable<Integer> values = createGbkIterator().next().getValue().getValue();
        for (Integer ignored : values) {
            // First pass: just drain the values.
        }
        for (Integer ignored : values) {
            // Second pass: the test expects IllegalStateException to be thrown here.
        }
    }

    /**
     * Shared assertions for the data produced by {@link #createGbkIterator}: the first group
     * yields 1 then 2, and the first value of the next group is 3.
     */
    private static <W extends BoundedWindow> void assertGroupsSampleData(
            GroupNonMergingWindowsFunctions.GroupByKeyIterator<String, Integer, W> iteratorUnderTest) {
        Assert.assertTrue(iteratorUnderTest.hasNext());
        WindowedValue<KV<String, Iterable<Integer>>> firstGroup = iteratorUnderTest.next();
        // hasNext() is asserted twice on purpose: repeated calls must keep returning true.
        Assert.assertTrue(iteratorUnderTest.hasNext());
        Assert.assertTrue(iteratorUnderTest.hasNext());

        Iterator<Integer> firstGroupValues = firstGroup.getValue().getValue().iterator();
        Assert.assertTrue("Now we expect first value for K1 to pop up.", firstGroupValues.hasNext());
        Assert.assertEquals(1L, firstGroupValues.next().longValue());
        Assert.assertTrue(firstGroupValues.hasNext());
        Assert.assertTrue(firstGroupValues.hasNext());
        Assert.assertEquals(2L, firstGroupValues.next().longValue());

        WindowedValue<KV<String, Iterable<Integer>>> secondGroup = iteratorUnderTest.next();
        Iterator<Integer> secondGroupValues = secondGroup.getValue().getValue().iterator();
        Assert.assertEquals(3L, secondGroupValues.next().longValue());
    }

    /** Creates the iterator under test over the sample data in the global window. */
    private GroupNonMergingWindowsFunctions.GroupByKeyIterator<String, Integer, GlobalWindow> createGbkIterator()
            throws Coder.NonDeterministicException {
        return createGbkIterator(
                GlobalWindow.INSTANCE, GlobalWindow.Coder.INSTANCE, WindowingStrategy.globalDefault());
    }

    /**
     * Creates the iterator under test over five sample elements: {@code k1 -> 1, 2} and
     * {@code k2 -> 3, 4, 5}, all assigned to window {@code w}.
     *
     * @param w window assigned to every sample element
     * @param coder coder for {@code w}
     * @param windowingStrategy strategy handed to the iterator; its window coder is also used to
     *     build the windowed-value coder
     */
    private <W extends BoundedWindow> GroupNonMergingWindowsFunctions.GroupByKeyIterator<String, Integer, W> createGbkIterator(
            W w, Coder<W> coder, WindowingStrategy<Object, W> windowingStrategy)
            throws Coder.NonDeterministicException {
        StringUtf8Coder keyCoder = StringUtf8Coder.of();
        WindowedValue.FullWindowedValueCoder<KV<String, Integer>> winValCoder =
                WindowedValue.getFullCoder(
                        KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()),
                        windowingStrategy.getWindowFn().windowCoder());
        ItemFactory<String, Integer, W> factory = ItemFactory.forWindow(keyCoder, winValCoder, coder, w);
        Iterator<Tuple2<ByteArray, byte[]>> encodedItems =
                Arrays.asList(
                                factory.create("k1", 1),
                                factory.create("k1", 2),
                                factory.create("k2", 3),
                                factory.create("k2", 4),
                                factory.create("k2", 5))
                        .iterator();
        return new GroupNonMergingWindowsFunctions.GroupByKeyIterator<>(
                encodedItems, keyCoder, windowingStrategy, winValCoder);
    }
}
