package org.apache.beam.runners.direct;

import com.fasterxml.jackson.annotation.JsonValue;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.IllegalMutationException;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/direct/DirectRunnerTest.class */
public class DirectRunnerTest implements Serializable {

    @Rule
    public transient ExpectedException thrown = ExpectedException.none();

    /* loaded from: input_file:org/apache/beam/runners/direct/DirectRunnerTest$ObjectPipelineOptions.class */
    public interface ObjectPipelineOptions extends PipelineOptions {
        Object getValue();

        void setValue(Object obj);
    }

    private Pipeline getPipeline() {
        PipelineOptions create = PipelineOptionsFactory.create();
        create.setRunner(DirectRunner.class);
        return Pipeline.create(create);
    }

    @Test
    public void defaultRunnerLoaded() {
        Assert.assertThat(DirectRunner.class, Matchers.equalTo(PipelineOptionsFactory.create().getRunner()));
    }

    @Test
    public void wordCountShouldSucceed() throws Throwable {
        Pipeline pipeline = getPipeline();
        PAssert.that(pipeline.apply(Create.of(new String[]{"foo", "bar", "foo", "baz", "bar", "foo"})).apply(MapElements.via(new SimpleFunction<String, String>() { // from class: org.apache.beam.runners.direct.DirectRunnerTest.1
            public String apply(String str) {
                return str;
            }
        })).apply(Count.perElement()).apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() { // from class: org.apache.beam.runners.direct.DirectRunnerTest.2
            public String apply(KV<String, Long> kv) {
                return String.format("%s: %s", kv.getKey(), kv.getValue());
            }
        }))).containsInAnyOrder(new String[]{"baz: 1", "bar: 2", "foo: 3"});
        pipeline.run().awaitCompletion();
    }

    @Test(timeout = 5000)
    public void byteArrayCountShouldSucceed() {
        Pipeline pipeline = getPipeline();
        SerializableFunction<Integer, byte[]> serializableFunction = new SerializableFunction<Integer, byte[]>() { // from class: org.apache.beam.runners.direct.DirectRunnerTest.3
            public byte[] apply(Integer num) {
                try {
                    return CoderUtils.encodeToByteArray(VarIntCoder.of(), num);
                } catch (CoderException e) {
                    Assert.fail("Unexpected Coder Exception " + e);
                    throw new AssertionError("Unreachable");
                }
            }
        };
        TypeDescriptor<byte[]> typeDescriptor = new TypeDescriptor<byte[]>() { // from class: org.apache.beam.runners.direct.DirectRunnerTest.4
        };
        PAssert.thatMap(PCollectionList.of(pipeline.apply(Create.of(new Integer[]{1, 1, 1, 2, 2, 3})).apply(MapElements.via(serializableFunction).withOutputType(typeDescriptor))).and(pipeline.apply(Create.of(new Integer[]{1, -2, -8, -16})).apply(MapElements.via(serializableFunction).withOutputType(typeDescriptor))).apply(Flatten.pCollections()).apply(Count.perElement()).apply(MapElements.via(new SimpleFunction<KV<byte[], Long>, KV<Integer, Long>>() { // from class: org.apache.beam.runners.direct.DirectRunnerTest.5
            public KV<Integer, Long> apply(KV<byte[], Long> kv) {
                try {
                    return KV.of(CoderUtils.decodeFromByteArray(VarIntCoder.of(), (byte[]) kv.getKey()), kv.getValue());
                } catch (CoderException e) {
                    Assert.fail("Unexpected Coder Exception " + e);
                    throw new AssertionError("Unreachable");
                }
            }
        }))).isEqualTo(ImmutableMap.builder().put(1, 4L).put(2, 2L).put(3, 1L).put(-2, 1L).put(-8, 1L).put(-16, 1L).build());
    }

    @Test
    public void transformDisplayDataExceptionShouldFail() {
        DoFn<Integer, Integer> doFn = new DoFn<Integer, Integer>() { // from class: org.apache.beam.runners.direct.DirectRunnerTest.6
            public void processElement(DoFn<Integer, Integer>.ProcessContext processContext) throws Exception {
            }

            public void populateDisplayData(DisplayData.Builder builder) {
                throw new RuntimeException("oh noes!");
            }
        };
        Pipeline pipeline = getPipeline();
        pipeline.apply(Create.of(new Integer[]{1, 2, 3})).apply(ParDo.of(doFn));
        this.thrown.expectMessage(doFn.getClass().getName());
        this.thrown.expectCause(ThrowableMessageMatcher.hasMessage(Matchers.is("oh noes!")));
        pipeline.run();
    }

    @Test
    public void pipelineOptionsDisplayDataExceptionShouldFail() {
        Object obj = new Object() { // from class: org.apache.beam.runners.direct.DirectRunnerTest.7
            @JsonValue
            public int getValue() {
                return 42;
            }

            public String toString() {
                throw new RuntimeException("oh noes!!");
            }
        };
        Pipeline pipeline = getPipeline();
        ((ObjectPipelineOptions) pipeline.getOptions().as(ObjectPipelineOptions.class)).setValue(obj);
        pipeline.apply(Create.of(new Integer[]{1, 2, 3}));
        this.thrown.expectMessage(PipelineOptions.class.getName());
        this.thrown.expectCause(ThrowableMessageMatcher.hasMessage(Matchers.is("oh noes!!")));
        pipeline.run();
    }

    @Test
    public void testMutatingOutputThenOutputDoFnError() throws Exception {
        Pipeline pipeline = getPipeline();
        pipeline.apply(Create.of(new Integer[]{42})).apply(ParDo.of(new DoFn<Integer, List<Integer>>() { // from class: org.apache.beam.runners.direct.DirectRunnerTest.8
            public void processElement(DoFn<Integer, List<Integer>>.ProcessContext processContext) {
                List asList = Arrays.asList(1, 2, 3, 4);
                processContext.output(asList);
                asList.set(0, 37);
                processContext.output(asList);
            }
        }));
        this.thrown.expect(IllegalMutationException.class);
        this.thrown.expectMessage("output");
        this.thrown.expectMessage("must not be mutated");
        pipeline.run();
    }

    @Test
    public void testMutatingOutputThenTerminateDoFnError() throws Exception {
        Pipeline pipeline = getPipeline();
        pipeline.apply(Create.of(new Integer[]{42})).apply(ParDo.of(new DoFn<Integer, List<Integer>>() { // from class: org.apache.beam.runners.direct.DirectRunnerTest.9
            public void processElement(DoFn<Integer, List<Integer>>.ProcessContext processContext) {
                List asList = Arrays.asList(1, 2, 3, 4);
                processContext.output(asList);
                asList.set(0, 37);
            }
        }));
        this.thrown.expect(IllegalMutationException.class);
        this.thrown.expectMessage("output");
        this.thrown.expectMessage("must not be mutated");
        pipeline.run();
    }

    @Test
    public void testMutatingOutputCoderDoFnError() throws Exception {
        Pipeline pipeline = getPipeline();
        pipeline.apply(Create.of(new Integer[]{42})).apply(ParDo.of(new DoFn<Integer, byte[]>() { // from class: org.apache.beam.runners.direct.DirectRunnerTest.10
            public void processElement(DoFn<Integer, byte[]>.ProcessContext processContext) {
                byte[] bArr = {1, 2, 3};
                processContext.output(bArr);
                bArr[0] = 10;
                processContext.output(bArr);
            }
        }));
        this.thrown.expect(IllegalMutationException.class);
        this.thrown.expectMessage("output");
        this.thrown.expectMessage("must not be mutated");
        pipeline.run();
    }

    @Test
    public void testMutatingInputDoFnError() throws Exception {
        Pipeline pipeline = getPipeline();
        pipeline.apply(Create.of(new List[]{Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6)}).withCoder(ListCoder.of(VarIntCoder.of()))).apply(ParDo.of(new DoFn<List<Integer>, Integer>() { // from class: org.apache.beam.runners.direct.DirectRunnerTest.11
            public void processElement(DoFn<List<Integer>, Integer>.ProcessContext processContext) {
                ((List) processContext.element()).set(0, 37);
                processContext.output(12);
            }
        }));
        this.thrown.expect(IllegalMutationException.class);
        this.thrown.expectMessage("Input");
        this.thrown.expectMessage("must not be mutated");
        pipeline.run();
    }

    /* JADX WARN: Type inference failed for: r1v1, types: [byte[], java.lang.Object[]] */
    @Test
    public void testMutatingInputCoderDoFnError() throws Exception {
        Pipeline pipeline = getPipeline();
        pipeline.apply(Create.of((Object[]) new byte[]{new byte[]{1, 2, 3}, new byte[]{4, 5, 6}})).apply(ParDo.of(new DoFn<byte[], Integer>() { // from class: org.apache.beam.runners.direct.DirectRunnerTest.12
            public void processElement(DoFn<byte[], Integer>.ProcessContext processContext) {
                ((byte[]) processContext.element())[0] = 10;
                processContext.output(13);
            }
        }));
        this.thrown.expect(IllegalMutationException.class);
        this.thrown.expectMessage("Input");
        this.thrown.expectMessage("must not be mutated");
        pipeline.run();
    }
}
