package org.apache.beam.runners.direct;

import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.IllegalMutationException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.class */
public class ImmutabilityCheckingBundleFactoryTest {

    @Rule
    public ExpectedException thrown = ExpectedException.none();
    private ImmutabilityCheckingBundleFactory factory;
    private PCollection<byte[]> created;
    private PCollection<byte[]> transformed;

    /* loaded from: input_file:org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest$IdentityDoFn.class */
    private static class IdentityDoFn<T> extends DoFn<T, T> {
        private IdentityDoFn() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<T, T>.ProcessContext processContext) throws Exception {
            processContext.output(processContext.element());
        }
    }

    /* JADX WARN: Type inference failed for: r2v1, types: [java.lang.Object[], byte[]] */
    @Before
    public void setup() {
        TestPipeline create = TestPipeline.create();
        this.created = create.apply(Create.of((Object[]) new byte[0]).withCoder(ByteArrayCoder.of()));
        this.transformed = this.created.apply(ParDo.of(new IdentityDoFn()));
        DirectGraphVisitor directGraphVisitor = new DirectGraphVisitor();
        create.traverseTopologically(directGraphVisitor);
        this.factory = ImmutabilityCheckingBundleFactory.create(ImmutableListBundleFactory.create(), directGraphVisitor.getGraph());
    }

    @Test
    public void rootBundleSucceeds() {
        DirectRunner.UncommittedBundle createRootBundle = this.factory.createRootBundle();
        byte[] bArr = {0, 1, 2};
        createRootBundle.add(WindowedValue.valueInGlobalWindow(bArr));
        Assert.assertThat(createRootBundle.commit(Instant.now()).getElements(), Matchers.containsInAnyOrder(new WindowedValue[]{WindowedValue.valueInGlobalWindow(bArr)}));
    }

    @Test
    public void noMutationKeyedBundleSucceeds() {
        DirectRunner.UncommittedBundle createKeyedBundle = this.factory.createKeyedBundle(StructuralKey.of("mykey", StringUtf8Coder.of()), this.transformed);
        WindowedValue of = WindowedValue.of(new byte[]{4, 8, 12}, new Instant(891L), new IntervalWindow(new Instant(0L), new Instant(1000L)), PaneInfo.ON_TIME_AND_ONLY_FIRING);
        createKeyedBundle.add(of);
        Assert.assertThat(createKeyedBundle.commit(Instant.now()).getElements(), Matchers.containsInAnyOrder(new WindowedValue[]{of}));
    }

    @Test
    public void noMutationCreateBundleSucceeds() {
        DirectRunner.UncommittedBundle createBundle = this.factory.createBundle(this.transformed);
        WindowedValue of = WindowedValue.of(new byte[]{4, 8, 12}, new Instant(891L), new IntervalWindow(new Instant(0L), new Instant(1000L)), PaneInfo.ON_TIME_AND_ONLY_FIRING);
        createBundle.add(of);
        Assert.assertThat(createBundle.commit(Instant.now()).getElements(), Matchers.containsInAnyOrder(new WindowedValue[]{of}));
    }

    @Test
    public void mutationBeforeAddKeyedBundleSucceeds() {
        DirectRunner.UncommittedBundle createKeyedBundle = this.factory.createKeyedBundle(StructuralKey.of("mykey", StringUtf8Coder.of()), this.transformed);
        byte[] bArr = {4, 8, 12};
        bArr[0] = Byte.MAX_VALUE;
        WindowedValue of = WindowedValue.of(bArr, new Instant(891L), new IntervalWindow(new Instant(0L), new Instant(1000L)), PaneInfo.ON_TIME_AND_ONLY_FIRING);
        createKeyedBundle.add(of);
        Assert.assertThat(createKeyedBundle.commit(Instant.now()).getElements(), Matchers.containsInAnyOrder(new WindowedValue[]{of}));
    }

    @Test
    public void mutationBeforeAddCreateBundleSucceeds() {
        DirectRunner.UncommittedBundle createBundle = this.factory.createBundle(this.transformed);
        byte[] bArr = {4, 8, 12};
        WindowedValue of = WindowedValue.of(bArr, new Instant(891L), new IntervalWindow(new Instant(0L), new Instant(1000L)), PaneInfo.ON_TIME_AND_ONLY_FIRING);
        bArr[2] = -3;
        createBundle.add(of);
        Assert.assertThat(createBundle.commit(Instant.now()).getElements(), Matchers.containsInAnyOrder(new WindowedValue[]{of}));
    }

    @Test
    public void mutationAfterAddKeyedBundleThrows() {
        DirectRunner.UncommittedBundle createKeyedBundle = this.factory.createKeyedBundle(StructuralKey.of("mykey", StringUtf8Coder.of()), this.transformed);
        byte[] bArr = {4, 8, 12};
        createKeyedBundle.add(WindowedValue.of(bArr, new Instant(891L), new IntervalWindow(new Instant(0L), new Instant(1000L)), PaneInfo.ON_TIME_AND_ONLY_FIRING));
        bArr[0] = Byte.MAX_VALUE;
        this.thrown.expect(IllegalMutationException.class);
        this.thrown.expectMessage("Values must not be mutated in any way after being output");
        createKeyedBundle.commit(Instant.now());
    }

    @Test
    public void mutationAfterAddCreateBundleThrows() {
        DirectRunner.UncommittedBundle createBundle = this.factory.createBundle(this.transformed);
        byte[] bArr = {4, 8, 12};
        createBundle.add(WindowedValue.of(bArr, new Instant(891L), new IntervalWindow(new Instant(0L), new Instant(1000L)), PaneInfo.ON_TIME_AND_ONLY_FIRING));
        bArr[2] = -3;
        this.thrown.expect(IllegalMutationException.class);
        this.thrown.expectMessage("Values must not be mutated in any way after being output");
        createBundle.commit(Instant.now());
    }
}
