package org.apache.beam.runners.direct;

import com.fasterxml.jackson.annotation.JsonIgnore;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.beam.repackaged.direct_java.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.IllegalMutationException;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/direct/DirectRunnerTest.class */
public class DirectRunnerTest implements Serializable {

    @Rule
    public transient ExpectedException thrown = ExpectedException.none();
    private static AtomicInteger changed;
    private static final AtomicLong TEARDOWN_CALL = new AtomicLong(-1);

    /* loaded from: input_file:org/apache/beam/runners/direct/DirectRunnerTest$LongNoDecodeCoder.class */
    private static class LongNoDecodeCoder extends AtomicCoder<Long> {
        private LongNoDecodeCoder() {
        }

        public void encode(Long l, OutputStream outputStream) throws IOException {
        }

        /* renamed from: decode, reason: merged with bridge method [inline-methods] */
        public Long m198decode(InputStream inputStream) throws IOException {
            throw new CoderException("Cannot decode a long");
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/direct/DirectRunnerTest$MustSplitSource.class */
    private static class MustSplitSource<T> extends BoundedSource<T> {
        private final BoundedSource<T> underlying;

        public static <T> BoundedSource<T> of(BoundedSource<T> boundedSource) {
            return new MustSplitSource(boundedSource);
        }

        public MustSplitSource(BoundedSource<T> boundedSource) {
            this.underlying = boundedSource;
        }

        public List<? extends BoundedSource<T>> split(long j, PipelineOptions pipelineOptions) throws Exception {
            Preconditions.checkState(j < getEstimatedSizeBytes(pipelineOptions), "Must split into more than one source");
            return this.underlying.split(j, pipelineOptions);
        }

        public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) throws Exception {
            return this.underlying.getEstimatedSizeBytes(pipelineOptions);
        }

        public BoundedSource.BoundedReader<T> createReader(PipelineOptions pipelineOptions) throws IOException {
            throw new IllegalStateException("The MustSplitSource cannot create a reader without being split first");
        }

        public void validate() {
            this.underlying.validate();
        }

        public Coder<T> getOutputCoder() {
            return this.underlying.getOutputCoder();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/DirectRunnerTest$StaticQueue.class */
    public static class StaticQueue<T> implements Serializable {
        static final Map<String, StaticQueue<?>> QUEUES = new ConcurrentHashMap();
        private final String name;
        private final Coder<T> coder;
        private final transient BlockingQueue<Optional<T>> queue = new ArrayBlockingQueue(10);

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/beam/runners/direct/DirectRunnerTest$StaticQueue$StaticQueueSource.class */
        public static class StaticQueueSource<T> extends UnboundedSource<T, Checkpoint<T>> {
            final StaticQueue<T> queue;

            /* JADX INFO: Access modifiers changed from: package-private */
            /* loaded from: input_file:org/apache/beam/runners/direct/DirectRunnerTest$StaticQueue$StaticQueueSource$Checkpoint.class */
            public static class Checkpoint<T> implements UnboundedSource.CheckpointMark, Serializable {
                final T read;

                Checkpoint(T t) {
                    this.read = t;
                }

                public void finalizeCheckpoint() throws IOException {
                }
            }

            StaticQueueSource(StaticQueue<T> staticQueue) {
                this.queue = staticQueue;
            }

            public List<? extends UnboundedSource<T, Checkpoint<T>>> split(int i, PipelineOptions pipelineOptions) throws Exception {
                return Arrays.asList(this);
            }

            public UnboundedSource.UnboundedReader<T> createReader(PipelineOptions pipelineOptions, final Checkpoint<T> checkpoint) {
                return new UnboundedSource.UnboundedReader<T>() { // from class: org.apache.beam.runners.direct.DirectRunnerTest.StaticQueue.StaticQueueSource.1
                    T read;
                    boolean finished;

                    {
                        this.read = checkpoint == null ? null : checkpoint.read;
                        this.finished = false;
                    }

                    public boolean start() throws IOException {
                        return advance();
                    }

                    public boolean advance() throws IOException {
                        try {
                            Optional<T> take = StaticQueueSource.this.queue.take();
                            if (take.isPresent()) {
                                this.read = take.get();
                                return true;
                            }
                            this.finished = true;
                            return false;
                        } catch (InterruptedException e) {
                            throw new IOException(e);
                        }
                    }

                    public Instant getWatermark() {
                        return this.finished ? BoundedWindow.TIMESTAMP_MAX_VALUE : BoundedWindow.TIMESTAMP_MIN_VALUE;
                    }

                    public UnboundedSource.CheckpointMark getCheckpointMark() {
                        return new Checkpoint(this.read);
                    }

                    /* renamed from: getCurrentSource, reason: merged with bridge method [inline-methods] */
                    public UnboundedSource<T, ?> m200getCurrentSource() {
                        return StaticQueueSource.this;
                    }

                    public T getCurrent() throws NoSuchElementException {
                        return this.read;
                    }

                    public Instant getCurrentTimestamp() {
                        return getWatermark();
                    }

                    public void close() throws IOException {
                    }
                };
            }

            public Coder<Checkpoint<T>> getCheckpointMarkCoder() {
                return SerializableCoder.of(Checkpoint.class);
            }

            public Coder<T> getOutputCoder() {
                return ((StaticQueue) this.queue).coder;
            }
        }

        static <T> StaticQueue<T> of(String str, Coder<T> coder) {
            return new StaticQueue<>(str, coder);
        }

        StaticQueue(String str, Coder<T> coder) {
            this.name = str;
            this.coder = coder;
            Preconditions.checkState(QUEUES.put(str, this) == null, "Queue " + str + " already exists.");
        }

        StaticQueue<T> add(T t) {
            this.queue.add(Optional.of(t));
            return this;
        }

        @Nullable
        Optional<T> take() throws InterruptedException {
            return this.queue.take();
        }

        PTransform<PBegin, PCollection<T>> read() {
            return new PTransform<PBegin, PCollection<T>>() { // from class: org.apache.beam.runners.direct.DirectRunnerTest.StaticQueue.1
                public PCollection<T> expand(PBegin pBegin) {
                    return pBegin.apply("readFrom:" + this.name, Read.from(StaticQueue.this.asSource()));
                }
            };
        }

        UnboundedSource<T, ?> asSource() {
            return new StaticQueueSource(this);
        }

        void terminate() {
            this.queue.add(Optional.empty());
        }

        private Object readResolve() {
            return QUEUES.get(this.name);
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/direct/DirectRunnerTest$TestSerializationOfOptions.class */
    public interface TestSerializationOfOptions extends PipelineOptions {
        String getFoo();

        void setFoo(String str);

        @Default.String("not overridden")
        @JsonIgnore
        String getIgnoredField();

        void setIgnoredField(String str);
    }

    private Pipeline getPipeline() {
        return getPipeline(true);
    }

    private Pipeline getPipeline(boolean z) {
        PipelineOptions create = PipelineOptionsFactory.create();
        create.setRunner(DirectRunner.class);
        create.as(DirectOptions.class).setBlockOnRun(z);
        return Pipeline.create(create);
    }

    @Test
    public void defaultRunnerLoaded() {
        Assert.assertThat(DirectRunner.class, Matchers.equalTo(PipelineOptionsFactory.create().getRunner()));
    }

    @Test
    public void wordCountShouldSucceed() throws Throwable {
        Pipeline pipeline = getPipeline();
        PAssert.that(pipeline.apply(Create.of("foo", new String[]{"bar", "foo", "baz", "bar", "foo"})).apply(MapElements.via(new SimpleFunction<String, String>() { // from class: org.apache.beam.runners.direct.DirectRunnerTest.1
            public String apply(String str) {
                return str;
            }
        })).apply(Count.perElement()).apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() { // from class: org.apache.beam.runners.direct.DirectRunnerTest.2
            public String apply(KV<String, Long> kv) {
                return String.format("%s: %s", kv.getKey(), kv.getValue());
            }
        }))).containsInAnyOrder(new String[]{"baz: 1", "bar: 2", "foo: 3"});
        pipeline.run().waitUntilFinish();
    }

    @Test
    public void reusePipelineSucceeds() throws Throwable {
        Pipeline pipeline = getPipeline();
        changed = new AtomicInteger(0);
        PCollection apply = pipeline.apply(Create.of("foo", new String[]{"bar", "foo", "baz", "bar", "foo"})).apply(MapElements.via(new SimpleFunction<String, String>() { // from class: org.apache.beam.runners.direct.DirectRunnerTest.3
            public String apply(String str) {
                return str;
            }
        })).apply(Count.perElement());
        PCollection apply2 = apply.apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() { // from class: org.apache.beam.runners.direct.DirectRunnerTest.4
            public String apply(KV<String, Long> kv) {
                return String.format("%s: %s", kv.getKey(), kv.getValue());
            }
        }));
        apply.apply(ParDo.of(new DoFn<KV<String, Long>, Void>() { // from class: org.apache.beam.runners.direct.DirectRunnerTest.5
            @DoFn.ProcessElement
            public void updateChanged(DoFn<KV<String, Long>, Void>.ProcessContext processContext) {
                DirectRunnerTest.changed.getAndIncrement();
            }
        }));
        PAssert.that(apply2).containsInAnyOrder(new String[]{"baz: 1", "bar: 2", "foo: 3"});
        pipeline.run().waitUntilFinish();
        pipeline.run().waitUntilFinish();
        Assert.assertThat("Each element should have been processed twice", Integer.valueOf(changed.get()), Matchers.equalTo(6));
    }

    @Test
    public void byteArrayCountShouldSucceed() {
        Pipeline pipeline = getPipeline();
        SerializableFunction serializableFunction = num -> {
            try {
                return CoderUtils.encodeToByteArray(VarIntCoder.of(), num);
            } catch (CoderException e) {
                Assert.fail("Unexpected Coder Exception " + e);
                throw new AssertionError("Unreachable");
            }
        };
        TypeDescriptor<byte[]> typeDescriptor = new TypeDescriptor<byte[]>() { // from class: org.apache.beam.runners.direct.DirectRunnerTest.6
        };
        PAssert.thatMap(PCollectionList.of(pipeline.apply(Create.of(1, new Integer[]{1, 1, 2, 2, 3})).apply(MapElements.into(typeDescriptor).via(serializableFunction))).and(pipeline.apply(Create.of(1, new Integer[]{-2, -8, -16})).apply(MapElements.into(typeDescriptor).via(serializableFunction))).apply(Flatten.pCollections()).apply(Count.perElement()).apply(MapElements.via(new SimpleFunction<KV<byte[], Long>, KV<Integer, Long>>() { // from class: org.apache.beam.runners.direct.DirectRunnerTest.7
            public KV<Integer, Long> apply(KV<byte[], Long> kv) {
                try {
                    return KV.of((Integer) CoderUtils.decodeFromByteArray(VarIntCoder.of(), (byte[]) kv.getKey()), (Long) kv.getValue());
                } catch (CoderException e) {
                    Assert.fail("Unexpected Coder Exception " + e);
                    throw new AssertionError("Unreachable");
                }
            }
        }))).isEqualTo(ImmutableMap.builder().put(1, 4L).put(2, 2L).put(3, 1L).put(-2, 1L).put(-8, 1L).put(-16, 1L).build());
    }

    @Test
    public void splitsInputs() {
        Pipeline pipeline = getPipeline();
        PAssert.that(pipeline.apply(Read.from(MustSplitSource.of(CountingSource.upTo(3L))))).containsInAnyOrder(new Long[]{0L, 1L, 2L});
        pipeline.run();
    }

    @Test
    public void cancelShouldStopPipeline() throws Exception {
        PipelineOptions testingPipelineOptions = TestPipeline.testingPipelineOptions();
        testingPipelineOptions.as(DirectOptions.class).setBlockOnRun(false);
        testingPipelineOptions.setRunner(DirectRunner.class);
        Pipeline create = Pipeline.create(testingPipelineOptions);
        create.apply(GenerateSequence.from(0L).withRate(1L, Duration.standardSeconds(1L)));
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(1);
        Runnable runnable = () -> {
            try {
                ((PipelineResult) arrayBlockingQueue.take()).cancel();
            } catch (IOException e) {
                throw new IllegalStateException(e);
            } catch (InterruptedException e2) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e2);
            }
        };
        Callable callable = () -> {
            PipelineResult run = create.run();
            try {
                arrayBlockingQueue.put(run);
                return run;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        };
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        Future<?> submit = newCachedThreadPool.submit(runnable);
        Future submit2 = newCachedThreadPool.submit(callable);
        submit.get();
        ((PipelineResult) submit2.get()).waitUntilFinish();
    }

    @Test
    public void testWaitUntilFinishTimeout() throws Exception {
        DirectOptions as = PipelineOptionsFactory.as(DirectOptions.class);
        as.setBlockOnRun(false);
        as.setRunner(DirectRunner.class);
        Pipeline create = Pipeline.create(as);
        create.apply(Create.of(1L, new Long[0])).apply(ParDo.of(new DoFn<Long, Long>() { // from class: org.apache.beam.runners.direct.DirectRunnerTest.8
            @DoFn.ProcessElement
            public void hang(DoFn<Long, Long>.ProcessContext processContext) throws InterruptedException {
                Thread.sleep(Long.MAX_VALUE);
            }
        }));
        PipelineResult run = create.run();
        Assert.assertThat(run.getState(), Matchers.is(PipelineResult.State.RUNNING));
        run.waitUntilFinish(Duration.millis(1L));
        Assert.assertThat(run.getState(), Matchers.is(PipelineResult.State.RUNNING));
    }

    @Test
    public void tearsDownFnsBeforeFinishing() {
        TEARDOWN_CALL.set(-1L);
        Pipeline pipeline = getPipeline();
        pipeline.apply(Create.of("a", new String[0])).apply(ParDo.of(new DoFn<String, String>() { // from class: org.apache.beam.runners.direct.DirectRunnerTest.9
            @DoFn.ProcessElement
            public void onElement(DoFn<String, String>.ProcessContext processContext) {
            }

            @DoFn.Teardown
            public void teardown() {
                try {
                    Thread.sleep(1000L);
                    DirectRunnerTest.TEARDOWN_CALL.set(System.nanoTime());
                } catch (InterruptedException e) {
                    throw new AssertionError(e);
                }
            }
        }));
        pipeline.run().waitUntilFinish();
        long nanoTime = System.nanoTime();
        long j = TEARDOWN_CALL.get();
        Assert.assertThat(Long.valueOf(j), Matchers.greaterThan(0L));
        Assert.assertThat(Long.valueOf(nanoTime), Matchers.greaterThan(Long.valueOf(j)));
    }

    @Test
    public void transformDisplayDataExceptionShouldFail() {
        DoFn<Integer, Integer> doFn = new DoFn<Integer, Integer>() { // from class: org.apache.beam.runners.direct.DirectRunnerTest.10
            @DoFn.ProcessElement
            public void processElement(DoFn<Integer, Integer>.ProcessContext processContext) throws Exception {
            }

            public void populateDisplayData(DisplayData.Builder builder) {
                throw new RuntimeException("oh noes!");
            }
        };
        Pipeline pipeline = getPipeline();
        pipeline.apply(Create.of(1, new Integer[]{2, 3})).apply(ParDo.of(doFn));
        this.thrown.expectMessage(doFn.getClass().getName());
        this.thrown.expectCause(ThrowableMessageMatcher.hasMessage(Matchers.is("oh noes!")));
        pipeline.run();
    }

    @Test
    public void testMutatingOutputThenOutputDoFnError() throws Exception {
        Pipeline pipeline = getPipeline();
        pipeline.apply(Create.of(42, new Integer[0])).apply(ParDo.of(new DoFn<Integer, List<Integer>>() { // from class: org.apache.beam.runners.direct.DirectRunnerTest.11
            @DoFn.ProcessElement
            public void processElement(DoFn<Integer, List<Integer>>.ProcessContext processContext) {
                List asList = Arrays.asList(1, 2, 3, 4);
                processContext.output(asList);
                asList.set(0, 37);
                processContext.output(asList);
            }
        }));
        this.thrown.expect(IllegalMutationException.class);
        this.thrown.expectMessage("output");
        this.thrown.expectMessage("must not be mutated");
        pipeline.run();
    }

    @Test
    public void testMutatingOutputWithEnforcementDisabledSucceeds() throws Exception {
        PipelineOptions create = PipelineOptionsFactory.create();
        create.setRunner(DirectRunner.class);
        create.as(DirectOptions.class).setEnforceImmutability(false);
        Pipeline create2 = Pipeline.create(create);
        create2.apply(Create.of(42, new Integer[0])).apply(ParDo.of(new DoFn<Integer, List<Integer>>() { // from class: org.apache.beam.runners.direct.DirectRunnerTest.12
            @DoFn.ProcessElement
            public void processElement(DoFn<Integer, List<Integer>>.ProcessContext processContext) {
                List asList = Arrays.asList(1, 2, 3, 4);
                processContext.output(asList);
                asList.set(0, 37);
                processContext.output(asList);
            }
        }));
        create2.run();
    }

    @Test
    public void testMutatingOutputThenTerminateDoFnError() throws Exception {
        Pipeline pipeline = getPipeline();
        pipeline.apply(Create.of(42, new Integer[0])).apply(ParDo.of(new DoFn<Integer, List<Integer>>() { // from class: org.apache.beam.runners.direct.DirectRunnerTest.13
            @DoFn.ProcessElement
            public void processElement(DoFn<Integer, List<Integer>>.ProcessContext processContext) {
                List asList = Arrays.asList(1, 2, 3, 4);
                processContext.output(asList);
                asList.set(0, 37);
            }
        }));
        this.thrown.expect(IllegalMutationException.class);
        this.thrown.expectMessage("output");
        this.thrown.expectMessage("must not be mutated");
        pipeline.run();
    }

    @Test
    public void testMutatingOutputCoderDoFnError() throws Exception {
        Pipeline pipeline = getPipeline();
        pipeline.apply(Create.of(42, new Integer[0])).apply(ParDo.of(new DoFn<Integer, byte[]>() { // from class: org.apache.beam.runners.direct.DirectRunnerTest.14
            @DoFn.ProcessElement
            public void processElement(DoFn<Integer, byte[]>.ProcessContext processContext) {
                byte[] bArr = {1, 2, 3};
                processContext.output(bArr);
                bArr[0] = 10;
                processContext.output(bArr);
            }
        }));
        this.thrown.expect(IllegalMutationException.class);
        this.thrown.expectMessage("output");
        this.thrown.expectMessage("must not be mutated");
        pipeline.run();
    }

    @Test
    public void testMutatingInputDoFnError() throws Exception {
        Pipeline pipeline = getPipeline();
        pipeline.apply(Create.of(Arrays.asList(1, 2, 3), new List[]{Arrays.asList(4, 5, 6)}).withCoder(ListCoder.of(VarIntCoder.of()))).apply(ParDo.of(new DoFn<List<Integer>, Integer>() { // from class: org.apache.beam.runners.direct.DirectRunnerTest.15
            @DoFn.ProcessElement
            public void processElement(DoFn<List<Integer>, Integer>.ProcessContext processContext) {
                ((List) processContext.element()).set(0, 37);
                processContext.output(12);
            }
        }));
        this.thrown.expect(IllegalMutationException.class);
        this.thrown.expectMessage("Input");
        this.thrown.expectMessage("must not be mutated");
        pipeline.run();
    }

    /* JADX WARN: Type inference failed for: r2v4, types: [java.lang.Object[], byte[]] */
    @Test
    public void testMutatingInputCoderDoFnError() throws Exception {
        Pipeline pipeline = getPipeline();
        pipeline.apply(Create.of(new byte[]{1, 2, 3}, (Object[]) new byte[]{new byte[]{4, 5, 6}})).apply(ParDo.of(new DoFn<byte[], Integer>() { // from class: org.apache.beam.runners.direct.DirectRunnerTest.16
            @DoFn.ProcessElement
            public void processElement(DoFn<byte[], Integer>.ProcessContext processContext) {
                ((byte[]) processContext.element())[0] = 10;
                processContext.output(13);
            }
        }));
        this.thrown.expect(IllegalMutationException.class);
        this.thrown.expectMessage("Input");
        this.thrown.expectMessage("must not be mutated");
        pipeline.run();
    }

    @Test
    public void testUnencodableOutputElement() throws Exception {
        Pipeline pipeline = getPipeline();
        pipeline.apply(Create.of((Void) null, new Void[0])).apply(ParDo.of(new DoFn<Void, Long>() { // from class: org.apache.beam.runners.direct.DirectRunnerTest.17
            @DoFn.ProcessElement
            public void processElement(DoFn<Void, Long>.ProcessContext processContext) {
                processContext.output((Object) null);
            }
        })).setCoder(VarLongCoder.of()).apply(ParDo.of(new DoFn<Long, Long>() { // from class: org.apache.beam.runners.direct.DirectRunnerTest.18
            @DoFn.ProcessElement
            public void unreachable(DoFn<Long, Long>.ProcessContext processContext) {
                Assert.fail("Pipeline should fail to encode a null Long in VarLongCoder");
            }
        }));
        this.thrown.expectCause(Matchers.isA(CoderException.class));
        this.thrown.expectMessage("cannot encode a null Long");
        pipeline.run();
    }

    @Test
    public void testUnencodableOutputFromBoundedRead() throws Exception {
        Pipeline pipeline = getPipeline();
        pipeline.apply(GenerateSequence.from(0L).to(10L)).setCoder(new LongNoDecodeCoder());
        this.thrown.expectCause(Matchers.isA(CoderException.class));
        this.thrown.expectMessage("Cannot decode a long");
        pipeline.run();
    }

    @Test
    public void testUnencodableOutputFromUnboundedRead() {
        Pipeline pipeline = getPipeline();
        pipeline.apply(GenerateSequence.from(0L)).setCoder(new LongNoDecodeCoder());
        this.thrown.expectCause(Matchers.isA(CoderException.class));
        this.thrown.expectMessage("Cannot decode a long");
        pipeline.run();
    }

    @Test
    public void testFromOptionsIfIgnoredFieldsGettingDropped() {
        TestSerializationOfOptions testSerializationOfOptions = (TestSerializationOfOptions) PipelineOptionsFactory.fromArgs(new String[]{"--foo=testValue", "--ignoredField=overridden", "--runner=DirectRunner"}).as(TestSerializationOfOptions.class);
        Assert.assertEquals("testValue", testSerializationOfOptions.getFoo());
        Assert.assertEquals("overridden", testSerializationOfOptions.getIgnoredField());
        Pipeline create = Pipeline.create(testSerializationOfOptions);
        PAssert.that(create.apply(Create.of("1", new String[0])).apply(ParDo.of(new DoFn<String, Integer>() { // from class: org.apache.beam.runners.direct.DirectRunnerTest.19
            @DoFn.ProcessElement
            public void processElement(DoFn<String, Integer>.ProcessContext processContext) {
                TestSerializationOfOptions testSerializationOfOptions2 = (TestSerializationOfOptions) processContext.getPipelineOptions().as(TestSerializationOfOptions.class);
                Assert.assertEquals("testValue", testSerializationOfOptions2.getFoo());
                Assert.assertEquals("not overridden", testSerializationOfOptions2.getIgnoredField());
                processContext.output(Integer.valueOf(Integer.parseInt((String) processContext.element())));
            }
        }))).containsInAnyOrder(new Integer[]{1});
        create.run();
    }

    @Test(timeout = 10000)
    public void testTwoPOutputsInPipelineWithCascade() throws InterruptedException {
        StaticQueue<Integer> of = StaticQueue.of(ExecutionStateTracker.START_STATE_NAME, VarIntCoder.of());
        StaticQueue of2 = StaticQueue.of("messages", VarIntCoder.of());
        Pipeline pipeline = getPipeline(false);
        pipeline.begin().apply("outputStartSignal", outputStartTo(of));
        PAssert.that(pipeline.apply("processMessages", of2.read()).apply(Window.into(new GlobalWindows()).triggering(AfterWatermark.pastEndOfWindow()).discardingFiredPanes().withAllowedLateness(Duration.ZERO)).apply(Sum.integersGlobally())).containsInAnyOrder(new Integer[]{6});
        PipelineResult run = pipeline.run();
        do {
        } while (of.take() == null);
        of2.add(1).add(2).add(3).terminate();
        run.waitUntilFinish();
    }

    private PTransform<PBegin, PDone> outputStartTo(final StaticQueue<Integer> staticQueue) {
        return new PTransform<PBegin, PDone>() { // from class: org.apache.beam.runners.direct.DirectRunnerTest.20
            public PDone expand(PBegin pBegin) {
                PCollection apply = pBegin.apply(Create.of(1, new Integer[0]));
                MapElements into = MapElements.into(TypeDescriptors.voids());
                StaticQueue staticQueue2 = staticQueue;
                apply.apply(into.via(num -> {
                    staticQueue2.add(num);
                    return null;
                }));
                return PDone.in(pBegin.getPipeline());
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                String implMethodName = serializedLambda.getImplMethodName();
                boolean z = -1;
                switch (implMethodName.hashCode()) {
                    case 921197843:
                        if (implMethodName.equals("lambda$expand$dc8992$1")) {
                            z = false;
                            break;
                        }
                        break;
                }
                switch (z) {
                    case false:
                        if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/direct/DirectRunnerTest$20") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/runners/direct/DirectRunnerTest$StaticQueue;Ljava/lang/Integer;)Ljava/lang/Void;")) {
                            StaticQueue staticQueue2 = (StaticQueue) serializedLambda.getCapturedArg(0);
                            return num -> {
                                staticQueue2.add(num);
                                return null;
                            };
                        }
                        break;
                }
                throw new IllegalArgumentException("Invalid lambda deserialization");
            }
        };
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 776300730:
                if (implMethodName.equals("lambda$byteArrayCountShouldSucceed$48f361c4$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/direct/DirectRunnerTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)[B")) {
                    return num -> {
                        try {
                            return CoderUtils.encodeToByteArray(VarIntCoder.of(), num);
                        } catch (CoderException e) {
                            Assert.fail("Unexpected Coder Exception " + e);
                            throw new AssertionError("Unreachable");
                        }
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
