/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.transforms.join;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.collect.ImmutableList;
import org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.collect.Iterables;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGbkResultSchema;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.transforms.join.UnionCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.hamcrest.Matcher;
import org.hamcrest.collection.IsIterableContainingInAnyOrder;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class CoGroupByKeyTest
implements Serializable {
    @Rule
    public final transient TestPipeline p = TestPipeline.create();

    /**
     * Convenience overload that builds a keyed input collection without explicit timestamps.
     *
     * @param name suffix used for the transform names of the created input
     * @param p pipeline the input is attached to
     * @param list key/value pairs the collection should contain
     * @return the collection produced by the four-argument overload
     */
    private PCollection<KV<Integer, String>> createInput(String name, Pipeline p, List<KV<Integer, String>> list) {
        // An empty timestamp list makes the four-argument overload take its untimestamped branch.
        final List<Long> noTimestamps = new ArrayList<>();
        return createInput(name, p, list, noTimestamps);
    }

    /**
     * Builds a keyed input collection, optionally attaching a timestamp to every element.
     *
     * <p>When {@code timestamps} is empty the elements are created with {@code Create.of};
     * otherwise {@code Create.timestamped} is used with the supplied timestamps. Either way the
     * elements are encoded with a big-endian integer key coder and a UTF-8 string value coder,
     * and the result is passed through an identity {@link ParDo}.
     *
     * @param name suffix appended to the "Create" and "Identity" transform names
     * @param p pipeline the input is attached to
     * @param list key/value pairs the collection should contain
     * @param timestamps timestamps for the elements of {@code list}, or an empty list for none
     * @return the collection emitted by the identity step
     */
    private PCollection<KV<Integer, String>> createInput(String name, Pipeline p, List<KV<Integer, String>> list, List<Long> timestamps) {
        KvCoder<Integer, String> coder = KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of());

        PCollection<KV<Integer, String>> input;
        if (timestamps.isEmpty()) {
            input = p.apply("Create" + name, Create.of(list).withCoder(coder));
        } else {
            input = p.apply("Create" + name, Create.timestamped(list, timestamps).withCoder(coder));
        }

        return input.apply("Identity" + name, ParDo.of(new DoFn<KV<Integer, String>, KV<Integer, String>>() {

            @DoFn.ProcessElement
            public void processElement(ProcessContext c) {
                // Pass every element through unchanged.
                c.output(c.element());
            }
        }));
    }

    /**
     * Builds a two-input {@link CoGroupByKey} in which every key has at most one value per tag.
     *
     * <p>The first input holds keys 1 and 2, the second holds keys 2 and 3.
     *
     * @param p pipeline the inputs are attached to
     * @param tag1 tag identifying values from the first input
     * @param tag2 tag identifying values from the second input
     * @return the co-grouped result keyed by the integer id
     */
    private PCollection<KV<Integer, CoGbkResult>> buildGetOnlyGbk(Pipeline p, TupleTag<String> tag1, TupleTag<String> tag2) {
        List<KV<Integer, String>> list1 = Arrays.asList(
                KV.of(1, "collection1-1"),
                KV.of(2, "collection1-2"));
        List<KV<Integer, String>> list2 = Arrays.asList(
                KV.of(2, "collection2-2"),
                KV.of(3, "collection2-3"));

        PCollection<KV<Integer, String>> collection1 = createInput("CreateList1", p, list1);
        PCollection<KV<Integer, String>> collection2 = createInput("CreateList2", p, list2);

        PCollection<KV<Integer, CoGbkResult>> coGbkResults =
                KeyedPCollectionTuple.of(tag1, collection1)
                        .and(tag2, collection2)
                        .apply(CoGroupByKey.create());
        return coGbkResults;
    }

    /**
     * Verifies that {@code CoGbkResult.getOnly} returns the single value stored under each tag
     * after co-grouping the two inputs built by {@code buildGetOnlyGbk}.
     */
    @Test
    @Category(value={ValidatesRunner.class})
    public void testCoGroupByKeyGetOnly() {
        final TupleTag<String> tag1 = new TupleTag<>();
        final TupleTag<String> tag2 = new TupleTag<>();

        PCollection<KV<Integer, CoGbkResult>> coGbkResults = buildGetOnlyGbk(p, tag1, tag2);

        PAssert.thatMap(coGbkResults).satisfies(results -> {
            Assert.assertEquals("collection1-1", results.get(1).getOnly(tag1));
            Assert.assertEquals("collection1-2", results.get(2).getOnly(tag1));
            Assert.assertEquals("collection2-2", results.get(2).getOnly(tag2));
            Assert.assertEquals("collection2-3", results.get(3).getOnly(tag2));
            return null;
        });

        p.run();
    }

    /**
     * Builds a three-way {@link CoGroupByKey} joining names, addresses and purchases by id.
     *
     * <p>Not every id appears in every input, and some ids have several purchases.
     *
     * @param p pipeline the inputs are attached to
     * @param purchasesTag tag identifying purchase values
     * @param addressesTag tag identifying address values
     * @param namesTag tag identifying name values
     * @return the co-grouped result keyed by id
     */
    private PCollection<KV<Integer, CoGbkResult>> buildPurchasesCoGbk(Pipeline p, TupleTag<String> purchasesTag, TupleTag<String> addressesTag, TupleTag<String> namesTag) {
        List<KV<Integer, String>> idToPurchases = Arrays.asList(
                KV.of(2, "Boat"),
                KV.of(1, "Shoes"),
                KV.of(3, "Car"),
                KV.of(1, "Book"),
                KV.of(10, "Pens"),
                KV.of(8, "House"),
                KV.of(4, "Suit"),
                KV.of(11, "House"),
                KV.of(14, "Shoes"),
                KV.of(2, "Suit"),
                KV.of(8, "Suit Case"),
                KV.of(3, "House"));
        List<KV<Integer, String>> idToAddress = Arrays.asList(
                KV.of(2, "53 S. 3rd"),
                KV.of(10, "383 Jackson Street"),
                KV.of(20, "3 W. Arizona"),
                KV.of(3, "29 School Rd"),
                KV.of(8, "6 Watling Rd"));
        List<KV<Integer, String>> idToName = Arrays.asList(
                KV.of(1, "John Smith"),
                KV.of(2, "Sally James"),
                KV.of(8, "Jeffery Spalding"),
                KV.of(20, "Joan Lichtfield"));

        PCollection<KV<Integer, String>> purchasesTable = createInput("CreateIdToPurchases", p, idToPurchases);
        PCollection<KV<Integer, String>> addressTable = createInput("CreateIdToAddress", p, idToAddress);
        PCollection<KV<Integer, String>> nameTable = createInput("CreateIdToName", p, idToName);

        PCollection<KV<Integer, CoGbkResult>> coGbkResults =
                KeyedPCollectionTuple.of(namesTag, nameTable)
                        .and(addressesTag, addressTable)
                        .and(purchasesTag, purchasesTable)
                        .apply(CoGroupByKey.create());
        return coGbkResults;
    }

    /**
     * Builds a windowed {@link CoGroupByKey} joining timestamped clicks and purchases by id.
     *
     * <p>Both inputs are placed into fixed windows built from {@code new Duration(4L)} and use
     * {@link TimestampCombiner#EARLIEST} before being co-grouped.
     *
     * @param p pipeline the inputs are attached to
     * @param clicksTag tag identifying click values
     * @param purchasesTag tag identifying purchase values
     * @return the co-grouped result keyed by id
     */
    private PCollection<KV<Integer, CoGbkResult>> buildPurchasesCoGbkWithWindowing(Pipeline p, TupleTag<String> clicksTag, TupleTag<String> purchasesTag) {
        List<KV<Integer, String>> idToClick = Arrays.asList(
                KV.of(1, "Click t0"),
                KV.of(2, "Click t2"),
                KV.of(1, "Click t4"),
                KV.of(1, "Click t6"),
                KV.of(2, "Click t8"));
        List<KV<Integer, String>> idToPurchases = Arrays.asList(
                KV.of(1, "Boat t1"),
                KV.of(1, "Shoesi t2"),
                KV.of(1, "Pens t3"),
                KV.of(2, "House t4"),
                KV.of(2, "Suit t5"),
                KV.of(1, "Car t6"),
                KV.of(1, "Book t7"),
                KV.of(2, "House t8"),
                KV.of(2, "Shoes t9"),
                KV.of(2, "House t10"));

        // The explicit type witness is required because the chained withTimestampCombiner call
        // leaves Window.into without a target type to infer from.
        PCollection<KV<Integer, String>> clicksTable =
                createInput("CreateClicks", p, idToClick, Arrays.asList(0L, 2L, 4L, 6L, 8L))
                        .apply("WindowClicks",
                                Window.<KV<Integer, String>>into(FixedWindows.of(new Duration(4L)))
                                        .withTimestampCombiner(TimestampCombiner.EARLIEST));
        PCollection<KV<Integer, String>> purchasesTable =
                createInput("CreatePurchases", p, idToPurchases, Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L))
                        .apply("WindowPurchases",
                                Window.<KV<Integer, String>>into(FixedWindows.of(new Duration(4L)))
                                        .withTimestampCombiner(TimestampCombiner.EARLIEST));

        PCollection<KV<Integer, CoGbkResult>> coGbkResults =
                KeyedPCollectionTuple.of(clicksTag, clicksTable)
                        .and(purchasesTag, purchasesTable)
                        .apply(CoGroupByKey.create());
        return coGbkResults;
    }

    /**
     * Verifies the per-key contents of the three-way join built by {@code buildPurchasesCoGbk}:
     * names and addresses are checked with {@code getOnly}, purchases with {@code getAll}
     * regardless of order.
     */
    @Test
    @Category(value={ValidatesRunner.class})
    public void testCoGroupByKey() {
        final TupleTag<String> namesTag = new TupleTag<>();
        final TupleTag<String> addressesTag = new TupleTag<>();
        final TupleTag<String> purchasesTag = new TupleTag<>();

        PCollection<KV<Integer, CoGbkResult>> coGbkResults =
                buildPurchasesCoGbk(p, purchasesTag, addressesTag, namesTag);

        PAssert.thatMap(coGbkResults).satisfies(results -> {
            CoGbkResult result1 = results.get(1);
            Assert.assertEquals("John Smith", result1.getOnly(namesTag));
            Assert.assertThat(result1.getAll(purchasesTag), IsIterableContainingInAnyOrder.containsInAnyOrder("Shoes", "Book"));

            CoGbkResult result2 = results.get(2);
            Assert.assertEquals("Sally James", result2.getOnly(namesTag));
            Assert.assertEquals("53 S. 3rd", result2.getOnly(addressesTag));
            Assert.assertThat(result2.getAll(purchasesTag), IsIterableContainingInAnyOrder.containsInAnyOrder("Suit", "Boat"));

            // Id 3 has no name entry in the input data, so only address and purchases are checked.
            CoGbkResult result3 = results.get(3);
            Assert.assertEquals("29 School Rd", result3.getOnly(addressesTag));
            Assert.assertThat(result3.getAll(purchasesTag), IsIterableContainingInAnyOrder.containsInAnyOrder("Car", "House"));

            CoGbkResult result8 = results.get(8);
            Assert.assertEquals("Jeffery Spalding", result8.getOnly(namesTag));
            Assert.assertEquals("6 Watling Rd", result8.getOnly(addressesTag));
            Assert.assertThat(result8.getAll(purchasesTag), IsIterableContainingInAnyOrder.containsInAnyOrder("House", "Suit Case"));

            CoGbkResult result20 = results.get(20);
            Assert.assertEquals("Joan Lichtfield", result20.getOnly(namesTag));
            Assert.assertEquals("3 W. Arizona", result20.getOnly(addressesTag));

            Assert.assertEquals("383 Jackson Street", results.get(10).getOnly(addressesTag));

            Assert.assertThat(results.get(4).getAll(purchasesTag), IsIterableContainingInAnyOrder.containsInAnyOrder("Suit"));
            Assert.assertThat(results.get(10).getAll(purchasesTag), IsIterableContainingInAnyOrder.containsInAnyOrder("Pens"));
            Assert.assertThat(results.get(11).getAll(purchasesTag), IsIterableContainingInAnyOrder.containsInAnyOrder("House"));
            Assert.assertThat(results.get(14).getAll(purchasesTag), IsIterableContainingInAnyOrder.containsInAnyOrder("Shoes"));
            return null;
        });

        p.run();
    }

    /**
     * Feeds hand-built {@link CoGbkResult} values through
     * {@code CorrelatePurchaseCountForAddressesWithoutNamesFn} and checks which rows are emitted.
     *
     * @throws Exception declared by the original signature; nothing in the body throws a checked
     *     exception directly
     */
    @Test
    @Category(value={NeedsRunner.class})
    public void testConsumingDoFn() throws Exception {
        TupleTag<String> purchasesTag = new TupleTag<>();
        TupleTag<String> addressesTag = new TupleTag<>();
        TupleTag<String> namesTag = new TupleTag<>();

        // Has a name, so the DoFn drops it.
        CoGbkResult result1 = CoGbkResult.of(purchasesTag, Arrays.asList("3a", "3b"))
                .and(addressesTag, Arrays.asList("2a", "2b"))
                .and(namesTag, Arrays.asList("1a"));
        // No name, one address, two purchases: emitted as ("4a", 2).
        CoGbkResult result2 = CoGbkResult.of(purchasesTag, Arrays.asList("5a", "5b"))
                .and(addressesTag, Arrays.asList("4a"))
                .and(namesTag, new ArrayList<>());
        // No address, so the DoFn drops it.
        CoGbkResult result3 = CoGbkResult.of(purchasesTag, Arrays.asList("7a", "7b"))
                .and(addressesTag, new ArrayList<>())
                .and(namesTag, new ArrayList<>());
        // No name, one address, no purchases: emitted as ("8a", 0).
        CoGbkResult result4 = CoGbkResult.of(purchasesTag, new ArrayList<>())
                .and(addressesTag, Arrays.asList("8a"))
                .and(namesTag, new ArrayList<>());

        CoGbkResultSchema schema =
                CoGbkResultSchema.of(ImmutableList.of(purchasesTag, addressesTag, namesTag));
        UnionCoder unionCoder =
                UnionCoder.of(ImmutableList.of(StringUtf8Coder.of(), StringUtf8Coder.of(), StringUtf8Coder.of()));
        KvCoder<Integer, CoGbkResult> coder =
                KvCoder.of(VarIntCoder.of(), CoGbkResult.CoGbkResultCoder.of(schema, unionCoder));

        PCollection<KV<String, Integer>> results =
                p.apply(Create.of(
                                KV.of(1, result1),
                                KV.of(2, result2),
                                KV.of(3, result3),
                                KV.of(4, result4))
                        .withCoder(coder))
                        .apply(ParDo.of(new CorrelatePurchaseCountForAddressesWithoutNamesFn(purchasesTag, addressesTag, namesTag)));

        PAssert.that(results).containsInAnyOrder(KV.of("4a", 2), KV.of("8a", 0));

        p.run();
    }

    /**
     * Runs {@code CorrelatePurchaseCountForAddressesWithoutNamesFn} over the join built by
     * {@code buildPurchasesCoGbk} and checks the purchase count emitted for each address that has
     * no associated name.
     */
    @Test
    @Category(value={ValidatesRunner.class})
    public void testCoGroupByKeyHandleResults() {
        TupleTag<String> namesTag = new TupleTag<>();
        TupleTag<String> addressesTag = new TupleTag<>();
        TupleTag<String> purchasesTag = new TupleTag<>();

        PCollection<KV<Integer, CoGbkResult>> coGbkResults =
                buildPurchasesCoGbk(p, purchasesTag, addressesTag, namesTag);

        PCollection<KV<String, Integer>> purchaseCountByKnownAddressesWithoutKnownNames =
                coGbkResults.apply(
                        ParDo.of(new CorrelatePurchaseCountForAddressesWithoutNamesFn(purchasesTag, addressesTag, namesTag)));

        PAssert.that(purchaseCountByKnownAddressesWithoutKnownNames)
                .containsInAnyOrder(
                        KV.of("29 School Rd", 2),
                        KV.of("383 Jackson Street", 1));

        p.run();
    }

    /**
     * Runs {@code ClickOfPurchaseFn} over the windowed join built by
     * {@code buildPurchasesCoGbkWithWindowing} and checks the emitted click/purchase pairs.
     *
     * <p>Each expected value has the form {@code "<element timestamp>:<window max timestamp>"},
     * both in milliseconds.
     */
    @Test
    @Category(value={ValidatesRunner.class})
    public void testCoGroupByKeyWithWindowing() {
        TupleTag<String> clicksTag = new TupleTag<>();
        TupleTag<String> purchasesTag = new TupleTag<>();

        PCollection<KV<Integer, CoGbkResult>> coGbkResults =
                buildPurchasesCoGbkWithWindowing(p, clicksTag, purchasesTag);

        PCollection<KV<String, String>> clickOfPurchase =
                coGbkResults.apply(ParDo.of(new ClickOfPurchaseFn(clicksTag, purchasesTag)));

        PAssert.that(clickOfPurchase)
                .containsInAnyOrder(
                        KV.of("Click t0:Boat t1", "0:3"),
                        KV.of("Click t0:Shoesi t2", "0:3"),
                        KV.of("Click t0:Pens t3", "0:3"),
                        KV.of("Click t4:Car t6", "4:7"),
                        KV.of("Click t4:Book t7", "4:7"),
                        KV.of("Click t6:Car t6", "4:7"),
                        KV.of("Click t6:Book t7", "4:7"),
                        KV.of("Click t8:House t8", "8:11"),
                        KV.of("Click t8:Shoes t9", "8:11"),
                        KV.of("Click t8:House t10", "8:11"));

        p.run();
    }

    /**
     * Emits {@code (address, purchaseCount)} for every address of a co-grouped row that has at
     * least one address and no name.
     *
     * <p>Rows with any name, or with no address, produce no output. When a row has several
     * addresses, each one is emitted with the same purchase count.
     */
    private static class CorrelatePurchaseCountForAddressesWithoutNamesFn
    extends DoFn<KV<Integer, CoGbkResult>, KV<String, Integer>> {
        private final TupleTag<String> purchasesTag;
        private final TupleTag<String> addressesTag;
        private final TupleTag<String> namesTag;

        /**
         * @param purchasesTag tag under which purchases are stored in the row
         * @param addressesTag tag under which addresses are stored in the row
         * @param namesTag tag under which names are stored in the row
         */
        private CorrelatePurchaseCountForAddressesWithoutNamesFn(TupleTag<String> purchasesTag, TupleTag<String> addressesTag, TupleTag<String> namesTag) {
            this.purchasesTag = purchasesTag;
            this.addressesTag = addressesTag;
            this.namesTag = namesTag;
        }

        /**
         * Processes one co-grouped row.
         *
         * @param c context supplying the input element and receiving the output pairs
         */
        @DoFn.ProcessElement
        public void processElement(ProcessContext c) {
            CoGbkResult row = c.element().getValue();

            // Rows that carry a name are not of interest.
            Iterable<String> names = row.getAll(this.namesTag);
            if (names.iterator().hasNext()) {
                return;
            }

            // Nothing to attribute purchases to when there is no address.
            Iterable<String> addresses = row.getAll(this.addressesTag);
            if (!addresses.iterator().hasNext()) {
                return;
            }

            // Buffer the addresses before reading the purchases, as the original code does.
            List<String> addressList = new ArrayList<>();
            for (String address : addresses) {
                addressList.add(address);
            }

            Iterable<String> purchases = row.getAll(this.purchasesTag);
            int purchaseCount = Iterables.size(purchases);

            for (String address : addressList) {
                c.output(KV.of(address, purchaseCount));
            }
        }
    }

    /**
     * Emits one pair for every (click, purchase) combination found in a co-grouped row.
     *
     * <p>The output key is {@code "<click>:<purchase>"}; the output value is
     * {@code "<element timestamp millis>:<window max timestamp millis>"}. A row lacking either
     * clicks or purchases produces no output.
     */
    private static class ClickOfPurchaseFn
    extends DoFn<KV<Integer, CoGbkResult>, KV<String, String>> {
        private final TupleTag<String> clicksTag;
        private final TupleTag<String> purchasesTag;

        /**
         * @param clicksTag tag under which clicks are stored in the row
         * @param purchasesTag tag under which purchases are stored in the row
         */
        private ClickOfPurchaseFn(TupleTag<String> clicksTag, TupleTag<String> purchasesTag) {
            this.clicksTag = clicksTag;
            this.purchasesTag = purchasesTag;
        }

        /**
         * Processes one co-grouped row.
         *
         * @param c context supplying the input element, its timestamp, and receiving the output
         * @param window window the element belongs to
         */
        @DoFn.ProcessElement
        public void processElement(ProcessContext c, BoundedWindow window) {
            CoGbkResult row = c.element().getValue();
            Iterable<String> clicks = row.getAll(this.clicksTag);
            Iterable<String> purchases = row.getAll(this.purchasesTag);

            // Built once per element: neither value is modified inside the loops below.
            String timing = c.timestamp().getMillis() + ":" + window.maxTimestamp().getMillis();

            for (String click : clicks) {
                for (String purchase : purchases) {
                    c.output(KV.of(click + ":" + purchase, timing));
                }
            }
        }
    }
}

