package org.apache.beam.sdk.transforms;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.collect.ImmutableSet;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.TestUtils;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CollectionCoder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.SetCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* Tests for the Flatten transform, covering Flatten.pCollections() and Flatten.iterables(). */
public class FlattenTest implements Serializable {

    // Pipeline shared by the test methods; each test applies its transforms to this rule-managed instance.
    // NOTE(review): presumably transient so it is not serialized with this Serializable test class — confirm.
    @Rule
    public final transient TestPipeline p = TestPipeline.create();

    // Exception-expectation rule, initialised to expect nothing until a test configures it.
    @Rule
    public transient ExpectedException thrown = ExpectedException.none();

    /**
     * Empty type with no members.
     *
     * <p>NOTE(review): going by the name, presumably meant as an element type for which no default
     * {@code Coder} can be inferred — confirm against its uses.
     */
    private static class ClassWithoutCoder {}

    /**
     * A {@link DoFn} that outputs every element it receives, unchanged.
     *
     * @param <T> the element type, used for both input and output
     */
    private static class IdentityFn<T> extends DoFn<T, T> {

        /** Emits the current element as-is. */
        @DoFn.ProcessElement
        public void processElement(ProcessContext c) {
            T element = c.element();
            c.output(element);
        }
    }

    /**
     * Flattens six string collections built from the {@code TestUtils} line lists and expects the
     * output to contain the concatenation of all of them, in any order.
     */
    @Test
    @Category(ValidatesRunner.class)
    public void testFlattenPCollections() {
        List<List<String>> inputs = Arrays.asList(
                TestUtils.LINES,
                TestUtils.NO_LINES,
                TestUtils.LINES2,
                TestUtils.NO_LINES,
                TestUtils.LINES,
                TestUtils.NO_LINES);

        PCollection<String> output =
                makePCollectionListOfStrings(p, inputs).apply(Flatten.pCollections());

        PAssert.that(output).containsInAnyOrder(flattenLists(inputs));
        p.run();
    }

    /**
     * Flattens a list holding a single collection: the result must be a different
     * {@link PCollection} object than the input, yet contain the same elements.
     */
    @Test
    @Category(ValidatesRunner.class)
    public void testFlattenPCollectionsSingletonList() {
        PCollection<String> input = p.apply(Create.of(TestUtils.LINES));
        PCollection<String> output = PCollectionList.of(input).apply(Flatten.pCollections());

        // Flatten must produce a new collection rather than hand back its only input.
        Assert.assertThat(output, Matchers.not(Matchers.equalTo(input)));

        PAssert.that(output).containsInAnyOrder(TestUtils.LINES);
        p.run();
    }

    /**
     * Same inputs as {@code testFlattenPCollections}, but the flattened output is additionally
     * passed through an identity {@link ParDo} before being checked.
     */
    @Test
    @Category(ValidatesRunner.class)
    public void testFlattenPCollectionsThenParDo() {
        List<List<String>> inputs = Arrays.asList(
                TestUtils.LINES,
                TestUtils.NO_LINES,
                TestUtils.LINES2,
                TestUtils.NO_LINES,
                TestUtils.LINES,
                TestUtils.NO_LINES);

        PCollection<String> output = makePCollectionListOfStrings(p, inputs)
                .apply(Flatten.pCollections())
                .apply(ParDo.of(new IdentityFn<String>()));

        PAssert.that(output).containsInAnyOrder(flattenLists(inputs));
        p.run();
    }

    /**
     * Flattens an empty {@link PCollectionList}; with the coder set explicitly, the output must
     * be an empty collection.
     */
    @Test
    @Category(ValidatesRunner.class)
    public void testFlattenPCollectionsEmpty() {
        PCollection<String> output = PCollectionList.<String>empty(p)
                .apply(Flatten.pCollections())
                .setCoder(StringUtf8Coder.of());

        PAssert.that(output).empty();
        p.run();
    }

    /**
     * Flattens a list in which the same {@link PCollection} appears twice, together with a third
     * collection whose values are shifted by 10, and expects every list entry to contribute its
     * elements to the output.
     */
    @Test
    @Category(ValidatesRunner.class)
    public void testFlattenInputMultipleCopies() {
        // Upper bound passed to both sequences and used to build the expected output.
        final int count = 5;

        PCollection<Long> longs = p.apply("mkLines", GenerateSequence.from(0L).to(count));
        PCollection<Long> biggerLongs = p
                .apply("mkOtherLines", GenerateSequence.from(0L).to(count))
                .apply(MapElements.via(new SimpleFunction<Long, Long>() {
                    @Override
                    public Long apply(Long input) {
                        return input + 10L;
                    }
                }));

        PCollection<Long> flattened = PCollectionList.of(longs)
                .and(longs)
                .and(biggerLongs)
                .apply(Flatten.pCollections());

        List<Long> expected = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            // "longs" is in the list twice, so each of its values is expected twice.
            expected.add((long) i);
            expected.add((long) i);
            // One copy of the shifted value.
            expected.add(i + 10L);
        }

        PAssert.that(flattened).containsInAnyOrder(expected);
        p.run();
    }

    /**
     * Flattens two {@code Long} collections that use different coders — a nullable big-endian
     * coder and a var-length coder — into an output whose coder is set to a nullable var-length
     * coder, and checks the combined contents including the {@code null} elements.
     *
     * @throws CannotProvideCoderException declared by the original signature; kept for compatibility
     */
    @Test
    @Category({ValidatesRunner.class, FlattenWithHeterogeneousCoders.class})
    public void testFlattenMultipleCoders() throws CannotProvideCoderException {
        PCollection<Long> bigEndianLongs = p.apply(
                "BigEndianLongs",
                Create.of(0L, 1L, 2L, 3L, null, 4L, 5L, null, 6L, 7L, 8L, null, 9L)
                        .withCoder(NullableCoder.of(BigEndianLongCoder.of())));
        PCollection<Long> varLongs = p
                .apply("VarLengthLongs", GenerateSequence.from(0L).to(5L))
                .setCoder(VarLongCoder.of());

        PCollection<Long> flattened = PCollectionList.of(bigEndianLongs)
                .and(varLongs)
                .apply(Flatten.pCollections())
                .setCoder(NullableCoder.of(VarLongCoder.of()));

        PAssert.that(flattened).containsInAnyOrder(
                0L, 0L, 1L, 1L, 2L, 3L, 2L, 4L, 5L, 3L, 6L, 7L, 4L, 8L, 9L, null, null, null);
        p.run();
    }

    /**
     * Uses the result of flattening an empty {@link PCollectionList} as an iterable side input.
     * A {@link DoFn} copies every side-input element to its output, and that output must be empty.
     */
    @Test
    @Category(ValidatesRunner.class)
    public void testEmptyFlattenAsSideInput() {
        final PCollectionView<Iterable<String>> view = PCollectionList.<String>empty(p)
                .apply(Flatten.pCollections())
                .setCoder(StringUtf8Coder.of())
                .apply(View.asIterable());

        // A single null element as main input, so the DoFn below has something to process.
        PCollection<String> output = p
                .apply(Create.of((Void) null).withCoder(VoidCoder.of()))
                .apply(ParDo.of(new DoFn<Void, String>() {
                    @DoFn.ProcessElement
                    public void processElement(ProcessContext c) {
                        for (String side : c.sideInput(view)) {
                            c.output(side);
                        }
                    }
                }).withSideInputs(view));

        PAssert.that(output).empty();
        p.run();
    }

    /**
     * Flattens an empty {@link PCollectionList}, runs the result through an identity
     * {@link ParDo}, and expects the final output to be empty.
     */
    @Test
    @Category(ValidatesRunner.class)
    public void testFlattenPCollectionsEmptyThenParDo() {
        PCollection<String> output = PCollectionList.<String>empty(p)
                .apply(Flatten.pCollections())
                .setCoder(StringUtf8Coder.of())
                .apply(ParDo.of(new IdentityFn<String>()));

        PAssert.that(output).empty();
        p.run();
    }

    /**
     * Flattens an empty {@link PCollectionList} without ever setting a coder and expects an
     * {@link IllegalStateException} whose message mentions "Unable to return a default Coder".
     *
     * <p>The element type is pinned to {@code ClassWithoutCoder} so the source states which type
     * the empty list is meant to carry; generics are erased, so this changes nothing at runtime.
     */
    @Test
    @Category(NeedsRunner.class)
    public void testFlattenNoListsNoCoder() {
        thrown.expect(IllegalStateException.class);
        thrown.expectMessage("Unable to return a default Coder");

        PCollectionList.<ClassWithoutCoder>empty(p).apply(Flatten.pCollections());

        p.run();
    }

    /**
     * Creates a collection with one {@code Iterable<String>} element ({@code TestUtils.LINES}),
     * applies {@code Flatten.iterables()}, and expects the individual strings as output.
     */
    @Test
    @Category(ValidatesRunner.class)
    public void testFlattenIterables() {
        PCollection<Iterable<String>> input = p.apply(
                Create.<Iterable<String>>of(TestUtils.LINES)
                        .withCoder(IterableCoder.of(StringUtf8Coder.of())));

        PCollection<String> output = input.apply(Flatten.iterables());

        PAssert.that(output).containsInAnyOrder(TestUtils.LINES_ARRAY);
        p.run();
    }

    /**
     * Like {@code testFlattenIterables}, but the single element is typed as a
     * {@code List<String>} and encoded with a {@link ListCoder}.
     */
    @Test
    @Category(ValidatesRunner.class)
    public void testFlattenIterablesLists() {
        PCollection<List<String>> input = p.apply(
                Create.<List<String>>of(TestUtils.LINES)
                        .withCoder(ListCoder.of(StringUtf8Coder.of())));

        PCollection<String> output = input.apply(Flatten.iterables());

        PAssert.that(output).containsInAnyOrder(TestUtils.LINES_ARRAY);
        p.run();
    }

    /**
     * Like {@code testFlattenIterables}, but the single element is a {@code Set<String>} copied
     * from {@code TestUtils.LINES} and encoded with a {@link SetCoder}.
     */
    @Test
    @Category(ValidatesRunner.class)
    public void testFlattenIterablesSets() {
        Set<String> linesSet = ImmutableSet.copyOf(TestUtils.LINES);

        PCollection<Set<String>> input = p.apply(
                Create.<Set<String>>of(linesSet)
                        .withCoder(SetCoder.of(StringUtf8Coder.of())));

        PCollection<String> output = input.apply(Flatten.iterables());

        PAssert.that(output).containsInAnyOrder(TestUtils.LINES_ARRAY);
        p.run();
    }

    /**
     * Like {@code testFlattenIterablesSets}, but the element is typed as a
     * {@code Collection<String>} and encoded with a {@link CollectionCoder}.
     */
    @Test
    @Category(ValidatesRunner.class)
    public void testFlattenIterablesCollections() {
        Set<String> linesSet = ImmutableSet.copyOf(TestUtils.LINES);

        PCollection<Collection<String>> input = p.apply(
                Create.<Collection<String>>of(linesSet)
                        .withCoder(CollectionCoder.of(StringUtf8Coder.of())));

        PCollection<String> output = input.apply(Flatten.iterables());

        PAssert.that(output).containsInAnyOrder(TestUtils.LINES_ARRAY);
        p.run();
    }

    /**
     * Applies {@code Flatten.iterables()} to a collection whose single element is
     * {@code TestUtils.NO_LINES} and expects the contents of {@code TestUtils.NO_LINES_ARRAY}.
     */
    @Test
    @Category(ValidatesRunner.class)
    public void testFlattenIterablesEmpty() {
        PCollection<Iterable<String>> input = p.apply(
                Create.<Iterable<String>>of(TestUtils.NO_LINES)
                        .withCoder(IterableCoder.of(StringUtf8Coder.of())));

        PCollection<String> output = input.apply(Flatten.iterables());

        PAssert.that(output).containsInAnyOrder(TestUtils.NO_LINES_ARRAY);
        p.run();
    }

    /**
     * Splits the input strings by length parity into two tagged outputs, flattens the two outputs
     * back together, and asserts on the merged collection as well as on each output separately —
     * so both outputs are consumed by the Flatten and by a direct assertion.
     */
    @Test
    @Category(ValidatesRunner.class)
    public void testFlattenMultiplePCollectionsHavingMultipleConsumers() {
        PCollection<String> input = p.apply(Create.of("AA", "BBB", "CC"));

        // Anonymous subclasses, as in the original declarations.
        final TupleTag<String> evenLengthTag = new TupleTag<String>() {};
        final TupleTag<String> oddLengthTag = new TupleTag<String>() {};

        PCollectionTuple tuple = input.apply(
                ParDo.of(new DoFn<String, String>() {
                    @DoFn.ProcessElement
                    public void processElement(ProcessContext c) {
                        if (c.element().length() % 2 == 0) {
                            // Even length goes to the main output (evenLengthTag).
                            c.output(c.element());
                        } else {
                            c.output(oddLengthTag, c.element());
                        }
                    }
                }).withOutputTags(evenLengthTag, TupleTagList.of(oddLengthTag)));

        PCollection<String> evenLength = tuple.get(evenLengthTag);
        PCollection<String> oddLength = tuple.get(oddLengthTag);

        PCollection<String> merged =
                PCollectionList.of(evenLength).and(oddLength).apply(Flatten.pCollections());

        PAssert.that(merged).containsInAnyOrder("AA", "BBB", "CC");
        PAssert.that(evenLength).containsInAnyOrder("AA", "CC");
        PAssert.that(oddLength).containsInAnyOrder("BBB");
        p.run();
    }

    /**
     * Flattens two inputs that are both windowed into one-minute fixed windows and checks that the
     * output's window function is compatible with one-minute fixed windows.
     */
    @Test
    @Category(NeedsRunner.class)
    public void testEqualWindowFnPropagation() {
        PCollection<String> input1 = p
                .apply("CreateInput1", Create.of("Input1"))
                .apply("Window1", Window.into(FixedWindows.of(Duration.standardMinutes(1L))));
        PCollection<String> input2 = p
                .apply("CreateInput2", Create.of("Input2"))
                .apply("Window2", Window.into(FixedWindows.of(Duration.standardMinutes(1L))));

        PCollection<String> output =
                PCollectionList.of(input1).and(input2).apply(Flatten.pCollections());

        p.run();

        Assert.assertTrue(output.getWindowingStrategy()
                .getWindowFn()
                .isCompatible(FixedWindows.of(Duration.standardMinutes(1L))));
    }

    /**
     * Flattens two inputs windowed into sessions with different gap durations (one and two
     * minutes) and checks that the output's window function is compatible with two-minute-gap
     * sessions.
     */
    @Test
    @Category(NeedsRunner.class)
    public void testCompatibleWindowFnPropagation() {
        PCollection<String> input1 = p
                .apply("CreateInput1", Create.of("Input1"))
                .apply("Window1", Window.into(Sessions.withGapDuration(Duration.standardMinutes(1L))));
        PCollection<String> input2 = p
                .apply("CreateInput2", Create.of("Input2"))
                .apply("Window2", Window.into(Sessions.withGapDuration(Duration.standardMinutes(2L))));

        PCollection<String> output =
                PCollectionList.of(input1).and(input2).apply(Flatten.pCollections());

        p.run();

        Assert.assertTrue(output.getWindowingStrategy()
                .getWindowFn()
                .isCompatible(Sessions.withGapDuration(Duration.standardMinutes(2L))));
    }

    /**
     * Flattening inputs windowed into fixed windows of different sizes (one and two minutes) must
     * fail with an {@link IllegalStateException} whose message starts with
     * "Inputs to Flatten had incompatible window windowFns".
     *
     * <p>The pipeline is never run, so abandoned-node enforcement is switched off first.
     */
    @Test
    public void testIncompatibleWindowFnPropagationFailure() {
        p.enableAbandonedNodeEnforcement(false);

        PCollection<String> input1 = p
                .apply("CreateInput1", Create.of("Input1"))
                .apply("Window1", Window.into(FixedWindows.of(Duration.standardMinutes(1L))));

        try {
            // Kept inside the try block, as in the original, so the catch scope is unchanged.
            PCollection<String> input2 = p
                    .apply("CreateInput2", Create.of("Input2"))
                    .apply("Window2", Window.into(FixedWindows.of(Duration.standardMinutes(2L))));

            PCollectionList.of(input1).and(input2).apply(Flatten.pCollections());
            Assert.fail("Exception should have been thrown");
        } catch (IllegalStateException e) {
            // A matcher reports the actual message on mismatch and does not NPE on a null message.
            Assert.assertThat(
                    e.getMessage(),
                    Matchers.startsWith("Inputs to Flatten had incompatible window windowFns"));
        }
    }

    /** Checks the names reported by the two {@code Flatten} transforms. */
    @Test
    public void testFlattenGetName() {
        String iterablesName = Flatten.iterables().getName();
        Assert.assertEquals("Flatten.Iterables", iterablesName);

        String pCollectionsName = Flatten.pCollections().getName();
        Assert.assertEquals("Flatten.PCollections", pCollectionsName);
    }

    /**
     * Builds a {@link PCollectionList} of strings by delegating to {@code makePCollectionList}
     * with a {@link StringUtf8Coder}.
     *
     * @param pipeline the pipeline to create the collections in
     * @param list one list of strings per collection to create
     * @return the list of created collections
     */
    private PCollectionList<String> makePCollectionListOfStrings(Pipeline pipeline, List<List<String>> list) {
        Coder<String> utf8Coder = StringUtf8Coder.of();
        return makePCollectionList(pipeline, utf8Coder, list);
    }

    /**
     * Creates one {@link PCollection} per inner list and bundles them into a
     * {@link PCollectionList}.
     *
     * <p>Each {@code Create} transform is named {@code "Create" + index}, with the index counting
     * up from 0 in list order.
     *
     * @param pipeline the pipeline to create the collections in
     * @param coder the coder set on every created collection
     * @param list the element lists, one per collection
     * @param <T> the element type
     * @return a list containing the created collections in input order
     */
    private <T> PCollectionList<T> makePCollectionList(Pipeline pipeline, Coder<T> coder, List<List<T>> list) {
        List<PCollection<T>> collections = new ArrayList<>();
        int index = 0;
        for (List<T> elements : list) {
            PCollection<T> collection =
                    pipeline.apply("Create" + index, Create.of(elements).withCoder(coder));
            collections.add(collection);
            index++;
        }
        return PCollectionList.of(collections);
    }

    /**
     * Concatenates the given lists into a single new list, preserving the order of the lists and
     * of the elements within each list.
     *
     * @param list the lists to concatenate
     * @param <T> the element type
     * @return a new mutable list holding every element of every inner list
     */
    private <T> List<T> flattenLists(List<List<T>> list) {
        List<T> flattened = new ArrayList<>();
        for (List<T> inner : list) {
            flattened.addAll(inner);
        }
        return flattened;
    }
}
