package org.apache.beam.runners.core.construction;

import com.google.auto.service.AutoService;
import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.beam.runners.core.construction.expansion.ExpansionService;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.UsesCrossLanguageTransforms;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ConnectivityState;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ManagedChannel;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ManagedChannelBuilder;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Server;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ServerBuilder;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/core/construction/ExternalTest.class */
public class ExternalTest implements Serializable {

    // Pipeline under test. Declared transient so it is skipped when this Serializable test
    // class is itself serialized.
    @Rule
    public transient TestPipeline testPipeline = TestPipeline.create();
    // URNs under which TestTransforms.knownTransforms() registers its transforms.
    private static final String TEST_URN_SIMPLE = "simple";
    private static final String TEST_URN_LE = "le";
    private static final String TEST_URN_MULTI = "multi";
    // Shell command prefix for the Python expansion server; read in setUp() from the
    // "pythonTestExpansionCommand" system property.
    private static String pythonServerCommand;
    // Port handed to the Python expansion server; read in setUp() from the "expansionPort"
    // system property.
    private static Integer expansionPort;
    // "localhost:<expansionPort + 100>", the address of the in-process Java expansion server.
    private static String localExpansionAddr;
    // In-process expansion server: built and started in setUp(), shut down in tearDown().
    private static Server localExpansionServer;

    @AutoService(ExpansionService.ExpansionServiceRegistrar.class)
    /* loaded from: input_file:org/apache/beam/runners/core/construction/ExternalTest$TestTransforms.class */
    public static class TestTransforms implements ExpansionService.ExpansionServiceRegistrar, Serializable {
        private final TupleTag<Integer> even = new TupleTag<Integer>("even") { // from class: org.apache.beam.runners.core.construction.ExternalTest.TestTransforms.1
        };
        private final TupleTag<Integer> odd = new TupleTag<Integer>("odd") { // from class: org.apache.beam.runners.core.construction.ExternalTest.TestTransforms.2
        };

        public Map<String, ExpansionService.TransformProvider> knownTransforms() {
            return ImmutableMap.of(ExternalTest.TEST_URN_SIMPLE, functionSpec -> {
                return MapElements.into(TypeDescriptors.integers()).via(num -> {
                    return Integer.valueOf(num.intValue() + 1);
                });
            }, ExternalTest.TEST_URN_LE, functionSpec2 -> {
                return Filter.lessThanEq(Integer.valueOf(Integer.parseInt(functionSpec2.getPayload().toStringUtf8())));
            }, ExternalTest.TEST_URN_MULTI, functionSpec3 -> {
                return ParDo.of(new DoFn<Integer, Integer>() { // from class: org.apache.beam.runners.core.construction.ExternalTest.TestTransforms.3
                    @DoFn.ProcessElement
                    public void processElement(DoFn<Integer, Integer>.ProcessContext processContext) {
                        if (((Integer) processContext.element()).intValue() % 2 == 0) {
                            processContext.output((Integer) processContext.element());
                        } else {
                            processContext.output(TestTransforms.this.odd, (Integer) processContext.element());
                        }
                    }
                }).withOutputTags(this.even, TupleTagList.of(this.odd));
            });
        }

        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            boolean z = -1;
            switch (implMethodName.hashCode()) {
                case -841340103:
                    if (implMethodName.equals("lambda$knownTransforms$79eb4a9b$1")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/core/construction/ExternalTest$TestTransforms") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                        return num -> {
                            return Integer.valueOf(num.intValue() + 1);
                        };
                    }
                    break;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    /**
     * Reads the Python server command and expansion port from system properties, then builds
     * and starts an in-process expansion server on {@code expansionPort + 100}.
     *
     * @throws IOException if the local expansion server cannot be started
     */
    @BeforeClass
    public static void setUp() throws IOException {
        pythonServerCommand = System.getProperty("pythonTestExpansionCommand");
        expansionPort = Integer.valueOf(System.getProperty("expansionPort"));

        // The local Java server listens 100 ports above the one given to the Python server.
        final int localPort = expansionPort + 100;
        localExpansionAddr = String.format("localhost:%s", localPort);

        localExpansionServer =
                ServerBuilder.forPort(localPort)
                        .addService(new ExpansionService())
                        .build();
        localExpansionServer.start();
    }

    /**
     * Shuts down the in-process expansion server started by {@code setUp()}.
     *
     * <p>The server reference is still {@code null} when {@code setUp()} failed before building
     * it (for example, when the {@code expansionPort} property is missing). In that case there
     * is nothing to shut down, and skipping the call avoids piling a
     * {@link NullPointerException} on top of the original failure.
     */
    @AfterClass
    public static void tearDown() {
        if (localExpansionServer != null) {
            localExpansionServer.shutdownNow();
        }
    }

    /**
     * Applies the {@code "simple"} transform through the local expansion service to 1, 2, 3 and
     * expects 2, 3, 4.
     */
    @Test
    @Category({ValidatesRunner.class, UsesCrossLanguageTransforms.class})
    public void expandSingleTest() {
        PAssert.that(
                        testPipeline
                                .apply(Create.of(1, 2, 3))
                                .apply(External.of(TEST_URN_SIMPLE, new byte[] {}, localExpansionAddr)))
                .containsInAnyOrder(2, 3, 4);
        testPipeline.run();
    }

    /**
     * Chains two externally expanded transforms - {@code "simple"} followed by {@code "le"} with
     * payload {@code "3"} - over 1, 2, 3 and expects 2, 3.
     */
    @Test
    @Category({ValidatesRunner.class, UsesCrossLanguageTransforms.class})
    public void expandMultipleTest() {
        PAssert.that(
                        testPipeline
                                .apply(Create.of(1, 2, 3))
                                .apply(
                                        "add one",
                                        External.of(TEST_URN_SIMPLE, new byte[] {}, localExpansionAddr))
                                .apply(
                                        "filter <=3",
                                        External.of(
                                                TEST_URN_LE,
                                                "3".getBytes(StandardCharsets.UTF_8),
                                                localExpansionAddr)))
                .containsInAnyOrder(2, 3);
        testPipeline.run();
    }

    /**
     * Applies the {@code "multi"} transform with multiple outputs to 1..6 and checks the
     * {@code "even"} and {@code "odd"} outputs separately.
     */
    @Test
    @Category({ValidatesRunner.class, UsesCrossLanguageTransforms.class})
    public void expandMultiOutputTest() {
        PCollectionTuple outputs =
                testPipeline
                        .apply(Create.of(1, 2, 3, 4, 5, 6))
                        .apply(
                                External.of(TEST_URN_MULTI, new byte[] {}, localExpansionAddr)
                                        .withMultiOutputs());

        PAssert.that(outputs.get(new TupleTag<Integer>("even") {})).containsInAnyOrder(2, 4, 6);
        PAssert.that(outputs.get(new TupleTag<Integer>("odd") {})).containsInAnyOrder(1, 3, 5);
        testPipeline.run();
    }

    /**
     * Launches {@code str} through {@code sh -c} and returns the started process.
     *
     * @param str the shell command line to run
     * @return the running process
     * @throws AssertionError if the process cannot be started; the underlying
     *     {@link IOException} is attached as the cause
     */
    private Process runCommandline(String str) {
        try {
            return new ProcessBuilder("sh", "-c", str).start();
        } catch (IOException e) {
            // Keep the cause so a launch failure shows why the shell could not be started.
            throw new AssertionError("process launch failed.", e);
        }
    }

    /**
     * Starts the Python expansion server, waits for it to accept connections, then expands
     * {@code "count_per_element_bytes"} through it and checks the per-element counts.
     *
     * <p>Readiness is polled up to 30 times at 500 ms intervals. If the channel is still not
     * ready after that, the pipeline is attempted anyway. The Python process is always
     * destroyed on exit.
     *
     * @throws RuntimeException if the thread is interrupted while waiting for the server; the
     *     interrupt flag is restored before throwing
     */
    @Test
    @Category({ValidatesRunner.class, UsesCrossLanguageTransforms.class})
    public void expandPythonTest() {
        String target = String.format("localhost:%s", expansionPort);
        Process pythonServer =
                runCommandline(String.format("%s -p %s", pythonServerCommand, expansionPort));
        try {
            ManagedChannel channel = ManagedChannelBuilder.forTarget(target).build();
            try {
                // getState(true) asks the idle channel to connect, so polling also drives the
                // connection attempt.
                ConnectivityState state = channel.getState(true);
                for (int retry = 0; retry < 30 && state != ConnectivityState.READY; retry++) {
                    Thread.sleep(500L);
                    state = channel.getState(true);
                }
            } finally {
                // Release the probe channel even when the wait is interrupted.
                channel.shutdownNow();
            }

            PAssert.that(
                            testPipeline
                                    .apply(Create.of("1", "2", "2", "3", "3", "3"))
                                    .apply(
                                            "toBytes",
                                            MapElements.into(new TypeDescriptor<byte[]>() {})
                                                    .via((String s) -> s.getBytes(StandardCharsets.UTF_8)))
                                    // The explicit element type lets the following byte[] -> String
                                    // mapping type-check.
                                    .apply(
                                            External.<byte[]>of(
                                                    "count_per_element_bytes", new byte[] {}, target))
                                    .apply(
                                            "toString",
                                            MapElements.into(TypeDescriptors.strings())
                                                    .via(
                                                            (byte[] bytes) ->
                                                                    new String(bytes, StandardCharsets.UTF_8))))
                    .containsInAnyOrder("1->1", "2->2", "3->3");
            testPipeline.run();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("interrupted.", e);
        } finally {
            pythonServer.destroyForcibly();
        }
    }

    // No $deserializeLambda$ method is declared in this class on purpose. The compiler
    // generates that synthetic method for every class containing serializable lambdas or
    // method references, so a hand-written copy would duplicate it.
}
