package org.apache.beam.runners.direct;

import java.util.Collections;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.ImmutableSet;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Keys;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.class */
public class KeyedPValueTrackingVisitorTest {

    @Rule
    public ExpectedException thrown = ExpectedException.none();
    private KeyedPValueTrackingVisitor visitor;
    private Pipeline p;

    /* loaded from: input_file:org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest$CompositeKeyer.class */
    private static class CompositeKeyer<K> extends PTransform<PCollection<K>, PCollection<K>> {
        private CompositeKeyer() {
        }

        public PCollection<K> apply(PCollection<K> pCollection) {
            return pCollection.apply(new PrimitiveKeyer()).apply(ParDo.of(new IdentityFn()));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest$IdentityFn.class */
    public static class IdentityFn<K> extends DoFn<K, K> {
        private IdentityFn() {
        }

        public void processElement(DoFn<K, K>.ProcessContext processContext) throws Exception {
            processContext.output(processContext.element());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest$PrimitiveKeyer.class */
    public static class PrimitiveKeyer<K> extends PTransform<PCollection<K>, PCollection<K>> {
        private PrimitiveKeyer() {
        }

        public PCollection<K> apply(PCollection<K> pCollection) {
            return PCollection.createPrimitiveOutputInternal(pCollection.getPipeline(), pCollection.getWindowingStrategy(), pCollection.isBounded()).setCoder(pCollection.getCoder());
        }
    }

    @Before
    public void setup() {
        this.p = TestPipeline.create();
        this.visitor = KeyedPValueTrackingVisitor.create(ImmutableSet.of(PrimitiveKeyer.class, CompositeKeyer.class));
    }

    @Test
    public void primitiveProducesKeyedOutputUnkeyedInputKeyedOutput() {
        PCollection apply = this.p.apply(Create.of(new Integer[]{1, 2, 3})).apply(new PrimitiveKeyer());
        this.p.traverseTopologically(this.visitor);
        Assert.assertThat(this.visitor.getKeyedPValues(), Matchers.hasItem(apply));
    }

    @Test
    public void primitiveProducesKeyedOutputKeyedInputKeyedOutut() {
        PCollection apply = this.p.apply(Create.of(new Integer[]{1, 2, 3})).apply("firstKey", new PrimitiveKeyer()).apply("secondKey", new PrimitiveKeyer());
        this.p.traverseTopologically(this.visitor);
        Assert.assertThat(this.visitor.getKeyedPValues(), Matchers.hasItem(apply));
    }

    @Test
    public void compositeProducesKeyedOutputUnkeyedInputKeyedOutput() {
        PCollection apply = this.p.apply(Create.of(new Integer[]{1, 2, 3})).apply(new CompositeKeyer());
        this.p.traverseTopologically(this.visitor);
        Assert.assertThat(this.visitor.getKeyedPValues(), Matchers.hasItem(apply));
    }

    @Test
    public void compositeProducesKeyedOutputKeyedInputKeyedOutut() {
        PCollection apply = this.p.apply(Create.of(new Integer[]{1, 2, 3})).apply("firstKey", new CompositeKeyer()).apply("secondKey", new CompositeKeyer());
        this.p.traverseTopologically(this.visitor);
        Assert.assertThat(this.visitor.getKeyedPValues(), Matchers.hasItem(apply));
    }

    @Test
    public void noInputUnkeyedOutput() {
        PCollection apply = this.p.apply(Create.of(new KV[]{KV.of(-1, Collections.emptyList())}).withCoder(KvCoder.of(VarIntCoder.of(), IterableCoder.of(VoidCoder.of()))));
        this.p.traverseTopologically(this.visitor);
        Assert.assertThat(this.visitor.getKeyedPValues(), Matchers.not(Matchers.hasItem(apply)));
    }

    @Test
    public void keyedInputNotProducesKeyedOutputUnkeyedOutput() {
        PCollection apply = this.p.apply(Create.of(new Integer[]{1, 2, 3})).apply(new PrimitiveKeyer()).apply(ParDo.of(new IdentityFn()));
        this.p.traverseTopologically(this.visitor);
        Assert.assertThat(this.visitor.getKeyedPValues(), Matchers.not(Matchers.hasItem(apply)));
    }

    @Test
    public void unkeyedInputNotProducesKeyedOutputUnkeyedOutput() {
        PCollection apply = this.p.apply(Create.of(new Integer[]{1, 2, 3})).apply(ParDo.of(new IdentityFn()));
        this.p.traverseTopologically(this.visitor);
        Assert.assertThat(this.visitor.getKeyedPValues(), Matchers.not(Matchers.hasItem(apply)));
    }

    @Test
    public void traverseMultipleTimesThrows() {
        this.p.apply(Create.of(new KV[]{KV.of(1, (Void) null), KV.of(2, (Void) null), KV.of(3, (Void) null)}).withCoder(KvCoder.of(VarIntCoder.of(), VoidCoder.of()))).apply(GroupByKey.create()).apply(Keys.create());
        this.p.traverseTopologically(this.visitor);
        this.thrown.expect(IllegalStateException.class);
        this.thrown.expectMessage("already been finalized");
        this.thrown.expectMessage(KeyedPValueTrackingVisitor.class.getSimpleName());
        this.p.traverseTopologically(this.visitor);
    }

    @Test
    public void getKeyedPValuesBeforeTraverseThrows() {
        this.thrown.expect(IllegalStateException.class);
        this.thrown.expectMessage("completely traversed");
        this.thrown.expectMessage("getKeyedPValues");
        this.visitor.getKeyedPValues();
    }
}
