package org.apache.beam.sdk.io.gcp.pubsublite;

import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.ProjectId;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.pubsublite.AutoValue_PubsubLiteReadSchemaTransformProvider_PubsubLiteReadSchemaTransformConfiguration;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.schemas.utils.JsonUtils;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@AutoService({SchemaTransformProvider.class})
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteReadSchemaTransformProvider.class */
public class PubsubLiteReadSchemaTransformProvider extends TypedSchemaTransformProvider<PubsubLiteReadSchemaTransformConfiguration> {
    private static final Logger LOG = LoggerFactory.getLogger(PubsubLiteReadSchemaTransformProvider.class);

    /** Comma-separated list of the supported payload formats; also used in error messages. */
    public static final String VALID_FORMATS_STR = "AVRO,JSON";

    /**
     * The supported payload formats, obtained by splitting {@link #VALID_FORMATS_STR} on commas.
     *
     * <p>The set is wrapped as unmodifiable so that callers cannot alter the accepted formats
     * for the whole process through this public constant.
     */
    public static final Set<String> VALID_DATA_FORMATS =
            Collections.unmodifiableSet(Sets.newHashSet(VALID_FORMATS_STR.split(",")));

    /**
     * Tag of the output that carries successfully parsed rows. Declared as an anonymous
     * subclass so the {@code Row} type argument is kept on the tag's class.
     */
    public static final TupleTag<Row> OUTPUT_TAG = new TupleTag<Row>() {
    };

    /**
     * Tag of the output that carries one error row per message that failed to parse. Declared
     * as an anonymous subclass so the {@code Row} type argument is kept on the tag's class.
     */
    public static final TupleTag<Row> ERROR_TAG = new TupleTag<Row>() {
    };

    /**
     * Schema of the rows emitted on {@link #ERROR_TAG}: a string field {@code error} and a
     * nullable byte-array field {@code row}.
     */
    public static final Schema ERROR_SCHEMA = Schema.builder().addStringField("error").addNullableByteArrayField("row").build();

    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteReadSchemaTransformProvider$ErrorFn.class */
    public static class ErrorFn extends DoFn<SequencedMessage, Row> {
        private SerializableFunction<byte[], Row> valueMapper;
        private Counter errorCounter;
        private Long errorsInBundle = 0L;

        public ErrorFn(String str, SerializableFunction<byte[], Row> serializableFunction) {
            this.errorCounter = Metrics.counter(PubsubLiteReadSchemaTransformProvider.class, str);
            this.valueMapper = serializableFunction;
        }

        @DoFn.ProcessElement
        public void process(@DoFn.Element SequencedMessage sequencedMessage, DoFn.MultiOutputReceiver multiOutputReceiver) {
            try {
                multiOutputReceiver.get(PubsubLiteReadSchemaTransformProvider.OUTPUT_TAG).output((Row) this.valueMapper.apply(sequencedMessage.getMessage().getData().toByteArray()));
            } catch (Exception e) {
                this.errorsInBundle = Long.valueOf(this.errorsInBundle.longValue() + 1);
                PubsubLiteReadSchemaTransformProvider.LOG.warn("Error while parsing the element", e);
                multiOutputReceiver.get(PubsubLiteReadSchemaTransformProvider.ERROR_TAG).output(Row.withSchema(PubsubLiteReadSchemaTransformProvider.ERROR_SCHEMA).addValues(new Object[]{e.toString(), sequencedMessage.getMessage().getData().toByteArray()}).build());
            }
        }

        @DoFn.FinishBundle
        public void finish(DoFn<SequencedMessage, Row>.FinishBundleContext finishBundleContext) {
            this.errorCounter.inc(this.errorsInBundle.longValue());
            this.errorsInBundle = 0L;
        }
    }

    /**
     * Configuration for reading from a Pubsub Lite subscription: payload format, payload schema,
     * and the project, location and subscription name that together identify the subscription.
     *
     * <p>NOTE(review): {@code from} treats a null or empty project as "use the pipeline's GCP
     * project", but {@link #getProject()} is not marked nullable here, so the generated builder
     * presumably rejects an unset project. Confirm whether the getter should be nullable.
     */
    @DefaultSchema(AutoValueSchema.class)
    @AutoValue
    public abstract static class PubsubLiteReadSchemaTransformConfiguration {

        /** Builder for {@link PubsubLiteReadSchemaTransformConfiguration}. */
        @AutoValue.Builder
        public abstract static class Builder {
            public abstract Builder setFormat(String str);

            public abstract Builder setSchema(String str);

            public abstract Builder setProject(String str);

            public abstract Builder setSubscriptionName(String str);

            public abstract Builder setLocation(String str);

            public abstract PubsubLiteReadSchemaTransformConfiguration build();
        }

        @SchemaFieldDescription("The encoding format for the data stored in Pubsub Lite. Valid options are: " + VALID_FORMATS_STR)
        public abstract String getFormat();

        @SchemaFieldDescription("The schema in which the data is encoded in the Pubsub Lite topic. For AVRO data, this is a schema defined with AVRO schema syntax (https://avro.apache.org/docs/1.10.2/spec.html#schemas). For JSON data, this is a schema defined with JSON-schema syntax (https://json-schema.org/).")
        public abstract String getSchema();

        @SchemaFieldDescription("The GCP project where the Pubsub Lite reservation resides. This can be a project number or a project ID.")
        public abstract String getProject();

        @SchemaFieldDescription("The name of the subscription to consume data. This will be concatenated with the project and location parameters to build a full subscription path.")
        public abstract String getSubscriptionName();

        @SchemaFieldDescription("The region or zone where the Pubsub Lite reservation resides.")
        public abstract String getLocation();

        /** Returns a new, empty builder for this configuration. */
        public static Builder builder() {
            return new AutoValue_PubsubLiteReadSchemaTransformProvider_PubsubLiteReadSchemaTransformConfiguration.Builder();
        }
    }

    /**
     * Reports the configuration type this provider is parameterized with.
     *
     * @return the {@link PubsubLiteReadSchemaTransformConfiguration} class object
     */
    protected Class<PubsubLiteReadSchemaTransformConfiguration> configurationClass() {
        final Class<PubsubLiteReadSchemaTransformConfiguration> configurationType =
                PubsubLiteReadSchemaTransformConfiguration.class;
        return configurationType;
    }

    /**
     * Builds the {@link SchemaTransform} that reads from the configured Pubsub Lite subscription
     * and converts each message payload into a {@link Row}.
     *
     * <p>The resulting transform produces two collections: {@code "output"} with the parsed rows
     * (using the schema derived from the configuration) and {@code "errors"} with one
     * {@link #ERROR_SCHEMA} row per message that could not be parsed.
     *
     * @param pubsubLiteReadSchemaTransformConfiguration format, schema and subscription settings
     * @return the transform wrapping the read and the parsing step
     * @throws IllegalArgumentException if the configured format is not in
     *     {@link #VALID_DATA_FORMATS}, or, when the transform is expanded, if no project is set
     *     in either the configuration or the pipeline's {@link GcpOptions}
     */
    public SchemaTransform from(final PubsubLiteReadSchemaTransformConfiguration pubsubLiteReadSchemaTransformConfiguration) {
        final String format = pubsubLiteReadSchemaTransformConfiguration.getFormat();
        if (!VALID_DATA_FORMATS.contains(format)) {
            throw new IllegalArgumentException(String.format("Format %s not supported. Only supported formats are %s", format, VALID_FORMATS_STR));
        }

        final boolean jsonFormat = Objects.equals(format, "JSON");
        final String schemaDefinition = pubsubLiteReadSchemaTransformConfiguration.getSchema();

        // The Avro parser is fully qualified because the simple name Schema denotes the Beam
        // schema type in this class.
        final Schema beamSchema = jsonFormat
                ? JsonUtils.beamSchemaFromJsonSchema(schemaDefinition)
                : AvroUtils.toBeamSchema(new org.apache.avro.Schema.Parser().parse(schemaDefinition));
        final SerializableFunction<byte[], Row> valueMapper = jsonFormat
                ? JsonUtils.getJsonBytesToRowFunction(beamSchema)
                : AvroUtils.getAvroBytesToRowFunction(beamSchema);

        return new SchemaTransform() {
            public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
                return new PTransform<PCollectionRowTuple, PCollectionRowTuple>() {
                    public PCollectionRowTuple expand(PCollectionRowTuple input) {
                        // Prefer the configured project; otherwise fall back to the pipeline's GCP project.
                        String project = pubsubLiteReadSchemaTransformConfiguration.getProject();
                        if (Strings.isNullOrEmpty(project)) {
                            project = input.getPipeline().getOptions().as(GcpOptions.class).getProject();
                        }
                        // Reject an empty fallback as well as a missing one, so that an unusable
                        // subscription path is never built.
                        if (Strings.isNullOrEmpty(project)) {
                            throw new IllegalArgumentException("Unable to infer the project to read from Pubsub Lite. Please provide a project.");
                        }

                        SubscriptionPath subscriptionPath = SubscriptionPath.newBuilder()
                                .setLocation(CloudRegionOrZone.parse(pubsubLiteReadSchemaTransformConfiguration.getLocation()))
                                .setProject(ProjectId.of(project))
                                .setName(SubscriptionName.of(pubsubLiteReadSchemaTransformConfiguration.getSubscriptionName()))
                                .build();
                        SubscriberOptions subscriberOptions = SubscriberOptions.newBuilder()
                                .setSubscriptionPath(subscriptionPath)
                                .build();

                        PCollectionTuple parsed = input.getPipeline()
                                .apply(PubsubLiteIO.read(subscriberOptions))
                                .apply(ParDo.of(new ErrorFn("PubsubLite-read-error-counter", valueMapper))
                                        .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));

                        return PCollectionRowTuple.of(
                                "output", parsed.get(OUTPUT_TAG).setRowSchema(beamSchema),
                                "errors", parsed.get(ERROR_TAG).setRowSchema(ERROR_SCHEMA));
                    }
                };
            }
        };
    }

    /**
     * Returns the identifier string of this schema transform.
     *
     * @return the fixed identifier of the Pubsub Lite read transform, version 1
     */
    public String identifier() {
        final String transformId = "beam:schematransform:org.apache.beam:pubsublite_read:v1";
        return transformId;
    }

    /**
     * Lists the names of the input collections, of which there are none.
     *
     * @return an empty list
     */
    public List<String> inputCollectionNames() {
        final List<String> noInputs = Collections.<String>emptyList();
        return noInputs;
    }

    /**
     * Lists the names of the collections this transform produces.
     *
     * @return a list holding {@code "output"} followed by {@code "errors"}
     */
    public List<String> outputCollectionNames() {
        final String[] names = {"output", "errors"};
        return Arrays.asList(names);
    }
}
