package org.apache.beam.sdk.transforms;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/transforms/ViewTest.class */
public class ViewTest implements Serializable {

    // JUnit rule used by the failure-path tests in this class to declare, before running the
    // pipeline, which exception type, cause and message fragments are expected.
    // Declared transient so it is excluded when the enclosing Serializable test instance is
    // serialized.
    @Rule
    public transient ExpectedException thrown = ExpectedException.none();

    /* loaded from: input_file:org/apache/beam/sdk/transforms/ViewTest$NonDeterministicStringCoder.class */
    private static class NonDeterministicStringCoder extends CustomCoder<String> {
        private NonDeterministicStringCoder() {
        }

        public void encode(String str, OutputStream outputStream, Coder.Context context) throws CoderException, IOException {
            StringUtf8Coder.of().encode(str, outputStream, context);
        }

        /* renamed from: decode, reason: merged with bridge method [inline-methods] */
        public String m128decode(InputStream inputStream, Coder.Context context) throws CoderException, IOException {
            return StringUtf8Coder.of().decode(inputStream, context);
        }

        public void verifyDeterministic() throws Coder.NonDeterministicException {
            throw new Coder.NonDeterministicException(this, "Test coder is not deterministic on purpose.");
        }
    }

    /**
     * Expects a singleton view over {@code 47} to be readable as a side input, yielding
     * {@code 47} once for each of the three main-input elements.
     */
    @Test
    @Category({RunnableOnService.class})
    public void testSingletonSideInput() {
        TestPipeline pipeline = TestPipeline.create();

        final PCollectionView<Integer> view =
                pipeline.apply("Create47", Create.of(47)).apply(View.<Integer>asSingleton());

        PAssert.that(
                pipeline.apply("Create123", Create.of(1, 2, 3))
                        .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
                                new DoFn<Integer, Integer>() {
                                    @Override
                                    public void processElement(ProcessContext c) {
                                        c.output(c.sideInput(view));
                                    }
                                })))
                .containsInAnyOrder(47, 47, 47);

        pipeline.run();
    }

    /**
     * Expects a singleton view built over 10 ms fixed windows to yield, for each main-input
     * element, the side-input value from the matching window: {@code 47} for the elements at
     * timestamps 4 and 8, {@code 48} for the element at timestamp 12.
     */
    @Test
    @Category({RunnableOnService.class})
    public void testWindowedSingletonSideInput() {
        TestPipeline pipeline = TestPipeline.create();

        final PCollectionView<Integer> view =
                pipeline.apply("Create47", Create.timestamped(
                                TimestampedValue.of(47, new Instant(1L)),
                                TimestampedValue.of(48, new Instant(11L))))
                        .apply("SideWindowInto",
                                Window.<Integer>into(FixedWindows.of(Duration.millis(10L))))
                        .apply(View.<Integer>asSingleton());

        PAssert.that(
                pipeline.apply("Create123", Create.timestamped(
                                TimestampedValue.of(1, new Instant(4L)),
                                TimestampedValue.of(2, new Instant(8L)),
                                TimestampedValue.of(3, new Instant(12L))))
                        .apply("MainWindowInto",
                                Window.<Integer>into(FixedWindows.of(Duration.millis(10L))))
                        .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
                                new DoFn<Integer, Integer>() {
                                    @Override
                                    public void processElement(ProcessContext c) {
                                        c.output(c.sideInput(view));
                                    }
                                })))
                .containsInAnyOrder(47, 47, 48);

        pipeline.run();
    }

    /**
     * Expects running a pipeline that reads a singleton view of an empty collection to fail
     * with a {@link Pipeline.PipelineExecutionException} caused by a
     * {@link NoSuchElementException} whose message mentions "Empty", "PCollection" and
     * "singleton".
     */
    @Test
    @Category({NeedsRunner.class})
    public void testEmptySingletonSideInput() throws Exception {
        TestPipeline pipeline = TestPipeline.create();

        final PCollectionView<Integer> view =
                pipeline.apply("CreateEmptyIntegers",
                                Create.<Integer>of().withCoder(VarIntCoder.of()))
                        .apply(View.<Integer>asSingleton());

        pipeline.apply("Create123", Create.of(1, 2, 3))
                .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
                        new DoFn<Integer, Integer>() {
                            @Override
                            public void processElement(ProcessContext c) {
                                c.output(c.sideInput(view));
                            }
                        }));

        // Expectations must be registered before run(), which is where the failure surfaces.
        thrown.expect(Pipeline.PipelineExecutionException.class);
        thrown.expectCause(Matchers.isA(NoSuchElementException.class));
        thrown.expectMessage("Empty");
        thrown.expectMessage("PCollection");
        thrown.expectMessage("singleton");

        pipeline.run();
    }

    /**
     * Expects running a pipeline that reads a singleton view of a three-element collection to
     * fail with a {@link Pipeline.PipelineExecutionException} caused by an
     * {@link IllegalArgumentException} whose message mentions "PCollection", "more than one"
     * and "singleton".
     */
    @Test
    @Category({NeedsRunner.class})
    public void testNonSingletonSideInput() throws Exception {
        TestPipeline pipeline = TestPipeline.create();

        // The same collection serves as both the main input and the source of the view.
        PCollection<Integer> oneTwoThree = pipeline.apply(Create.of(1, 2, 3));
        final PCollectionView<Integer> view = oneTwoThree.apply(View.<Integer>asSingleton());

        oneTwoThree.apply("OutputSideInputs", ParDo.withSideInputs(view).of(
                new DoFn<Integer, Integer>() {
                    @Override
                    public void processElement(ProcessContext c) {
                        c.output(c.sideInput(view));
                    }
                }));

        // Expectations must be registered before run(), which is where the failure surfaces.
        thrown.expect(Pipeline.PipelineExecutionException.class);
        thrown.expectCause(Matchers.isA(IllegalArgumentException.class));
        thrown.expectMessage("PCollection");
        thrown.expectMessage("more than one");
        thrown.expectMessage("singleton");

        pipeline.run();
    }

    /**
     * Expects a list view over four integers to be readable as a side input: each of the two
     * main-input elements sees a list of size four and re-emits all of its values.
     */
    @Test
    @Category({RunnableOnService.class})
    public void testListSideInput() {
        TestPipeline pipeline = TestPipeline.create();

        final PCollectionView<List<Integer>> view =
                pipeline.apply("CreateSideInput", Create.of(11, 13, 17, 23))
                        .apply(View.<Integer>asList());

        PAssert.that(
                pipeline.apply("CreateMainInput", Create.of(29, 31))
                        .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
                                new DoFn<Integer, Integer>() {
                                    @Override
                                    public void processElement(ProcessContext c) {
                                        Preconditions.checkArgument(c.sideInput(view).size() == 4);
                                        // NOTE(review): reference comparison of the first element
                                        // across two reads of the side input; left as-is because
                                        // it may be an intentional identity check - confirm.
                                        Preconditions.checkArgument(
                                                c.sideInput(view).get(0) == c.sideInput(view).get(0));
                                        for (Integer value : c.sideInput(view)) {
                                            c.output(value);
                                        }
                                    }
                                })))
                .containsInAnyOrder(11, 13, 17, 23, 11, 13, 17, 23);

        pipeline.run();
    }

    /**
     * Expects a list view built over 10 ms fixed windows to expose, to each main-input element,
     * only the four side-input values that fall in the matching window.
     */
    @Test
    @Category({RunnableOnService.class})
    public void testWindowedListSideInput() {
        TestPipeline pipeline = TestPipeline.create();

        final PCollectionView<List<Integer>> view =
                pipeline.apply("CreateSideInput", Create.timestamped(
                                TimestampedValue.of(11, new Instant(1L)),
                                TimestampedValue.of(13, new Instant(1L)),
                                TimestampedValue.of(17, new Instant(1L)),
                                TimestampedValue.of(23, new Instant(1L)),
                                TimestampedValue.of(31, new Instant(11L)),
                                TimestampedValue.of(33, new Instant(11L)),
                                TimestampedValue.of(37, new Instant(11L)),
                                TimestampedValue.of(43, new Instant(11L))))
                        .apply("SideWindowInto",
                                Window.<Integer>into(FixedWindows.of(Duration.millis(10L))))
                        .apply(View.<Integer>asList());

        PAssert.that(
                pipeline.apply("CreateMainInput", Create.timestamped(
                                TimestampedValue.of(29, new Instant(1L)),
                                TimestampedValue.of(35, new Instant(11L))))
                        .apply("MainWindowInto",
                                Window.<Integer>into(FixedWindows.of(Duration.millis(10L))))
                        .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
                                new DoFn<Integer, Integer>() {
                                    @Override
                                    public void processElement(ProcessContext c) {
                                        Preconditions.checkArgument(c.sideInput(view).size() == 4);
                                        // NOTE(review): reference comparison of the first element
                                        // across two reads of the side input; left as-is because
                                        // it may be an intentional identity check - confirm.
                                        Preconditions.checkArgument(
                                                c.sideInput(view).get(0) == c.sideInput(view).get(0));
                                        for (Integer value : c.sideInput(view)) {
                                            c.output(value);
                                        }
                                    }
                                })))
                .containsInAnyOrder(11, 13, 17, 23, 31, 33, 37, 43);

        pipeline.run();
    }

    /**
     * Expects a list view over an empty collection to be readable as an empty list whose
     * iterator has no elements.
     */
    @Test
    @Category({RunnableOnService.class})
    public void testEmptyListSideInput() throws Exception {
        TestPipeline pipeline = TestPipeline.create();

        final PCollectionView<List<Integer>> view =
                pipeline.apply("CreateEmptyView",
                                Create.<Integer>of().withCoder(VarIntCoder.of()))
                        .apply(View.<Integer>asList());

        PAssert.that(
                pipeline.apply("Create1", Create.of(1))
                        .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
                                new DoFn<Integer, Integer>() {
                                    @Override
                                    public void processElement(ProcessContext c) {
                                        Assert.assertTrue(c.sideInput(view).isEmpty());
                                        Assert.assertFalse(c.sideInput(view).iterator().hasNext());
                                        c.output(1);
                                    }
                                })))
                .containsInAnyOrder(1);

        pipeline.run();
    }

    /**
     * Expects the list handed out for a list view to reject {@code clear}, {@code add},
     * {@code addAll} and {@code remove} with {@link UnsupportedOperationException}, while
     * still being readable.
     */
    @Test
    @Category({RunnableOnService.class})
    public void testListSideInputIsImmutable() {
        TestPipeline pipeline = TestPipeline.create();

        final PCollectionView<List<Integer>> view =
                pipeline.apply("CreateSideInput", Create.of(11)).apply(View.<Integer>asList());

        PAssert.that(
                pipeline.apply("CreateMainInput", Create.of(29))
                        .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
                                new DoFn<Integer, Integer>() {
                                    @Override
                                    public void processElement(ProcessContext c) {
                                        try {
                                            c.sideInput(view).clear();
                                            Assert.fail("Expected UnsupportedOperationException on clear()");
                                        } catch (UnsupportedOperationException expected) {
                                            // Expected: the view must not be mutable.
                                        }
                                        try {
                                            c.sideInput(view).add(4);
                                            Assert.fail("Expected UnsupportedOperationException on add()");
                                        } catch (UnsupportedOperationException expected) {
                                            // Expected: the view must not be mutable.
                                        }
                                        try {
                                            c.sideInput(view).addAll(new ArrayList<Integer>());
                                            Assert.fail("Expected UnsupportedOperationException on addAll()");
                                        } catch (UnsupportedOperationException expected) {
                                            // Expected: the view must not be mutable.
                                        }
                                        try {
                                            // Index-based remove(int), not remove(Object).
                                            c.sideInput(view).remove(0);
                                            Assert.fail("Expected UnsupportedOperationException on remove()");
                                        } catch (UnsupportedOperationException expected) {
                                            // Expected: the view must not be mutable.
                                        }
                                        for (Integer value : c.sideInput(view)) {
                                            c.output(value);
                                        }
                                    }
                                })))
                .containsInAnyOrder(11);

        pipeline.run();
    }

    /**
     * Expects an iterable view over four integers to be readable as a side input: each of the
     * two main-input elements re-emits all four values.
     */
    @Test
    @Category({RunnableOnService.class})
    public void testIterableSideInput() {
        TestPipeline pipeline = TestPipeline.create();

        final PCollectionView<Iterable<Integer>> view =
                pipeline.apply("CreateSideInput", Create.of(11, 13, 17, 23))
                        .apply(View.<Integer>asIterable());

        PAssert.that(
                pipeline.apply("CreateMainInput", Create.of(29, 31))
                        .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
                                new DoFn<Integer, Integer>() {
                                    @Override
                                    public void processElement(ProcessContext c) {
                                        for (Integer value : c.sideInput(view)) {
                                            c.output(value);
                                        }
                                    }
                                })))
                .containsInAnyOrder(11, 13, 17, 23, 11, 13, 17, 23);

        pipeline.run();
    }

    /**
     * Expects an iterable view built over 10 ms fixed windows to expose, to each main-input
     * element, only the side-input values that fall in the matching window.
     */
    @Test
    @Category({RunnableOnService.class})
    public void testWindowedIterableSideInput() {
        TestPipeline pipeline = TestPipeline.create();

        final PCollectionView<Iterable<Integer>> view =
                pipeline.apply("CreateSideInput", Create.timestamped(
                                TimestampedValue.of(11, new Instant(1L)),
                                TimestampedValue.of(13, new Instant(1L)),
                                TimestampedValue.of(17, new Instant(1L)),
                                TimestampedValue.of(23, new Instant(1L)),
                                TimestampedValue.of(31, new Instant(11L)),
                                TimestampedValue.of(33, new Instant(11L)),
                                TimestampedValue.of(37, new Instant(11L)),
                                TimestampedValue.of(43, new Instant(11L))))
                        .apply("SideWindowInto",
                                Window.<Integer>into(FixedWindows.of(Duration.millis(10L))))
                        .apply(View.<Integer>asIterable());

        PAssert.that(
                pipeline.apply("CreateMainInput", Create.timestamped(
                                TimestampedValue.of(29, new Instant(1L)),
                                TimestampedValue.of(35, new Instant(11L))))
                        .apply("MainWindowInto",
                                Window.<Integer>into(FixedWindows.of(Duration.millis(10L))))
                        .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
                                new DoFn<Integer, Integer>() {
                                    @Override
                                    public void processElement(ProcessContext c) {
                                        for (Integer value : c.sideInput(view)) {
                                            c.output(value);
                                        }
                                    }
                                })))
                .containsInAnyOrder(11, 13, 17, 23, 31, 33, 37, 43);

        pipeline.run();
    }

    /**
     * Expects an iterable view over an empty collection to be readable as an iterable whose
     * iterator has no elements.
     */
    @Test
    @Category({RunnableOnService.class})
    public void testEmptyIterableSideInput() throws Exception {
        TestPipeline pipeline = TestPipeline.create();

        final PCollectionView<Iterable<Integer>> view =
                pipeline.apply("CreateEmptyView",
                                Create.<Integer>of().withCoder(VarIntCoder.of()))
                        .apply(View.<Integer>asIterable());

        PAssert.that(
                pipeline.apply("Create1", Create.of(1))
                        .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
                                new DoFn<Integer, Integer>() {
                                    @Override
                                    public void processElement(ProcessContext c) {
                                        Assert.assertFalse(c.sideInput(view).iterator().hasNext());
                                        c.output(1);
                                    }
                                })))
                .containsInAnyOrder(1);

        pipeline.run();
    }

    /**
     * Expects the iterator handed out for an iterable view to reject {@code remove()} with
     * {@link UnsupportedOperationException}, while still yielding its elements.
     */
    @Test
    @Category({RunnableOnService.class})
    public void testIterableSideInputIsImmutable() {
        TestPipeline pipeline = TestPipeline.create();

        final PCollectionView<Iterable<Integer>> view =
                pipeline.apply("CreateSideInput", Create.of(11))
                        .apply(View.<Integer>asIterable());

        PAssert.that(
                pipeline.apply("CreateMainInput", Create.of(29))
                        .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
                                new DoFn<Integer, Integer>() {
                                    @Override
                                    public void processElement(ProcessContext c) {
                                        // An explicit iterator is needed here because the test
                                        // exercises Iterator.remove().
                                        Iterator<Integer> iterator = c.sideInput(view).iterator();
                                        while (iterator.hasNext()) {
                                            try {
                                                // Called before next(), as in the original test.
                                                iterator.remove();
                                                Assert.fail("Expected UnsupportedOperationException on remove()");
                                            } catch (UnsupportedOperationException expected) {
                                                // Expected: the view must not be mutable.
                                            }
                                            c.output(iterator.next());
                                        }
                                    }
                                })))
                .containsInAnyOrder(11);

        pipeline.run();
    }

    /**
     * Expects a multimap view to be readable as a side input: each main-input word looks up its
     * first letter and is paired with every value stored under that key.
     */
    @Test
    @Category({RunnableOnService.class})
    public void testMultimapSideInput() {
        TestPipeline pipeline = TestPipeline.create();

        final PCollectionView<Map<String, Iterable<Integer>>> view =
                pipeline.apply("CreateSideInput",
                                Create.of(KV.of("a", 1), KV.of("a", 2), KV.of("b", 3)))
                        .apply(View.<String, Integer>asMultimap());

        PAssert.that(
                pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
                        .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
                                new DoFn<String, KV<String, Integer>>() {
                                    @Override
                                    public void processElement(ProcessContext c) {
                                        String firstLetter = c.element().substring(0, 1);
                                        for (Integer value : c.sideInput(view).get(firstLetter)) {
                                            c.output(KV.of(c.element(), value));
                                        }
                                    }
                                })))
                .containsInAnyOrder(
                        KV.of("apple", 1), KV.of("apple", 2),
                        KV.of("banana", 3), KV.of("blackberry", 3));

        pipeline.run();
    }

    /**
     * Expects a multimap view to be traversable through {@code entrySet()}: the map and its
     * entry set both report the number of keys given by the main-input element, and iterating
     * the entries reproduces every key/value pair.
     */
    @Test
    @Category({RunnableOnService.class})
    public void testMultimapAsEntrySetSideInput() {
        TestPipeline pipeline = TestPipeline.create();

        final PCollectionView<Map<String, Iterable<Integer>>> view =
                pipeline.apply("CreateSideInput",
                                Create.of(KV.of("a", 1), KV.of("a", 2), KV.of("b", 3)))
                        .apply(View.<String, Integer>asMultimap());

        PAssert.that(
                // The single main-input element is the expected number of keys.
                pipeline.apply("CreateMainInput", Create.of(2))
                        .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
                                new DoFn<Integer, KV<String, Integer>>() {
                                    @Override
                                    public void processElement(ProcessContext c) {
                                        Assert.assertEquals(
                                                c.element().intValue(), c.sideInput(view).size());
                                        Assert.assertEquals(
                                                c.element().intValue(),
                                                c.sideInput(view).entrySet().size());
                                        for (Map.Entry<String, Iterable<Integer>> entry
                                                : c.sideInput(view).entrySet()) {
                                            for (Integer value : entry.getValue()) {
                                                c.output(KV.of(entry.getKey(), value));
                                            }
                                        }
                                    }
                                })))
                .containsInAnyOrder(KV.of("a", 1), KV.of("a", 2), KV.of("b", 3));

        pipeline.run();
    }

    /**
     * Expects multimap view lookups to work when the key coder of the side-input collection is
     * {@code NonDeterministicStringCoder}.
     */
    @Test
    @Category({RunnableOnService.class})
    public void testMultimapSideInputWithNonDeterministicKeyCoder() {
        TestPipeline pipeline = TestPipeline.create();

        final PCollectionView<Map<String, Iterable<Integer>>> view =
                pipeline.apply("CreateSideInput",
                                Create.of(KV.of("a", 1), KV.of("a", 2), KV.of("b", 3))
                                        .withCoder(KvCoder.of(
                                                new NonDeterministicStringCoder(), VarIntCoder.of())))
                        .apply(View.<String, Integer>asMultimap());

        PAssert.that(
                pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
                        .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
                                new DoFn<String, KV<String, Integer>>() {
                                    @Override
                                    public void processElement(ProcessContext c) {
                                        String firstLetter = c.element().substring(0, 1);
                                        for (Integer value : c.sideInput(view).get(firstLetter)) {
                                            c.output(KV.of(c.element(), value));
                                        }
                                    }
                                })))
                .containsInAnyOrder(
                        KV.of("apple", 1), KV.of("apple", 2),
                        KV.of("banana", 3), KV.of("blackberry", 3));

        pipeline.run();
    }

    /**
     * Expects a multimap view built over 10 ms fixed windows to serve each main-input word the
     * values stored under its first letter in the matching window.
     */
    @Test
    @Category({RunnableOnService.class})
    public void testWindowedMultimapSideInput() {
        TestPipeline pipeline = TestPipeline.create();

        final PCollectionView<Map<String, Iterable<Integer>>> view =
                pipeline.apply("CreateSideInput", Create.timestamped(
                                TimestampedValue.of(KV.of("a", 1), new Instant(1L)),
                                TimestampedValue.of(KV.of("a", 2), new Instant(7L)),
                                TimestampedValue.of(KV.of("b", 3), new Instant(14L))))
                        .apply("SideWindowInto",
                                Window.<KV<String, Integer>>into(
                                        FixedWindows.of(Duration.millis(10L))))
                        .apply(View.<String, Integer>asMultimap());

        PAssert.that(
                pipeline.apply("CreateMainInput", Create.timestamped(
                                TimestampedValue.of("apple", new Instant(5L)),
                                TimestampedValue.of("banana", new Instant(13L)),
                                TimestampedValue.of("blackberry", new Instant(16L))))
                        .apply("MainWindowInto",
                                Window.<String>into(FixedWindows.of(Duration.millis(10L))))
                        .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
                                new DoFn<String, KV<String, Integer>>() {
                                    @Override
                                    public void processElement(ProcessContext c) {
                                        String firstLetter = c.element().substring(0, 1);
                                        for (Integer value : c.sideInput(view).get(firstLetter)) {
                                            c.output(KV.of(c.element(), value));
                                        }
                                    }
                                })))
                .containsInAnyOrder(
                        KV.of("apple", 1), KV.of("apple", 2),
                        KV.of("banana", 3), KV.of("blackberry", 3));

        pipeline.run();
    }

    /**
     * Expects a multimap view built over 10 ms fixed windows to be traversable through
     * {@code entrySet()}: in each window the map and its entry set report the number of keys
     * given by the main-input element, and iterating the entries reproduces that window's
     * key/value pairs.
     */
    @Test
    @Category({RunnableOnService.class})
    public void testWindowedMultimapAsEntrySetSideInput() {
        TestPipeline pipeline = TestPipeline.create();

        final PCollectionView<Map<String, Iterable<Integer>>> view =
                pipeline.apply("CreateSideInput", Create.timestamped(
                                TimestampedValue.of(KV.of("a", 1), new Instant(1L)),
                                TimestampedValue.of(KV.of("a", 2), new Instant(7L)),
                                TimestampedValue.of(KV.of("b", 3), new Instant(14L))))
                        .apply("SideWindowInto",
                                Window.<KV<String, Integer>>into(
                                        FixedWindows.of(Duration.millis(10L))))
                        .apply(View.<String, Integer>asMultimap());

        PAssert.that(
                // Each main-input element is the expected number of keys in its window.
                pipeline.apply("CreateMainInput", Create.timestamped(
                                TimestampedValue.of(1, new Instant(5L)),
                                TimestampedValue.of(1, new Instant(16L))))
                        .apply("MainWindowInto",
                                Window.<Integer>into(FixedWindows.of(Duration.millis(10L))))
                        .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
                                new DoFn<Integer, KV<String, Integer>>() {
                                    @Override
                                    public void processElement(ProcessContext c) {
                                        Assert.assertEquals(
                                                c.element().intValue(), c.sideInput(view).size());
                                        Assert.assertEquals(
                                                c.element().intValue(),
                                                c.sideInput(view).entrySet().size());
                                        for (Map.Entry<String, Iterable<Integer>> entry
                                                : c.sideInput(view).entrySet()) {
                                            for (Integer value : entry.getValue()) {
                                                c.output(KV.of(entry.getKey(), value));
                                            }
                                        }
                                    }
                                })))
                .containsInAnyOrder(KV.of("a", 1), KV.of("a", 2), KV.of("b", 3));

        pipeline.run();
    }

    /**
     * Expects windowed multimap view lookups to work when the key coder of the side-input
     * collection is {@code NonDeterministicStringCoder}.
     */
    @Test
    @Category({RunnableOnService.class})
    public void testWindowedMultimapSideInputWithNonDeterministicKeyCoder() {
        TestPipeline pipeline = TestPipeline.create();

        final PCollectionView<Map<String, Iterable<Integer>>> view =
                pipeline.apply("CreateSideInput", Create.timestamped(
                                        TimestampedValue.of(KV.of("a", 1), new Instant(1L)),
                                        TimestampedValue.of(KV.of("a", 2), new Instant(7L)),
                                        TimestampedValue.of(KV.of("b", 3), new Instant(14L)))
                                .withCoder(KvCoder.of(
                                        new NonDeterministicStringCoder(), VarIntCoder.of())))
                        .apply("SideWindowInto",
                                Window.<KV<String, Integer>>into(
                                        FixedWindows.of(Duration.millis(10L))))
                        .apply(View.<String, Integer>asMultimap());

        PAssert.that(
                pipeline.apply("CreateMainInput", Create.timestamped(
                                TimestampedValue.of("apple", new Instant(5L)),
                                TimestampedValue.of("banana", new Instant(13L)),
                                TimestampedValue.of("blackberry", new Instant(16L))))
                        .apply("MainWindowInto",
                                Window.<String>into(FixedWindows.of(Duration.millis(10L))))
                        .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
                                new DoFn<String, KV<String, Integer>>() {
                                    @Override
                                    public void processElement(ProcessContext c) {
                                        String firstLetter = c.element().substring(0, 1);
                                        for (Integer value : c.sideInput(view).get(firstLetter)) {
                                            c.output(KV.of(c.element(), value));
                                        }
                                    }
                                })))
                .containsInAnyOrder(
                        KV.of("apple", 1), KV.of("apple", 2),
                        KV.of("banana", 3), KV.of("blackberry", 3));

        pipeline.run();
    }

    /**
     * Expects a multimap view over an empty collection to be readable as an empty map with an
     * empty entry set.
     */
    @Test
    @Category({RunnableOnService.class})
    public void testEmptyMultimapSideInput() throws Exception {
        TestPipeline pipeline = TestPipeline.create();

        final PCollectionView<Map<String, Iterable<Integer>>> view =
                pipeline.apply("CreateEmptyView",
                                Create.<KV<String, Integer>>of()
                                        .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())))
                        .apply(View.<String, Integer>asMultimap());

        PAssert.that(
                pipeline.apply("Create1", Create.of(1))
                        .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
                                new DoFn<Integer, Integer>() {
                                    @Override
                                    public void processElement(ProcessContext c) {
                                        Assert.assertTrue(c.sideInput(view).isEmpty());
                                        Assert.assertTrue(c.sideInput(view).entrySet().isEmpty());
                                        Assert.assertFalse(
                                                c.sideInput(view).entrySet().iterator().hasNext());
                                        c.output(c.element());
                                    }
                                })))
                .containsInAnyOrder(1);

        pipeline.run();
    }

    /**
     * Expects a multimap view over an empty collection to be readable as an empty map when the
     * key coder is {@code NonDeterministicStringCoder}.
     */
    @Test
    @Category({RunnableOnService.class})
    public void testEmptyMultimapSideInputWithNonDeterministicKeyCoder() throws Exception {
        TestPipeline pipeline = TestPipeline.create();

        final PCollectionView<Map<String, Iterable<Integer>>> view =
                pipeline.apply("CreateEmptyView",
                                Create.<KV<String, Integer>>of()
                                        .withCoder(KvCoder.of(
                                                new NonDeterministicStringCoder(), VarIntCoder.of())))
                        .apply(View.<String, Integer>asMultimap());

        PAssert.that(
                pipeline.apply("Create1", Create.of(1))
                        .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
                                new DoFn<Integer, Integer>() {
                                    @Override
                                    public void processElement(ProcessContext c) {
                                        Assert.assertTrue(c.sideInput(view).isEmpty());
                                        Assert.assertTrue(c.sideInput(view).entrySet().isEmpty());
                                        Assert.assertFalse(
                                                c.sideInput(view).entrySet().iterator().hasNext());
                                        c.output(c.element());
                                    }
                                })))
                .containsInAnyOrder(1);

        pipeline.run();
    }

    /**
     * Expects the map handed out for a multimap view to reject {@code clear}, {@code put},
     * {@code remove} and {@code putAll} with {@link UnsupportedOperationException}, while still
     * being readable.
     */
    @Test
    @Category({RunnableOnService.class})
    public void testMultimapSideInputIsImmutable() {
        TestPipeline pipeline = TestPipeline.create();

        final PCollectionView<Map<String, Iterable<Integer>>> view =
                pipeline.apply("CreateSideInput", Create.of(KV.of("a", 1)))
                        .apply(View.<String, Integer>asMultimap());

        PAssert.that(
                pipeline.apply("CreateMainInput", Create.of("apple"))
                        .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
                                new DoFn<String, KV<String, Integer>>() {
                                    @Override
                                    public void processElement(ProcessContext c) {
                                        try {
                                            c.sideInput(view).clear();
                                            Assert.fail("Expected UnsupportedOperationException on clear()");
                                        } catch (UnsupportedOperationException expected) {
                                            // Expected: the view must not be mutable.
                                        }
                                        try {
                                            c.sideInput(view).put("c", ImmutableList.of(3));
                                            Assert.fail("Expected UnsupportedOperationException on put()");
                                        } catch (UnsupportedOperationException expected) {
                                            // Expected: the view must not be mutable.
                                        }
                                        try {
                                            c.sideInput(view).remove("c");
                                            Assert.fail("Expected UnsupportedOperationException on remove()");
                                        } catch (UnsupportedOperationException expected) {
                                            // Expected: the view must not be mutable.
                                        }
                                        try {
                                            c.sideInput(view).putAll(
                                                    new HashMap<String, Iterable<Integer>>());
                                            Assert.fail("Expected UnsupportedOperationException on putAll()");
                                        } catch (UnsupportedOperationException expected) {
                                            // Expected: the view must not be mutable.
                                        }
                                        String firstLetter = c.element().substring(0, 1);
                                        for (Integer value : c.sideInput(view).get(firstLetter)) {
                                            c.output(KV.of(c.element(), value));
                                        }
                                    }
                                })))
                .containsInAnyOrder(KV.of("apple", 1));

        pipeline.run();
    }

    /**
     * Expects a map view to be readable as a side input: each main-input word is paired with
     * the value stored under its first letter.
     */
    @Test
    @Category({RunnableOnService.class})
    public void testMapSideInput() {
        TestPipeline pipeline = TestPipeline.create();

        final PCollectionView<Map<String, Integer>> view =
                pipeline.apply("CreateSideInput", Create.of(KV.of("a", 1), KV.of("b", 3)))
                        .apply(View.<String, Integer>asMap());

        PAssert.that(
                pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
                        .apply("OutputSideInputs", ParDo.withSideInputs(view).of(
                                new DoFn<String, KV<String, Integer>>() {
                                    @Override
                                    public void processElement(ProcessContext c) {
                                        String firstLetter = c.element().substring(0, 1);
                                        c.output(KV.of(
                                                c.element(), c.sideInput(view).get(firstLetter)));
                                    }
                                })))
                .containsInAnyOrder(
                        KV.of("apple", 1), KV.of("banana", 3), KV.of("blackberry", 3));

        pipeline.run();
    }

    /**
     * Iterates a {@code View.asMap()} side input through {@code entrySet()}: the single main-input
     * element carries the expected map size, and every entry is re-emitted as a {@code KV}.
     */
    @Test
    @Category({RunnableOnService.class})
    public void testMapAsEntrySetSideInput() {
        TestPipeline pipeline = TestPipeline.create();

        final PCollectionView<Map<String, Integer>> view = pipeline
                .apply("CreateSideInput", Create.of(KV.of("a", 1), KV.of("b", 3)))
                .apply(View.<String, Integer>asMap());

        DoFn<Integer, KV<String, Integer>> emitEntries = new DoFn<Integer, KV<String, Integer>>() {
            public void processElement(DoFn<Integer, KV<String, Integer>>.ProcessContext c) {
                int expectedSize = c.element().intValue();
                Map<String, Integer> sideMap = c.sideInput(view);
                // Both the map and its entry-set view must report the same size.
                Assert.assertEquals(expectedSize, sideMap.size());
                Assert.assertEquals(expectedSize, sideMap.entrySet().size());
                for (Map.Entry<String, Integer> entry : sideMap.entrySet()) {
                    c.output(KV.of(entry.getKey(), entry.getValue()));
                }
            }
        };

        PAssert.that(pipeline
                        .apply("CreateMainInput", Create.of(2))
                        .apply("OutputSideInputs", ParDo.withSideInputs(view).of(emitEntries)))
                .containsInAnyOrder(KV.of("a", 1), KV.of("b", 3));

        pipeline.run();
    }

    /**
     * Same first-letter lookup as the plain map side-input test, but the side-input collection is
     * encoded with {@code NonDeterministicStringCoder} keys (a coder declared elsewhere in this
     * file; presumably one whose encoding is not deterministic).
     */
    @Test
    @Category({RunnableOnService.class})
    public void testMapSideInputWithNonDeterministicKeyCoder() {
        TestPipeline pipeline = TestPipeline.create();

        final PCollectionView<Map<String, Integer>> view = pipeline
                .apply("CreateSideInput", Create.of(KV.of("a", 1), KV.of("b", 3))
                        .withCoder(KvCoder.of(new NonDeterministicStringCoder(), VarIntCoder.of())))
                .apply(View.<String, Integer>asMap());

        DoFn<String, KV<String, Integer>> lookupByFirstLetter = new DoFn<String, KV<String, Integer>>() {
            public void processElement(DoFn<String, KV<String, Integer>>.ProcessContext c) {
                String word = c.element();
                c.output(KV.of(word, c.sideInput(view).get(word.substring(0, 1))));
            }
        };

        PAssert.that(pipeline
                        .apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
                        .apply("OutputSideInputs", ParDo.withSideInputs(view).of(lookupByFirstLetter)))
                .containsInAnyOrder(KV.of("apple", 1), KV.of("banana", 3), KV.of("blackberry", 3));

        pipeline.run();
    }

    /**
     * Map side input with both sides in 10 ms fixed windows. The side input holds {@code a=1, b=2}
     * at timestamps 1 and 4 and {@code b=3} at timestamp 18; the expected output pairs each word
     * with the value from the window its own timestamp falls into.
     */
    @Test
    @Category({RunnableOnService.class})
    public void testWindowedMapSideInput() {
        TestPipeline pipeline = TestPipeline.create();

        final PCollectionView<Map<String, Integer>> view = pipeline
                .apply("CreateSideInput", Create.timestamped(
                        TimestampedValue.of(KV.of("a", 1), new Instant(1L)),
                        TimestampedValue.of(KV.of("b", 2), new Instant(4L)),
                        TimestampedValue.of(KV.of("b", 3), new Instant(18L))))
                .apply("SideWindowInto",
                        Window.<KV<String, Integer>>into(FixedWindows.of(Duration.millis(10L))))
                .apply(View.<String, Integer>asMap());

        DoFn<String, KV<String, Integer>> lookupByFirstLetter = new DoFn<String, KV<String, Integer>>() {
            public void processElement(DoFn<String, KV<String, Integer>>.ProcessContext c) {
                String word = c.element();
                c.output(KV.of(word, c.sideInput(view).get(word.substring(0, 1))));
            }
        };

        PAssert.that(pipeline
                        .apply("CreateMainInput", Create.timestamped(
                                TimestampedValue.of("apple", new Instant(5L)),
                                TimestampedValue.of("banana", new Instant(4L)),
                                TimestampedValue.of("blackberry", new Instant(16L))))
                        .apply("MainWindowInto", Window.<String>into(FixedWindows.of(Duration.millis(10L))))
                        .apply("OutputSideInputs", ParDo.withSideInputs(view).of(lookupByFirstLetter)))
                .containsInAnyOrder(KV.of("apple", 1), KV.of("banana", 2), KV.of("blackberry", 3));

        pipeline.run();
    }

    /**
     * Entry-set iteration over a map side input with both sides in 10 ms fixed windows. Each
     * main-input element carries the map size expected for its window (2 at timestamp 5, 1 at
     * timestamp 16) and re-emits every entry it sees.
     */
    @Test
    @Category({RunnableOnService.class})
    public void testWindowedMapAsEntrySetSideInput() {
        TestPipeline pipeline = TestPipeline.create();

        final PCollectionView<Map<String, Integer>> view = pipeline
                .apply("CreateSideInput", Create.timestamped(
                        TimestampedValue.of(KV.of("a", 1), new Instant(1L)),
                        TimestampedValue.of(KV.of("b", 2), new Instant(4L)),
                        TimestampedValue.of(KV.of("b", 3), new Instant(18L))))
                .apply("SideWindowInto",
                        Window.<KV<String, Integer>>into(FixedWindows.of(Duration.millis(10L))))
                .apply(View.<String, Integer>asMap());

        DoFn<Integer, KV<String, Integer>> emitEntries = new DoFn<Integer, KV<String, Integer>>() {
            public void processElement(DoFn<Integer, KV<String, Integer>>.ProcessContext c) {
                int expectedSize = c.element().intValue();
                Map<String, Integer> sideMap = c.sideInput(view);
                // Both the map and its entry-set view must report the same size.
                Assert.assertEquals(expectedSize, sideMap.size());
                Assert.assertEquals(expectedSize, sideMap.entrySet().size());
                for (Map.Entry<String, Integer> entry : sideMap.entrySet()) {
                    c.output(KV.of(entry.getKey(), entry.getValue()));
                }
            }
        };

        PAssert.that(pipeline
                        .apply("CreateMainInput", Create.timestamped(
                                TimestampedValue.of(2, new Instant(5L)),
                                TimestampedValue.of(1, new Instant(16L))))
                        .apply("MainWindowInto", Window.<Integer>into(FixedWindows.of(Duration.millis(10L))))
                        .apply("OutputSideInputs", ParDo.withSideInputs(view).of(emitEntries)))
                .containsInAnyOrder(KV.of("a", 1), KV.of("b", 2), KV.of("b", 3));

        pipeline.run();
    }

    /**
     * Windowed first-letter lookup (10 ms fixed windows on both sides) where the side-input
     * collection is encoded with {@code NonDeterministicStringCoder} keys (a coder declared
     * elsewhere in this file).
     */
    @Test
    @Category({RunnableOnService.class})
    public void testWindowedMapSideInputWithNonDeterministicKeyCoder() {
        TestPipeline pipeline = TestPipeline.create();

        final PCollectionView<Map<String, Integer>> view = pipeline
                .apply("CreateSideInput", Create.timestamped(
                                TimestampedValue.of(KV.of("a", 1), new Instant(1L)),
                                TimestampedValue.of(KV.of("b", 2), new Instant(4L)),
                                TimestampedValue.of(KV.of("b", 3), new Instant(18L)))
                        .withCoder(KvCoder.of(new NonDeterministicStringCoder(), VarIntCoder.of())))
                .apply("SideWindowInto",
                        Window.<KV<String, Integer>>into(FixedWindows.of(Duration.millis(10L))))
                .apply(View.<String, Integer>asMap());

        DoFn<String, KV<String, Integer>> lookupByFirstLetter = new DoFn<String, KV<String, Integer>>() {
            public void processElement(DoFn<String, KV<String, Integer>>.ProcessContext c) {
                String word = c.element();
                c.output(KV.of(word, c.sideInput(view).get(word.substring(0, 1))));
            }
        };

        PAssert.that(pipeline
                        .apply("CreateMainInput", Create.timestamped(
                                TimestampedValue.of("apple", new Instant(5L)),
                                TimestampedValue.of("banana", new Instant(4L)),
                                TimestampedValue.of("blackberry", new Instant(16L))))
                        .apply("MainWindowInto", Window.<String>into(FixedWindows.of(Duration.millis(10L))))
                        .apply("OutputSideInputs", ParDo.withSideInputs(view).of(lookupByFirstLetter)))
                .containsInAnyOrder(KV.of("apple", 1), KV.of("banana", 2), KV.of("blackberry", 3));

        pipeline.run();
    }

    /**
     * Builds a {@code View.asMap()} side input from an empty collection and checks, from inside a
     * {@code ParDo}, that the map, its entry set and the entry-set iterator are all empty.
     */
    @Test
    @Category({RunnableOnService.class})
    public void testEmptyMapSideInput() throws Exception {
        TestPipeline pipeline = TestPipeline.create();

        // An empty Create has no elements to infer a type from, so the element type is spelled out
        // and must agree with the explicit coder.
        final PCollectionView<Map<String, Integer>> view = pipeline
                .apply("CreateEmptyView", Create.<KV<String, Integer>>of()
                        .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())))
                .apply(View.<String, Integer>asMap());

        DoFn<Integer, Integer> assertEmptyAndEcho = new DoFn<Integer, Integer>() {
            public void processElement(DoFn<Integer, Integer>.ProcessContext c) {
                Map<String, Integer> sideMap = c.sideInput(view);
                Assert.assertTrue(sideMap.isEmpty());
                Assert.assertTrue(sideMap.entrySet().isEmpty());
                Assert.assertFalse(sideMap.entrySet().iterator().hasNext());
                c.output(c.element());
            }
        };

        PAssert.that(pipeline
                        .apply("Create1", Create.of(1))
                        .apply("OutputSideInputs", ParDo.withSideInputs(view).of(assertEmptyAndEcho)))
                .containsInAnyOrder(1);

        pipeline.run();
    }

    /**
     * Empty {@code View.asMap()} side input whose keys use {@code NonDeterministicStringCoder} (a
     * coder declared elsewhere in this file); the map, its entry set and the entry-set iterator
     * must all be empty.
     */
    @Test
    @Category({RunnableOnService.class})
    public void testEmptyMapSideInputWithNonDeterministicKeyCoder() throws Exception {
        TestPipeline pipeline = TestPipeline.create();

        // An empty Create has no elements to infer a type from, so the element type is spelled out
        // and must agree with the explicit coder.
        final PCollectionView<Map<String, Integer>> view = pipeline
                .apply("CreateEmptyView", Create.<KV<String, Integer>>of()
                        .withCoder(KvCoder.of(new NonDeterministicStringCoder(), VarIntCoder.of())))
                .apply(View.<String, Integer>asMap());

        DoFn<Integer, Integer> assertEmptyAndEcho = new DoFn<Integer, Integer>() {
            public void processElement(DoFn<Integer, Integer>.ProcessContext c) {
                Map<String, Integer> sideMap = c.sideInput(view);
                Assert.assertTrue(sideMap.isEmpty());
                Assert.assertTrue(sideMap.entrySet().isEmpty());
                Assert.assertFalse(sideMap.entrySet().iterator().hasNext());
                c.output(c.element());
            }
        };

        PAssert.that(pipeline
                        .apply("Create1", Create.of(1))
                        .apply("OutputSideInputs", ParDo.withSideInputs(view).of(assertEmptyAndEcho)))
                .containsInAnyOrder(1);

        pipeline.run();
    }

    /**
     * Feeds {@code View.asMap()} two entries with the same key {@code "a"} and {@code null} values,
     * and expects running the pipeline to fail with a cause whose message contains
     * {@code "Duplicate values for a"}.
     */
    @Test
    @Category({NeedsRunner.class})
    public void testMapSideInputWithNullValuesCatchesDuplicates() {
        TestPipeline pipeline = TestPipeline.create();

        // NullableCoder is needed because both values are null.
        final PCollectionView<Map<String, Integer>> view = pipeline
                .apply("CreateSideInput",
                        Create.of(KV.of("a", (Integer) null), KV.of("a", (Integer) null))
                                .withCoder(KvCoder.of(StringUtf8Coder.of(), NullableCoder.of(VarIntCoder.of()))))
                .apply(View.<String, Integer>asMap());

        DoFn<String, KV<String, Integer>> lookupByFirstLetter = new DoFn<String, KV<String, Integer>>() {
            public void processElement(DoFn<String, KV<String, Integer>>.ProcessContext c) {
                String word = c.element();
                c.output(KV.of(word, c.sideInput(view).get(word.substring(0, 1))));
            }
        };

        PAssert.that(pipeline
                        .apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
                        .apply("OutputSideInputs", ParDo.withSideInputs(view).of(lookupByFirstLetter)))
                .containsInAnyOrder(KV.of("apple", 1), KV.of("banana", 3), KV.of("blackberry", 3));

        // Register the expected failure only once the pipeline is built, so it applies to run().
        this.thrown.expectCause(ThrowableMessageMatcher.hasMessage(Matchers.containsString("Duplicate values for a")));
        pipeline.run();
    }

    /**
     * Checks that the map handed out for a {@code View.asMap()} side input rejects mutation:
     * {@code clear}, {@code put}, {@code remove} and {@code putAll} must each throw
     * {@link UnsupportedOperationException}. The normal first-letter lookup is still emitted
     * afterwards.
     */
    @Test
    @Category({RunnableOnService.class})
    public void testMapSideInputIsImmutable() {
        TestPipeline pipeline = TestPipeline.create();

        final PCollectionView<Map<String, Integer>> view = pipeline
                .apply("CreateSideInput", Create.of(KV.of("a", 1)))
                .apply(View.<String, Integer>asMap());

        DoFn<String, KV<String, Integer>> mutateThenLookup = new DoFn<String, KV<String, Integer>>() {
            public void processElement(DoFn<String, KV<String, Integer>>.ProcessContext c) {
                Map<String, Integer> sideMap = c.sideInput(view);
                try {
                    sideMap.clear();
                    Assert.fail("Expected UnsupportedOperationException on clear()");
                } catch (UnsupportedOperationException ignored) {
                    // Expected: the side-input map must reject clear().
                }
                try {
                    sideMap.put("c", 3);
                    Assert.fail("Expected UnsupportedOperationException on put()");
                } catch (UnsupportedOperationException ignored) {
                    // Expected: the side-input map must reject put().
                }
                try {
                    sideMap.remove("c");
                    Assert.fail("Expected UnsupportedOperationException on remove()");
                } catch (UnsupportedOperationException ignored) {
                    // Expected: the side-input map must reject remove().
                }
                try {
                    sideMap.putAll(new HashMap<String, Integer>());
                    Assert.fail("Expected UnsupportedOperationException on putAll()");
                } catch (UnsupportedOperationException ignored) {
                    // Expected: the side-input map must reject putAll().
                }
                String word = c.element();
                c.output(KV.of(word, sideMap.get(word.substring(0, 1))));
            }
        };

        PAssert.that(pipeline
                        .apply("CreateMainInput", Create.of("apple"))
                        .apply("OutputSideInputs", ParDo.withSideInputs(view).of(mutateThenLookup)))
                .containsInAnyOrder(KV.of("apple", 1));

        pipeline.run();
    }

    /**
     * Builds the map side input from the output of a per-key integer sum: {@code a=1} and
     * {@code a=20} are expected to surface as {@code a=21}, while {@code b=3} is unchanged.
     */
    @Test
    @Category({RunnableOnService.class})
    public void testCombinedMapSideInput() {
        TestPipeline pipeline = TestPipeline.create();

        final PCollectionView<Map<String, Integer>> view = pipeline
                .apply("CreateSideInput", Create.of(KV.of("a", 1), KV.of("a", 20), KV.of("b", 3)))
                // The key type is named explicitly; the sum function itself carries no key type.
                .apply("SumIntegers", Combine.perKey(new Sum.SumIntegerFn().<String>asKeyedFn()))
                .apply(View.<String, Integer>asMap());

        DoFn<String, KV<String, Integer>> lookupByFirstLetter = new DoFn<String, KV<String, Integer>>() {
            public void processElement(DoFn<String, KV<String, Integer>>.ProcessContext c) {
                String word = c.element();
                c.output(KV.of(word, c.sideInput(view).get(word.substring(0, 1))));
            }
        };

        PAssert.that(pipeline
                        .apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
                        .apply("Output", ParDo.withSideInputs(view).of(lookupByFirstLetter)))
                .containsInAnyOrder(KV.of("apple", 21), KV.of("banana", 3), KV.of("blackberry", 3));

        pipeline.run();
    }

    /**
     * Singleton side input and main input both in 10 ms fixed windows. The side input is the
     * per-window integer sum (no defaults), so the expected output is {@code A1}, {@code C1} for
     * timestamps 4 and 7 and {@code B5} for timestamp 15.
     */
    @Test
    @Category({RunnableOnService.class})
    public void testWindowedSideInputFixedToFixed() {
        TestPipeline pipeline = TestPipeline.create();

        final PCollectionView<Integer> view = pipeline
                .apply("CreateSideInput", Create.timestamped(
                        TimestampedValue.of(1, new Instant(1L)),
                        TimestampedValue.of(2, new Instant(11L)),
                        TimestampedValue.of(3, new Instant(13L))))
                .apply("WindowSideInput", Window.<Integer>into(FixedWindows.of(Duration.millis(10L))))
                .apply(Sum.integersGlobally().withoutDefaults())
                .apply(View.<Integer>asSingleton());

        DoFn<String, String> appendSideInput = new DoFn<String, String>() {
            public void processElement(DoFn<String, String>.ProcessContext c) {
                c.output(c.element() + c.sideInput(view));
            }
        };

        PAssert.that(pipeline
                        .apply("CreateMainInput", Create.timestamped(
                                TimestampedValue.of("A", new Instant(4L)),
                                TimestampedValue.of("B", new Instant(15L)),
                                TimestampedValue.of("C", new Instant(7L))))
                        .apply("WindowMainInput", Window.<String>into(FixedWindows.of(Duration.millis(10L))))
                        .apply("OutputMainAndSideInputs", ParDo.withSideInputs(view).of(appendSideInput)))
                .containsInAnyOrder("A1", "B5", "C1");

        pipeline.run();
    }

    /**
     * Main input in 10 ms fixed windows reading a singleton side input that lives in the global
     * window. The side input is the sum of all three integers, so every main-input element is
     * expected to be suffixed with {@code 6}.
     */
    @Test
    @Category({RunnableOnService.class})
    public void testWindowedSideInputFixedToGlobal() {
        TestPipeline pipeline = TestPipeline.create();

        final PCollectionView<Integer> view = pipeline
                .apply("CreateSideInput", Create.timestamped(
                        TimestampedValue.of(1, new Instant(1L)),
                        TimestampedValue.of(2, new Instant(11L)),
                        TimestampedValue.of(3, new Instant(13L))))
                .apply("WindowSideInput", Window.<Integer>into(new GlobalWindows()))
                .apply(Sum.integersGlobally())
                .apply(View.<Integer>asSingleton());

        DoFn<String, String> appendSideInput = new DoFn<String, String>() {
            public void processElement(DoFn<String, String>.ProcessContext c) {
                c.output(c.element() + c.sideInput(view));
            }
        };

        PAssert.that(pipeline
                        .apply("CreateMainInput", Create.timestamped(
                                TimestampedValue.of("A", new Instant(4L)),
                                TimestampedValue.of("B", new Instant(15L)),
                                TimestampedValue.of("C", new Instant(7L))))
                        .apply("WindowMainInput", Window.<String>into(FixedWindows.of(Duration.millis(10L))))
                        .apply("OutputMainAndSideInputs", ParDo.withSideInputs(view).of(appendSideInput)))
                .containsInAnyOrder("A6", "B6", "C6");

        pipeline.run();
    }

    /**
     * Fixed-to-fixed windowed side input built with {@code Sum.integersGlobally().asSingletonView()}.
     * The side input only has data at timestamps 11 and 13, and the expected output shows
     * {@code 0} for the main-input elements at timestamps 4 and 7 and {@code 5} for the one at 15.
     */
    @Test
    @Category({RunnableOnService.class})
    public void testWindowedSideInputFixedToFixedWithDefault() {
        TestPipeline pipeline = TestPipeline.create();

        final PCollectionView<Integer> view = pipeline
                .apply("CreateSideInput", Create.timestamped(
                        TimestampedValue.of(2, new Instant(11L)),
                        TimestampedValue.of(3, new Instant(13L))))
                .apply("WindowSideInput", Window.<Integer>into(FixedWindows.of(Duration.millis(10L))))
                .apply(Sum.integersGlobally().asSingletonView());

        DoFn<String, String> appendSideInput = new DoFn<String, String>() {
            public void processElement(DoFn<String, String>.ProcessContext c) {
                c.output(c.element() + c.sideInput(view));
            }
        };

        PAssert.that(pipeline
                        .apply("CreateMainInput", Create.timestamped(
                                TimestampedValue.of("A", new Instant(4L)),
                                TimestampedValue.of("B", new Instant(15L)),
                                TimestampedValue.of("C", new Instant(7L))))
                        .apply("WindowMainInput", Window.<String>into(FixedWindows.of(Duration.millis(10L))))
                        .apply("OutputMainAndSideInputs", ParDo.withSideInputs(view).of(appendSideInput)))
                .containsInAnyOrder("A0", "B5", "C0");

        pipeline.run();
    }

    /**
     * Uses a singleton side input whose combined value is {@code null}: the combiner always returns
     * {@code null}, and concatenating it onto the empty main-input string is expected to produce
     * the text {@code "null"}.
     */
    @Test
    @Category({RunnableOnService.class})
    public void testSideInputWithNullDefault() {
        TestPipeline pipeline = TestPipeline.create();

        final PCollectionView<Void> view = pipeline
                .apply("CreateSideInput", Create.of((Void) null).withCoder(VoidCoder.of()))
                .apply(Combine.globally(new SerializableFunction<Iterable<Void>, Void>() {
                    public Void apply(Iterable<Void> input) {
                        return null;
                    }
                }).asSingletonView());

        DoFn<String, String> appendSideInput = new DoFn<String, String>() {
            public void processElement(DoFn<String, String>.ProcessContext c) {
                // String concatenation renders the null side input as "null".
                c.output(c.element() + c.sideInput(view));
            }
        };

        PAssert.that(pipeline
                        .apply("CreateMainInput", Create.of(""))
                        .apply("OutputMainAndSideInputs", ParDo.withSideInputs(view).of(appendSideInput)))
                .containsInAnyOrder("null");

        pipeline.run();
    }

    /**
     * Chains two iterable side inputs: the first view holds the single integer 17, a second
     * {@code ParDo} emits that whole iterable as one element, and the resulting
     * iterable-of-iterables view is flattened again by the final {@code ParDo}.
     */
    @Test
    @Category({RunnableOnService.class})
    public void testSideInputWithNestedIterables() {
        TestPipeline pipeline = TestPipeline.create();

        final PCollectionView<Iterable<Integer>> view1 = pipeline
                .apply("CreateVoid1", Create.of((Void) null).withCoder(VoidCoder.of()))
                .apply("OutputOneInteger", ParDo.of(new DoFn<Void, Integer>() {
                    public void processElement(DoFn<Void, Integer>.ProcessContext c) {
                        c.output(17);
                    }
                }))
                .apply("View1", View.<Integer>asIterable());

        final PCollectionView<Iterable<Iterable<Integer>>> view2 = pipeline
                .apply("CreateVoid2", Create.of((Void) null).withCoder(VoidCoder.of()))
                .apply("OutputSideInput", ParDo.withSideInputs(view1).of(new DoFn<Void, Iterable<Integer>>() {
                    public void processElement(DoFn<Void, Iterable<Integer>>.ProcessContext c) {
                        // Forward the first view's iterable as a single output element.
                        c.output(c.sideInput(view1));
                    }
                }))
                .apply("View2", View.<Iterable<Integer>>asIterable());

        DoFn<Void, Integer> flattenNested = new DoFn<Void, Integer>() {
            public void processElement(DoFn<Void, Integer>.ProcessContext c) {
                for (Iterable<Integer> inner : c.sideInput(view2)) {
                    for (Integer value : inner) {
                        c.output(value);
                    }
                }
            }
        };

        PAssert.that(pipeline
                        .apply("CreateVoid3", Create.of((Void) null).withCoder(VoidCoder.of()))
                        .apply("ReadIterableSideInput", ParDo.withSideInputs(view2).of(flattenNested)))
                .containsInAnyOrder(17);

        pipeline.run();
    }

    /** Checks the string returned by {@code getName()} for four of the {@code View} factories. */
    @Test
    public void testViewGetName() {
        String singletonName = View.asSingleton().getName();
        Assert.assertEquals("View.AsSingleton", singletonName);

        String iterableName = View.asIterable().getName();
        Assert.assertEquals("View.AsIterable", iterableName);

        String mapName = View.asMap().getName();
        Assert.assertEquals("View.AsMap", mapName);

        String multimapName = View.asMultimap().getName();
        Assert.assertEquals("View.AsMultimap", multimapName);
    }

    /**
     * Shared body for the "unbounded input" tests: applies {@code pTransform} to a synthetic
     * {@code UNBOUNDED} collection and expects an {@link IllegalStateException} whose message
     * mentions the failed view creation and whose cause mentions a non-bounded PCollection.
     *
     * @param pipeline pipeline the synthetic source and the view transform are applied to
     * @param pTransform view-creating transform under test
     */
    private void testViewUnbounded(Pipeline pipeline, PTransform<PCollection<KV<String, Integer>>, ? extends PCollectionView<?>> pTransform) {
        this.thrown.expect(IllegalStateException.class);
        this.thrown.expectMessage("Unable to create a side-input view from input");
        this.thrown.expectCause(ThrowableMessageMatcher.hasMessage(Matchers.containsString("non-bounded PCollection")));

        PTransform<PBegin, PCollection<KV<String, Integer>>> unboundedSource =
                new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
                    public PCollection<KV<String, Integer>> apply(PBegin pBegin) {
                        // The element type must be named here: it cannot be inferred from the
                        // arguments, and the type descriptor below has to match it.
                        return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal(
                                        pBegin.getPipeline(),
                                        WindowingStrategy.globalDefault(),
                                        PCollection.IsBounded.UNBOUNDED)
                                .setTypeDescriptorInternal(new TypeDescriptor<KV<String, Integer>>() {
                                });
                    }
                };

        pipeline.apply(unboundedSource).apply(pTransform);
    }

    /**
     * Shared body for the "invalid windows" tests: applies {@code pTransform} to a collection
     * windowed with {@code InvalidWindows} and expects an {@link IllegalStateException} whose
     * message mentions the failed view creation and whose cause carries the
     * {@code "Consumed by GroupByKey"} reason given to {@code InvalidWindows}.
     *
     * @param pipeline pipeline the input and the view transform are applied to
     * @param pTransform view-creating transform under test
     */
    private void testViewNonmerging(Pipeline pipeline, PTransform<PCollection<KV<String, Integer>>, ? extends PCollectionView<?>> pTransform) {
        this.thrown.expect(IllegalStateException.class);
        this.thrown.expectMessage("Unable to create a side-input view from input");
        this.thrown.expectCause(ThrowableMessageMatcher.hasMessage(Matchers.containsString("Consumed by GroupByKey")));

        // Keep the element type as KV<String, Integer> through the chain so it matches pTransform.
        pipeline
                .apply(Create.of(KV.of("hello", 5)))
                .apply(Window.<KV<String, Integer>>into(
                        new InvalidWindows<>("Consumed by GroupByKey", FixedWindows.of(Duration.standardHours(1L)))))
                .apply(pTransform);
    }

    /** Runs the unbounded-input check in {@code testViewUnbounded} against {@code View.asSingleton()}. */
    @Test
    public void testViewUnboundedAsSingletonDirect() {
        TestPipeline pipeline = TestPipeline.create();
        testViewUnbounded(pipeline, View.asSingleton());
    }

    /** Runs the unbounded-input check in {@code testViewUnbounded} against {@code View.asIterable()}. */
    @Test
    public void testViewUnboundedAsIterableDirect() {
        TestPipeline pipeline = TestPipeline.create();
        testViewUnbounded(pipeline, View.asIterable());
    }

    /** Runs the unbounded-input check in {@code testViewUnbounded} against {@code View.asList()}. */
    @Test
    public void testViewUnboundedAsListDirect() {
        TestPipeline pipeline = TestPipeline.create();
        testViewUnbounded(pipeline, View.asList());
    }

    /** Runs the unbounded-input check in {@code testViewUnbounded} against {@code View.asMap()}. */
    @Test
    public void testViewUnboundedAsMapDirect() {
        TestPipeline pipeline = TestPipeline.create();
        testViewUnbounded(pipeline, View.asMap());
    }

    /** Runs the unbounded-input check in {@code testViewUnbounded} against {@code View.asMultimap()}. */
    @Test
    public void testViewUnboundedAsMultimapDirect() {
        TestPipeline pipeline = TestPipeline.create();
        testViewUnbounded(pipeline, View.asMultimap());
    }

    /** Runs the invalid-windows check in {@code testViewNonmerging} against {@code View.asSingleton()}. */
    @Test
    public void testViewNonmergingAsSingletonDirect() {
        TestPipeline pipeline = TestPipeline.create();
        testViewNonmerging(pipeline, View.asSingleton());
    }

    /** Runs the invalid-windows check in {@code testViewNonmerging} against {@code View.asIterable()}. */
    @Test
    public void testViewNonmergingAsIterableDirect() {
        TestPipeline pipeline = TestPipeline.create();
        testViewNonmerging(pipeline, View.asIterable());
    }

    /** Runs the invalid-windows check in {@code testViewNonmerging} against {@code View.asList()}. */
    @Test
    public void testViewNonmergingAsListDirect() {
        TestPipeline pipeline = TestPipeline.create();
        testViewNonmerging(pipeline, View.asList());
    }

    /** Runs the invalid-windows check in {@code testViewNonmerging} against {@code View.asMap()}. */
    @Test
    public void testViewNonmergingAsMapDirect() {
        TestPipeline pipeline = TestPipeline.create();
        testViewNonmerging(pipeline, View.asMap());
    }

    /** Runs the invalid-windows check in {@code testViewNonmerging} against {@code View.asMultimap()}. */
    @Test
    public void testViewNonmergingAsMultimapDirect() {
        TestPipeline pipeline = TestPipeline.create();
        testViewNonmerging(pipeline, View.asMultimap());
    }
}
