package org.apache.beam.sdk.transforms;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.TestUtils;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CollectionCoder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.SetCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.CountingInput;
import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableSet;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/transforms/FlattenTest.class */
public class FlattenTest implements Serializable {

    @Rule
    public transient ExpectedException thrown = ExpectedException.none();

    /* loaded from: input_file:org/apache/beam/sdk/transforms/FlattenTest$ClassWithoutCoder.class */
    private static class ClassWithoutCoder {
        private ClassWithoutCoder() {
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/transforms/FlattenTest$IdentityFn.class */
    private static class IdentityFn<T> extends DoFn<T, T> {
        private IdentityFn() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<T, T>.ProcessContext processContext) {
            processContext.output(processContext.element());
        }
    }

    @Test
    @Category({RunnableOnService.class})
    public void testFlattenPCollectionList() {
        TestPipeline create = TestPipeline.create();
        List<List<String>> asList = Arrays.asList(TestUtils.LINES, TestUtils.NO_LINES, TestUtils.LINES2, TestUtils.NO_LINES, TestUtils.LINES, TestUtils.NO_LINES);
        PAssert.that(makePCollectionListOfStrings(create, asList).apply(Flatten.pCollections())).containsInAnyOrder(flattenLists(asList));
        create.run();
    }

    @Test
    @Category({RunnableOnService.class})
    public void testFlattenPCollectionListThenParDo() {
        TestPipeline create = TestPipeline.create();
        List<List<String>> asList = Arrays.asList(TestUtils.LINES, TestUtils.NO_LINES, TestUtils.LINES2, TestUtils.NO_LINES, TestUtils.LINES, TestUtils.NO_LINES);
        PAssert.that(makePCollectionListOfStrings(create, asList).apply(Flatten.pCollections()).apply(ParDo.of(new IdentityFn()))).containsInAnyOrder(flattenLists(asList));
        create.run();
    }

    @Test
    @Category({RunnableOnService.class})
    public void testFlattenPCollectionListEmpty() {
        TestPipeline create = TestPipeline.create();
        PAssert.that(PCollectionList.empty(create).apply(Flatten.pCollections()).setCoder(StringUtf8Coder.of())).empty();
        create.run();
    }

    @Test
    @Category({RunnableOnService.class})
    public void testFlattenInputMultipleCopies() {
        TestPipeline create = TestPipeline.create();
        PCollection apply = create.apply("mkLines", CountingInput.upTo(5));
        PCollection apply2 = PCollectionList.of(apply).and(apply).and(create.apply("mkOtherLines", CountingInput.upTo(5)).apply(MapElements.via(new SimpleFunction<Long, Long>() { // from class: org.apache.beam.sdk.transforms.FlattenTest.1
            public Long apply(Long l) {
                return Long.valueOf(l.longValue() + 10);
            }
        }))).apply(Flatten.pCollections());
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 5; i++) {
            arrayList.add(Long.valueOf(i));
            arrayList.add(Long.valueOf(i));
            arrayList.add(Long.valueOf(i + 10));
        }
        PAssert.that(apply2).containsInAnyOrder(arrayList);
        create.run();
    }

    @Test
    @Category({RunnableOnService.class})
    public void testEmptyFlattenAsSideInput() {
        TestPipeline create = TestPipeline.create();
        final PCollectionView apply = PCollectionList.empty(create).apply(Flatten.pCollections()).setCoder(StringUtf8Coder.of()).apply(View.asIterable());
        PAssert.that(create.apply(Create.of(new Void[]{(Void) null}).withCoder(VoidCoder.of())).apply(ParDo.withSideInputs(new PCollectionView[]{apply}).of(new DoFn<Void, String>() { // from class: org.apache.beam.sdk.transforms.FlattenTest.2
            @DoFn.ProcessElement
            public void processElement(DoFn<Void, String>.ProcessContext processContext) {
                Iterator it = ((Iterable) processContext.sideInput(apply)).iterator();
                while (it.hasNext()) {
                    processContext.output((String) it.next());
                }
            }
        }))).empty();
        create.run();
    }

    @Test
    @Category({RunnableOnService.class})
    public void testFlattenPCollectionListEmptyThenParDo() {
        TestPipeline create = TestPipeline.create();
        PAssert.that(PCollectionList.empty(create).apply(Flatten.pCollections()).setCoder(StringUtf8Coder.of()).apply(ParDo.of(new IdentityFn()))).empty();
        create.run();
    }

    @Test
    @Category({NeedsRunner.class})
    public void testFlattenNoListsNoCoder() {
        this.thrown.expect(IllegalStateException.class);
        this.thrown.expectMessage("cannot provide a Coder for empty");
        TestPipeline create = TestPipeline.create();
        PCollectionList.empty(create).apply(Flatten.pCollections());
        create.run();
    }

    @Test
    @Category({RunnableOnService.class})
    public void testFlattenIterables() {
        TestPipeline create = TestPipeline.create();
        PAssert.that(create.apply(Create.of(new Iterable[]{TestUtils.LINES}).withCoder(IterableCoder.of(StringUtf8Coder.of()))).apply(Flatten.iterables())).containsInAnyOrder(TestUtils.LINES_ARRAY);
        create.run();
    }

    @Test
    @Category({RunnableOnService.class})
    public void testFlattenIterablesLists() {
        TestPipeline create = TestPipeline.create();
        PAssert.that(create.apply(Create.of(new List[]{TestUtils.LINES}).withCoder(ListCoder.of(StringUtf8Coder.of()))).apply(Flatten.iterables())).containsInAnyOrder(TestUtils.LINES_ARRAY);
        create.run();
    }

    @Test
    @Category({RunnableOnService.class})
    public void testFlattenIterablesSets() {
        TestPipeline create = TestPipeline.create();
        PAssert.that(create.apply(Create.of(new Set[]{ImmutableSet.copyOf(TestUtils.LINES)}).withCoder(SetCoder.of(StringUtf8Coder.of()))).apply(Flatten.iterables())).containsInAnyOrder(TestUtils.LINES_ARRAY);
        create.run();
    }

    @Test
    @Category({RunnableOnService.class})
    public void testFlattenIterablesCollections() {
        TestPipeline create = TestPipeline.create();
        PAssert.that(create.apply(Create.of(new Collection[]{ImmutableSet.copyOf(TestUtils.LINES)}).withCoder(CollectionCoder.of(StringUtf8Coder.of()))).apply(Flatten.iterables())).containsInAnyOrder(TestUtils.LINES_ARRAY);
        create.run();
    }

    @Test
    @Category({RunnableOnService.class})
    public void testFlattenIterablesEmpty() {
        TestPipeline create = TestPipeline.create();
        PAssert.that(create.apply(Create.of(new Iterable[]{TestUtils.NO_LINES}).withCoder(IterableCoder.of(StringUtf8Coder.of()))).apply(Flatten.iterables())).containsInAnyOrder(TestUtils.NO_LINES_ARRAY);
        create.run();
    }

    @Test
    @Category({NeedsRunner.class})
    public void testEqualWindowFnPropagation() {
        TestPipeline create = TestPipeline.create();
        PCollection apply = PCollectionList.of(create.apply("CreateInput1", Create.of(new String[]{"Input1"})).apply("Window1", Window.into(FixedWindows.of(Duration.standardMinutes(1L))))).and(create.apply("CreateInput2", Create.of(new String[]{"Input2"})).apply("Window2", Window.into(FixedWindows.of(Duration.standardMinutes(1L))))).apply(Flatten.pCollections());
        create.run();
        Assert.assertTrue(apply.getWindowingStrategy().getWindowFn().isCompatible(FixedWindows.of(Duration.standardMinutes(1L))));
    }

    @Test
    @Category({NeedsRunner.class})
    public void testCompatibleWindowFnPropagation() {
        TestPipeline create = TestPipeline.create();
        PCollection apply = PCollectionList.of(create.apply("CreateInput1", Create.of(new String[]{"Input1"})).apply("Window1", Window.into(Sessions.withGapDuration(Duration.standardMinutes(1L))))).and(create.apply("CreateInput2", Create.of(new String[]{"Input2"})).apply("Window2", Window.into(Sessions.withGapDuration(Duration.standardMinutes(2L))))).apply(Flatten.pCollections());
        create.run();
        Assert.assertTrue(apply.getWindowingStrategy().getWindowFn().isCompatible(Sessions.withGapDuration(Duration.standardMinutes(2L))));
    }

    @Test
    public void testIncompatibleWindowFnPropagationFailure() {
        TestPipeline create = TestPipeline.create();
        try {
            PCollectionList.of(create.apply("CreateInput1", Create.of(new String[]{"Input1"})).apply("Window1", Window.into(FixedWindows.of(Duration.standardMinutes(1L))))).and(create.apply("CreateInput2", Create.of(new String[]{"Input2"})).apply("Window2", Window.into(FixedWindows.of(Duration.standardMinutes(2L))))).apply(Flatten.pCollections());
            Assert.fail("Exception should have been thrown");
        } catch (IllegalStateException e) {
            Assert.assertTrue(e.getMessage().startsWith("Inputs to Flatten had incompatible window windowFns"));
        }
    }

    @Test
    public void testFlattenGetName() {
        Assert.assertEquals("Flatten.FlattenIterables", Flatten.iterables().getName());
        Assert.assertEquals("Flatten.FlattenPCollectionList", Flatten.pCollections().getName());
    }

    private PCollectionList<String> makePCollectionListOfStrings(Pipeline pipeline, List<List<String>> list) {
        return makePCollectionList(pipeline, StringUtf8Coder.of(), list);
    }

    private <T> PCollectionList<T> makePCollectionList(Pipeline pipeline, Coder<T> coder, List<List<T>> list) {
        ArrayList arrayList = new ArrayList();
        int i = 0;
        Iterator<List<T>> it = list.iterator();
        while (it.hasNext()) {
            int i2 = i;
            i++;
            arrayList.add(pipeline.apply("Create" + i2, Create.of(it.next()).withCoder(coder)));
        }
        return PCollectionList.of(arrayList);
    }

    private <T> List<T> flattenLists(List<List<T>> list) {
        ArrayList arrayList = new ArrayList();
        Iterator<List<T>> it = list.iterator();
        while (it.hasNext()) {
            arrayList.addAll(it.next());
        }
        return arrayList;
    }
}
