package org.apache.beam.runners.flink.adapter;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.flink.api.java.CollectionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/runners/flink/adapter/BeamFlinkDataSetAdapterTest.class */
public class BeamFlinkDataSetAdapterTest {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/flink/adapter/BeamFlinkDataSetAdapterTest$MyCoder.class */
    /**
     * String coder used by {@code testCustomCoder}: it writes a one-byte marker in front of the
     * {@link StringUtf8Coder} encoding and refuses to decode input that does not start with that
     * marker, so a decode through any other path fails loudly.
     */
    public static class MyCoder extends Coder<String> {
        /** Byte written ahead of every encoded value and checked again on decode. */
        private static final int CUSTOM_MARKER = 3;

        private MyCoder() {
        }

        /**
         * Writes {@link #CUSTOM_MARKER} followed by the {@link StringUtf8Coder} encoding of
         * {@code str}.
         *
         * @param str the value to encode
         * @param outputStream the stream that receives the marker and the encoded value
         * @throws IOException if writing to {@code outputStream} fails
         */
        public void encode(String str, OutputStream outputStream) throws IOException {
            outputStream.write(CUSTOM_MARKER);
            StringUtf8Coder.of().encode(str, outputStream);
        }

        /**
         * Reads the marker byte and then decodes the remaining bytes with {@link StringUtf8Coder}.
         *
         * @param inputStream the stream positioned at a value produced by {@link #encode}
         * @return the decoded string
         * @throws IOException if the first byte is not {@link #CUSTOM_MARKER} (including end of
         *     stream) or if the underlying decode fails
         */
        public String decode(InputStream inputStream) throws IOException {
            // The marker must be consumed unconditionally. Doing the read inside an `assert`
            // skips it when assertions are disabled, leaving the marker byte at the head of the
            // stream and corrupting the subsequent string decode.
            final int marker = inputStream.read();
            if (marker != CUSTOM_MARKER) {
                throw new IOException(
                        "Expected marker byte " + CUSTOM_MARKER + " but read " + marker);
            }
            return StringUtf8Coder.of().decode(inputStream);
        }

        /**
         * Returns an empty list: this coder is not parameterized by component coders.
         *
         * @return an immutable empty list, never {@code null}
         */
        public List<? extends Coder<?>> getCoderArguments() {
            return Collections.emptyList();
        }

        /**
         * Never throws. The encoding is a constant marker byte followed by whatever
         * {@link StringUtf8Coder} writes for the value.
         */
        public void verifyDeterministic() throws Coder.NonDeterministicException {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds a {@link ParDo} that emits, for every input element, the element with {@code str}
     * concatenated in front of it.
     *
     * @param str the text placed before each element
     * @return a transform from strings to prefixed strings
     */
    public static PTransform<PCollection<? extends String>, PCollection<String>> withPrefix(final String str) {
        final DoFn<String, String> prependFn = new DoFn<String, String>() {
            @DoFn.ProcessElement
            public void processElement(@DoFn.Element String element, DoFn.OutputReceiver<String> out) {
                final String prefixed = str + element;
                out.output(prefixed);
            }
        };
        return ParDo.of(prependFn);
    }

    /**
     * Runs {@code withPrefix("x")} through the adapter over the elements a, b, c and expects
     * xa, xb, xc in any order.
     */
    @Test
    public void testApplySimpleTransform() throws Exception {
        BeamFlinkDataSetAdapter adapter = new BeamFlinkDataSetAdapter();
        ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
        DataSet<String> input = env.fromCollection(ImmutableList.of("a", "b", "c"));

        List<String> actual = adapter.applyBeamPTransform(input, withPrefix("x")).collect();

        MatcherAssert.assertThat(actual, Matchers.containsInAnyOrder("xa", "xb", "xc"));
    }

    /**
     * Runs a composite transform (prefix "x", then prefix "y") through the adapter over the
     * elements a, b, c and expects yxa, yxb, yxc in any order.
     */
    @Test
    public void testApplyCompositeTransform() throws Exception {
        BeamFlinkDataSetAdapter adapter = new BeamFlinkDataSetAdapter();
        ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
        DataSet<String> input = env.fromCollection(ImmutableList.of("a", "b", "c"));

        PTransform<PCollection<String>, PCollection<String>> prefixTwice =
                new PTransform<PCollection<String>, PCollection<String>>() {
                    public PCollection<String> expand(PCollection<String> elements) {
                        PCollection<String> withX = elements.apply(withPrefix("x"));
                        return withX.apply(withPrefix("y"));
                    }
                };

        List<String> actual = adapter.applyBeamPTransform(input, prefixTwice).collect();

        MatcherAssert.assertThat(actual, Matchers.containsInAnyOrder("yxa", "yxb", "yxc"));
    }

    /**
     * Feeds two named DataSets ("x" holding a, b, c and "y" holding d, e, f) to a transform that
     * prefixes each input with its own name and flattens the results; expects xa, xb, xc, yd,
     * ye, yf in any order.
     */
    @Test
    public void testApplyMultiInputTransform() throws Exception {
        BeamFlinkDataSetAdapter adapter = new BeamFlinkDataSetAdapter();
        CollectionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
        DataSet<String> inputX = env.fromCollection(ImmutableList.of("a", "b", "c"));
        DataSet<String> inputY = env.fromCollection(ImmutableList.of("d", "e", "f"));

        List<String> actual = adapter.applyBeamPTransform(
                ImmutableMap.of("x", inputX, "y", inputY),
                new PTransform<PCollectionTuple, PCollection<String>>() {
                    public PCollection<String> expand(PCollectionTuple inputs) {
                        // The explicit <String> witness is required: get(String) is generic in its
                        // element type, and as a bare method-call receiver it would otherwise be
                        // inferred as PCollection<Object>, which withPrefix(..) cannot be applied to.
                        PCollection<String> prefixedX = inputs.<String>get("x").apply(withPrefix("x"));
                        PCollection<String> prefixedY = inputs.<String>get("y").apply(withPrefix("y"));
                        return PCollectionList.of(prefixedX).and(prefixedY).apply(Flatten.pCollections());
                    }
                }).collect();

        MatcherAssert.assertThat(
                actual, Matchers.containsInAnyOrder("xa", "xb", "xc", "yd", "ye", "yf"));
    }

    /**
     * Applies a transform that produces two tagged outputs ("x" and "y", each the input with the
     * matching prefix) and checks both resulting DataSets: xa, xb, xc and ya, yb, yc, each in any
     * order.
     */
    @Test
    public void testApplyMultiOutputTransform() throws Exception {
        BeamFlinkDataSetAdapter adapter = new BeamFlinkDataSetAdapter();
        ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
        DataSet<String> input = env.fromCollection(ImmutableList.of("a", "b", "c"));

        // Wildcard-typed instead of raw Map plus (DataSet) casts, so the lookups below are
        // checked by the compiler. The element type of each output is not known statically here.
        Map<String, ? extends DataSet<?>> outputs = adapter.applyMultiOutputBeamPTransform(
                input,
                new PTransform<PCollection<String>, PCollectionTuple>() {
                    public PCollectionTuple expand(PCollection<String> elements) {
                        PCollection<String> prefixedX = elements.apply(withPrefix("x"));
                        PCollection<String> prefixedY = elements.apply(withPrefix("y"));
                        return PCollectionTuple.of("x", prefixedX).and("y", prefixedY);
                    }
                });

        List<?> actualX = outputs.get("x").collect();
        MatcherAssert.assertThat(actualX, Matchers.<Object>containsInAnyOrder("xa", "xb", "xc"));

        List<?> actualY = outputs.get("y").collect();
        MatcherAssert.assertThat(actualY, Matchers.<Object>containsInAnyOrder("ya", "yb", "yc"));
    }

    /**
     * Runs {@code Count.perElement()} through the adapter over the elements a, a, b and expects
     * the pairs (a, 2) and (b, 1) in any order.
     */
    @Test
    public void testApplyGroupingTransform() throws Exception {
        BeamFlinkDataSetAdapter adapter = new BeamFlinkDataSetAdapter();
        ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
        DataSet<String> input = env.fromCollection(ImmutableList.of("a", "a", "b"));

        List<KV<String, Long>> counts = adapter.applyBeamPTransform(input, Count.perElement()).collect();

        MatcherAssert.assertThat(
                counts, Matchers.containsInAnyOrder(KV.of("a", 2L), KV.of("b", 1L)));
    }

    /**
     * Runs a transform whose output PCollection has its coder set to {@code MyCoder} through the
     * adapter over the elements a, b, c and expects xa, xb, xc in any order.
     */
    @Test
    public void testCustomCoder() throws Exception {
        BeamFlinkDataSetAdapter adapter = new BeamFlinkDataSetAdapter();
        ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
        DataSet<String> input = env.fromCollection(ImmutableList.of("a", "b", "c"));

        PTransform<PCollection<String>, PCollection<String>> prefixWithCustomCoder =
                new PTransform<PCollection<String>, PCollection<String>>() {
                    public PCollection<String> expand(PCollection<String> elements) {
                        PCollection<String> prefixed = elements.apply(withPrefix("x"));
                        return prefixed.setCoder(new MyCoder());
                    }
                };

        List<String> actual = adapter.applyBeamPTransform(input, prefixWithCustomCoder).collect();

        MatcherAssert.assertThat(actual, Matchers.containsInAnyOrder("xa", "xb", "xc"));
    }
}
