package org.apache.beam.sdk.io.gcp.pubsub;

import java.io.IOException;
import org.apache.beam.model.expansion.v1.ExpansionApi;
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.ParDoTranslation;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.expansion.service.ExpansionService;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.SchemaTranslation;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.grpc.v1p48p1.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.text.MatchesPattern;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.powermock.reflect.Whitebox;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.class */
public class PubsubIOExternalTest {

    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest$TestStreamObserver.class */
    private static class TestStreamObserver<T> implements StreamObserver<T> {
        private T result;

        private TestStreamObserver() {
        }

        public void onNext(T t) {
            this.result = t;
        }

        public void onError(Throwable th) {
            throw new RuntimeException("Should not happen", th);
        }

        public void onCompleted() {
        }
    }

    @Test
    public void testConstructPubsubRead() throws Exception {
        ExpansionApi.ExpansionRequest build = ExpansionApi.ExpansionRequest.newBuilder().setComponents(RunnerApi.Components.getDefaultInstance()).setTransform(RunnerApi.PTransform.newBuilder().setUniqueName("test").setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:transform:org.apache.beam:pubsub_read:v1").setPayload(encodeRow(Row.withSchema(Schema.of(new Schema.Field[]{Schema.Field.of("topic", Schema.FieldType.STRING), Schema.Field.of("id_label", Schema.FieldType.STRING), Schema.Field.of("with_attributes", Schema.FieldType.BOOLEAN)})).withFieldValue("topic", "projects/project-1234/topics/topic_name").withFieldValue("id_label", "id_foo").withFieldValue("with_attributes", true).build()).toByteString()))).setNamespace("test_namespace").build();
        ExpansionService expansionService = new ExpansionService();
        TestStreamObserver testStreamObserver = new TestStreamObserver();
        expansionService.expand(build, testStreamObserver);
        RunnerApi.PTransform transform = ((ExpansionApi.ExpansionResponse) testStreamObserver.result).getTransform();
        MatcherAssert.assertThat(transform.getSubtransformsList(), Matchers.hasItem(MatchesPattern.matchesPattern(".*PubsubUnboundedSource.*")));
        MatcherAssert.assertThat(transform.getSubtransformsList(), Matchers.hasItem(MatchesPattern.matchesPattern(".*MapElements.*")));
        MatcherAssert.assertThat(Integer.valueOf(transform.getInputsCount()), Matchers.is(0));
        MatcherAssert.assertThat(Integer.valueOf(transform.getOutputsCount()), Matchers.is(1));
    }

    @Test
    public void testConstructPubsubWrite() throws Exception {
        ExternalTransforms.ExternalConfigurationPayload encodeRow = encodeRow(Row.withSchema(Schema.of(new Schema.Field[]{Schema.Field.of("topic", Schema.FieldType.STRING), Schema.Field.of("id_label", Schema.FieldType.STRING)})).withFieldValue("topic", "projects/project-1234/topics/topic_name").withFieldValue("id_label", "id_foo").build());
        Pipeline create = Pipeline.create();
        create.apply("unbounded", Impulse.create()).setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
        RunnerApi.Pipeline proto = PipelineTranslation.toProto(create);
        ExpansionApi.ExpansionRequest build = ExpansionApi.ExpansionRequest.newBuilder().setComponents(proto.getComponents()).setTransform(RunnerApi.PTransform.newBuilder().setUniqueName("test").putInputs("input", (String) Iterables.getOnlyElement(((RunnerApi.PTransform) Iterables.getLast(proto.getComponents().getTransformsMap().values())).getOutputsMap().values())).setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn("beam:transform:org.apache.beam:pubsub_write:v1").setPayload(encodeRow.toByteString()))).setNamespace("test_namespace").build();
        ExpansionService expansionService = new ExpansionService();
        TestStreamObserver testStreamObserver = new TestStreamObserver();
        expansionService.expand(build, testStreamObserver);
        ExpansionApi.ExpansionResponse expansionResponse = (ExpansionApi.ExpansionResponse) testStreamObserver.result;
        RunnerApi.PTransform transform = expansionResponse.getTransform();
        MatcherAssert.assertThat(transform.getSubtransformsList(), Matchers.hasItem(MatchesPattern.matchesPattern(".*MapElements.*")));
        MatcherAssert.assertThat(transform.getSubtransformsList(), Matchers.hasItem(MatchesPattern.matchesPattern(".*PubsubUnboundedSink.*")));
        MatcherAssert.assertThat(Integer.valueOf(transform.getInputsCount()), Matchers.is(1));
        MatcherAssert.assertThat(Integer.valueOf(transform.getOutputsCount()), Matchers.is(0));
        DoFn doFn = ParDoTranslation.getDoFn(RunnerApi.ParDoPayload.parseFrom(expansionResponse.getComponents().getTransformsOrThrow(expansionResponse.getComponents().getTransformsOrThrow(expansionResponse.getComponents().getTransformsOrThrow(expansionResponse.getComponents().getTransformsOrThrow(transform.getSubtransforms(1)).getSubtransforms(1)).getSubtransforms(3)).getSubtransforms(0)).getSpec().getPayload()));
        String str = (String) Whitebox.getInternalState(doFn, "idAttribute");
        ValueProvider valueProvider = (ValueProvider) Whitebox.getInternalState(doFn, "topic");
        MatcherAssert.assertThat(valueProvider == null ? null : String.valueOf(valueProvider), Matchers.is("projects/project-1234/topics/topic_name"));
        MatcherAssert.assertThat(str, Matchers.is("id_foo"));
    }

    private static ExternalTransforms.ExternalConfigurationPayload encodeRow(Row row) {
        ByteStringOutputStream byteStringOutputStream = new ByteStringOutputStream();
        try {
            SchemaCoder.of(row.getSchema()).encode(row, byteStringOutputStream);
            return ExternalTransforms.ExternalConfigurationPayload.newBuilder().setSchema(SchemaTranslation.schemaToProto(row.getSchema(), true)).setPayload(byteStringOutputStream.toByteString()).build();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
