package org.apache.beam.runners.core.construction;

import com.google.auto.service.AutoService;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.Charset;
import java.util.Collections;
import java.util.Map;
import org.apache.beam.model.expansion.v1.ExpansionApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.ToString;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/core/construction/TransformUpgraderTest.class */
public class TransformUpgraderTest {

    /**
     * Registrar, published through {@code @AutoService} as a
     * {@link TransformPayloadTranslatorRegistrar}, that associates {@link TestTransform} with a
     * {@link TestTransformPayloadTranslator}.
     */
    @AutoService({TransformPayloadTranslatorRegistrar.class})
    public static class Registrar implements TransformPayloadTranslatorRegistrar {
        /**
         * Returns a single-entry map whose key is {@code TestTransform.class} and whose value is a
         * freshly created {@link TestTransformPayloadTranslator}.
         */
        public Map<Class<TestTransform>, TestTransformPayloadTranslator> getTransformPayloadTranslators() {
            final TestTransformPayloadTranslator translator = new TestTransformPayloadTranslator();
            return Collections.singletonMap(TestTransform.class, translator);
        }
    }

    /**
     * Registrar, published through {@code @AutoService} as a
     * {@link TransformPayloadTranslatorRegistrar}, that associates {@link TestTransform2} with a
     * {@link TestTransformPayloadTranslator2}.
     */
    @AutoService({TransformPayloadTranslatorRegistrar.class})
    public static class Registrar2 implements TransformPayloadTranslatorRegistrar {
        /**
         * Returns a single-entry map whose key is {@code TestTransform2.class} and whose value is a
         * freshly created {@link TestTransformPayloadTranslator2}.
         */
        public Map<Class<TestTransform2>, TestTransformPayloadTranslator2> getTransformPayloadTranslators() {
            final TestTransformPayloadTranslator2 translator = new TestTransformPayloadTranslator2();
            return Collections.singletonMap(TestTransform2.class, translator);
        }
    }

    /**
     * Fake {@link ExpansionServiceClientFactory} whose clients "upgrade" a test transform
     * in-process: the Java-serialized {@code Integer} stored in the transform's spec payload is
     * doubled and the transform is tagged with an {@code already_upgraded} annotation.
     *
     * <p>The most recently built response is also stored in {@link #response}.
     */
    static class TestExpansionServiceClientFactory implements ExpansionServiceClientFactory {
        /** Id of the transform that is tried first. */
        private static final String FIRST_TRANSFORM_ID = "TransformUpgraderTest-TestTransform";

        /** Id used instead once the first transform already carries the upgrade marker. */
        private static final String SECOND_TRANSFORM_ID = "TransformUpgraderTest-TestTransform2";

        /** Annotation key this fake adds to every transform it has processed. */
        private static final String ALREADY_UPGRADED_KEY = "already_upgraded";

        /** Last response produced by a client of this factory; {@code null} until {@code expand} succeeds. */
        ExpansionApi.ExpansionResponse response;

        TestExpansionServiceClientFactory() {
        }

        /**
         * Returns a client that performs the fake upgrade locally.
         *
         * @param apiServiceDescriptor ignored by this fake
         */
        public ExpansionServiceClient getExpansionServiceClient(Endpoints.ApiServiceDescriptor apiServiceDescriptor) {
            return new ExpansionServiceClient() {
                /**
                 * Picks the transform to upgrade from the request's components, doubles its payload
                 * and returns it together with the request's components.
                 *
                 * @throws RuntimeException if no suitable transform is present, if its URN differs
                 *     from the URN of the request's transform, or if the payload cannot be
                 *     (de)serialized
                 */
                public ExpansionApi.ExpansionResponse expand(ExpansionApi.ExpansionRequest expansionRequest) {
                    Map<String, RunnerApi.PTransform> transforms = expansionRequest.getComponents().getTransformsMap();

                    RunnerApi.PTransform candidate = transforms.get(FIRST_TRANSFORM_ID);
                    // A non-empty marker means the first transform was handled by an earlier call,
                    // so this call is for the second one. Reading the annotation with a default
                    // avoids using a swallowed exception as the "not present" signal.
                    if (candidate != null
                            && !candidate.getAnnotationsOrDefault(ALREADY_UPGRADED_KEY, ByteString.empty()).isEmpty()) {
                        candidate = transforms.get(SECOND_TRANSFORM_ID);
                    }
                    // Fail with a descriptive error instead of a NullPointerException when the id is absent.
                    if (candidate == null
                            || !candidate.getSpec().getUrn().equals(expansionRequest.getTransform().getSpec().getUrn())) {
                        throw new RuntimeException("Could not find a valid transform to upgrade");
                    }

                    Integer oldParam;
                    try (ObjectInputStream payloadIn =
                            new ObjectInputStream(new ByteArrayInputStream(candidate.getSpec().getPayload().toByteArray()))) {
                        oldParam = (Integer) payloadIn.readObject();
                    } catch (IOException | ClassNotFoundException e) {
                        throw new RuntimeException(e);
                    }

                    ByteString upgradedPayload;
                    ByteStringOutputStream payloadBytes = new ByteStringOutputStream();
                    try (ObjectOutputStream payloadOut = new ObjectOutputStream(payloadBytes)) {
                        payloadOut.writeObject(Integer.valueOf(oldParam.intValue() * 2));
                        payloadOut.flush();
                        // Snapshot after the flush and before the streams are closed.
                        upgradedPayload = payloadBytes.toByteString();
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }

                    RunnerApi.PTransform.Builder upgraded = candidate.toBuilder();
                    RunnerApi.FunctionSpec.Builder upgradedSpec = upgraded.getSpecBuilder();
                    upgradedSpec.setPayload(upgradedPayload);
                    upgraded.setSpec(upgradedSpec.build());
                    // Fixed UTF-8 encoding keeps the marker bytes independent of the platform charset.
                    upgraded.putAnnotations(ALREADY_UPGRADED_KEY, ByteString.copyFromUtf8("dummyvalue"));

                    TestExpansionServiceClientFactory.this.response = ExpansionApi.ExpansionResponse.newBuilder()
                            .setComponents(expansionRequest.getComponents())
                            .setTransform(upgraded.build())
                            .build();
                    return TestExpansionServiceClientFactory.this.response;
                }

                /** Schema-transform discovery is not supported by this fake; always returns {@code null}. */
                public ExpansionApi.DiscoverSchemaTransformResponse discover(ExpansionApi.DiscoverSchemaTransformRequest discoverSchemaTransformRequest) {
                    return null;
                }

                /** No resources to release. */
                public void close() throws Exception {
                }
            };
        }

        /** No resources to release. */
        public void close() throws Exception {
        }
    }

    /**
     * Test transform that multiplies every {@code Integer} element of its input by a fixed
     * parameter.
     *
     * <p>Note: the decompiler reported that this class's access modifier was changed from
     * package-private.
     */
    public static class TestTransform extends PTransform<PCollection<Integer>, PCollection<Integer>> {
        /** Multiplier applied to each element. */
        private int testParam;

        /**
         * @param i multiplier applied to each element
         */
        public TestTransform(int i) {
            testParam = i;
        }

        /** Applies a {@link MapElements} step that multiplies each element by the parameter. */
        public PCollection<Integer> expand(PCollection<Integer> pCollection) {
            SimpleFunction<Integer, Integer> multiplyByParam = new SimpleFunction<Integer, Integer>() {
                public Integer apply(Integer element) {
                    return element * testParam;
                }
            };
            return pCollection.apply(MapElements.via(multiplyByParam));
        }

        /** Returns the multiplier, boxed. */
        public Integer getTestParam() {
            return testParam;
        }
    }

    /**
     * Distinct subclass of {@link TestTransform} with no behaviour of its own; {@code Registrar2}
     * maps it to {@code TestTransformPayloadTranslator2}.
     */
    static class TestTransform2 extends TestTransform {
        /**
         * @param i multiplier forwarded to {@link TestTransform}
         */
        public TestTransform2(int i) {
            super(i);
        }
    }

    /**
     * Payload translator for {@link TestTransform}.
     *
     * <p>The transform's multiplier is represented in two ways: as a config {@link Row} with a
     * single int32 field {@code "multiplier"}, and as a Java-serialized {@code Integer} in the
     * payload of the {@link RunnerApi.FunctionSpec} produced by {@link #translate}.
     */
    static class TestTransformPayloadTranslator implements PTransformTranslation.TransformPayloadTranslator<TestTransform> {
        /** URN reported by {@link #getUrn()}. */
        static final String URN = "beam:transform:test:transform_to_update";

        /** Schema of the config row: one int32 field named {@code "multiplier"}. */
        Schema configRowSchema = Schema.builder().addInt32Field("multiplier").build();

        TestTransformPayloadTranslator() {
        }

        public String getUrn() {
            return URN;
        }

        /**
         * Rebuilds a {@link TestTransform} from the {@code "multiplier"} field of a config row.
         *
         * <p>The decompiler had renamed this method to {@code m18fromConfigRow} ("merged with
         * bridge method"); the original name is restored so that it again matches the method the
         * translator interface presumably declares.
         *
         * @param row config row containing an int32 {@code "multiplier"} field
         * @param pipelineOptions unused
         */
        public TestTransform fromConfigRow(Row row, PipelineOptions pipelineOptions) {
            return new TestTransform(row.getInt32("multiplier").intValue());
        }

        /**
         * Decompiler-generated alias of {@link #fromConfigRow(Row, PipelineOptions)}, kept only so
         * that any code still referring to the synthetic name continues to compile.
         */
        public TestTransform m18fromConfigRow(Row row, PipelineOptions pipelineOptions) {
            return fromConfigRow(row, pipelineOptions);
        }

        /** Builds a config row holding the transform's multiplier under {@code "multiplier"}. */
        public Row toConfigRow(TestTransform testTransform) {
            return Row.withSchema(this.configRowSchema).withFieldValue("multiplier", testTransform.getTestParam()).build();
        }

        /**
         * Produces a function spec carrying this translator's URN and the transform's multiplier
         * as a Java-serialized {@code Integer} payload.
         *
         * @param appliedPTransform application whose transform supplies the multiplier
         * @param sdkComponents unused
         * @throws IOException if serializing the multiplier fails
         */
        public RunnerApi.FunctionSpec translate(AppliedPTransform<?, ?, TestTransform> appliedPTransform, SdkComponents sdkComponents) throws IOException {
            int multiplier = appliedPTransform.getTransform().getTestParam().intValue();
            RunnerApi.FunctionSpec.Builder spec = RunnerApi.FunctionSpec.newBuilder();
            spec.setUrn(getUrn());
            ByteStringOutputStream payloadBytes = new ByteStringOutputStream();
            try (ObjectOutputStream payloadOut = new ObjectOutputStream(payloadBytes)) {
                payloadOut.writeObject(Integer.valueOf(multiplier));
                payloadOut.flush();
                // Snapshot after the flush and before the streams are closed.
                spec.setPayload(payloadBytes.toByteString());
            }
            return spec.build();
        }
    }

    /**
     * Variant of {@link TestTransformPayloadTranslator} that differs only in the URN it reports.
     */
    static class TestTransformPayloadTranslator2 extends TestTransformPayloadTranslator {
        /** URN reported by this translator; hides the parent's {@code URN} constant. */
        static final String URN = "beam:transform:test:transform_to_update2";

        TestTransformPayloadTranslator2() {
            super();
        }

        /** Returns this class's own {@link #URN} rather than the parent's. */
        @Override
        public String getUrn() {
            return TestTransformPayloadTranslator2.URN;
        }
    }

    /**
     * Asserts that the spec payload of {@code pTransform}, read back as a Java-serialized
     * {@code Integer}, equals {@code num}.
     *
     * @param pTransform transform proto whose payload is inspected
     * @param num expected payload value
     * @throws RuntimeException wrapping any exception raised while reading or casting the payload
     */
    private void validateTestParam(RunnerApi.PTransform pTransform, Integer num) {
        try {
            Integer expected = num.intValue();
            byte[] payloadBytes = pTransform.getSpec().getPayload().toByteArray();
            ObjectInputStream payloadReader = new ObjectInputStream(new ByteArrayInputStream(payloadBytes));
            Object deserialized = payloadReader.readObject();
            Assert.assertEquals(expected, (Integer) deserialized);
        } catch (Exception failure) {
            throw new RuntimeException(failure);
        }
    }

    /**
     * Builds a pipeline containing one {@code TestTransform(2)}, runs it through the upgrader
     * backed by {@link TestExpansionServiceClientFactory}, and checks that the transform's payload
     * is 4 afterwards and that it carries an {@code "upgraded_to_version"} annotation.
     */
    @Test
    public void testTransformUpgrade() throws Exception {
        Pipeline pipeline = Pipeline.create();
        pipeline
                .apply(Create.of(1, 2, 3))
                .apply(new TestTransform(2))
                .apply(ToString.elements())
                .apply(TextIO.write().to("dummyfilename"));
        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, false);

        ExternalTranslationOptions options = PipelineOptionsFactory.create().as(ExternalTranslationOptions.class);
        ImmutableList<String> urnsToOverride = ImmutableList.of("beam:transform:test:transform_to_update");
        options.setTransformsToOverride(urnsToOverride);
        options.setTransformServiceAddress("dummyaddress");

        RunnerApi.Pipeline upgradedProto = TransformUpgrader.of(new TestExpansionServiceClientFactory())
                .upgradeTransformsViaTransformService(pipelineProto, urnsToOverride, options);
        RunnerApi.PTransform upgradedTransform =
                upgradedProto.getComponents().getTransformsMap().get("TransformUpgraderTest-TestTransform");

        validateTestParam(upgradedTransform, 4);
        Assert.assertTrue(upgradedTransform.getAnnotationsMap().containsKey("upgraded_to_version"));
    }

    /**
     * Applies {@code TestTransform(2)} in two identical branches of one pipeline, upgrades the
     * pipeline for the single test URN, and checks that the transforms stored under both
     * {@code "TransformUpgraderTest-TestTransform"} and {@code "TransformUpgraderTest-TestTransform2"}
     * end up with payload 4.
     */
    @Test
    public void testTransformUpgradeMultipleOccurrences() throws Exception {
        Pipeline pipeline = Pipeline.create();
        // Two identical branches, added one after the other.
        for (int branch = 0; branch < 2; branch++) {
            pipeline
                    .apply(Create.of(1, 2, 3))
                    .apply(new TestTransform(2))
                    .apply(ToString.elements())
                    .apply(TextIO.write().to("dummyfilename"));
        }
        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, false);

        ExternalTranslationOptions options = PipelineOptionsFactory.create().as(ExternalTranslationOptions.class);
        ImmutableList<String> urnsToOverride = ImmutableList.of("beam:transform:test:transform_to_update");
        options.setTransformsToOverride(urnsToOverride);
        options.setTransformServiceAddress("dummyaddress");

        RunnerApi.Pipeline upgradedProto = TransformUpgrader.of(new TestExpansionServiceClientFactory())
                .upgradeTransformsViaTransformService(pipelineProto, urnsToOverride, options);

        Map<String, RunnerApi.PTransform> upgradedTransforms = upgradedProto.getComponents().getTransformsMap();
        validateTestParam(upgradedTransforms.get("TransformUpgraderTest-TestTransform"), 4);
        validateTestParam(upgradedTransforms.get("TransformUpgraderTest-TestTransform2"), 4);
    }

    /**
     * Applies {@code TestTransform(2)} and {@code TestTransform2(2)} in separate branches of one
     * pipeline, upgrades the pipeline for both test URNs, and checks that the transforms stored
     * under {@code "TransformUpgraderTest-TestTransform"} and
     * {@code "TransformUpgraderTest-TestTransform2"} end up with payload 4.
     */
    @Test
    public void testTransformUpgradeMultipleURNs() throws Exception {
        Pipeline pipeline = Pipeline.create();
        pipeline
                .apply(Create.of(1, 2, 3))
                .apply(new TestTransform(2))
                .apply(ToString.elements())
                .apply(TextIO.write().to("dummyfilename"));
        pipeline
                .apply(Create.of(1, 2, 3))
                .apply(new TestTransform2(2))
                .apply(ToString.elements())
                .apply(TextIO.write().to("dummyfilename"));
        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, false);

        ExternalTranslationOptions options = PipelineOptionsFactory.create().as(ExternalTranslationOptions.class);
        ImmutableList<String> urnsToOverride =
                ImmutableList.of("beam:transform:test:transform_to_update", "beam:transform:test:transform_to_update2");
        options.setTransformsToOverride(urnsToOverride);
        options.setTransformServiceAddress("dummyaddress");

        RunnerApi.Pipeline upgradedProto = TransformUpgrader.of(new TestExpansionServiceClientFactory())
                .upgradeTransformsViaTransformService(pipelineProto, urnsToOverride, options);

        Map<String, RunnerApi.PTransform> upgradedTransforms = upgradedProto.getComponents().getTransformsMap();
        String[] expectedIds = {"TransformUpgraderTest-TestTransform", "TransformUpgraderTest-TestTransform2"};
        for (String transformId : expectedIds) {
            validateTestParam(upgradedTransforms.get(transformId), 4);
        }
    }
}
