package org.apache.beam.sdk.io.gcp.pubsublite;

import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.ProjectId;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.AttributeValues;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.avro.Schema;
import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.pubsublite.AutoValue_PubsubLiteReadSchemaTransformProvider_PubsubLiteReadSchemaTransformConfiguration;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.Uuid;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
import org.apache.beam.sdk.schemas.utils.JsonUtils;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@AutoService({SchemaTransformProvider.class})
/**
 * {@link TypedSchemaTransformProvider} that reads messages from a Pub/Sub Lite subscription and
 * converts each message payload into a Beam {@link Row}.
 *
 * <p>The payload is decoded according to the configured {@code format}: {@code RAW} yields a
 * single {@code payload} bytes field, while {@code JSON} and {@code AVRO} require a schema.
 * Optionally, selected message attributes are flattened into extra string fields and/or all
 * attributes are collected into a string-to-string map field. Parse failures go to the
 * {@code errors} output when error handling is configured; otherwise they fail the pipeline.
 */
public class PubsubLiteReadSchemaTransformProvider
        extends TypedSchemaTransformProvider<
                PubsubLiteReadSchemaTransformProvider.PubsubLiteReadSchemaTransformConfiguration> {
    public static final String VALID_FORMATS_STR = "RAW,AVRO,JSON";
    private static final Logger LOG =
            LoggerFactory.getLogger(PubsubLiteReadSchemaTransformProvider.class);
    public static final Set<String> VALID_DATA_FORMATS =
            Sets.newHashSet(VALID_FORMATS_STR.split(","));
    public static final TupleTag<Row> OUTPUT_TAG = new TupleTag<Row>() {};
    public static final TupleTag<Row> ERROR_TAG = new TupleTag<Row>() {};

    /**
     * Decodes each {@link SequencedMessage} into a {@link Row}. Successfully decoded rows go to
     * {@link #OUTPUT_TAG}; failures go to {@link #ERROR_TAG} when error handling is enabled and are
     * rethrown otherwise.
     */
    public static class ErrorFn extends DoFn<SequencedMessage, Row> {
        private final SerializableFunction<byte[], Row> valueMapper;
        private final Counter errorCounter;
        // Errors seen in the current bundle; flushed into errorCounter by finish().
        private long errorsInBundle = 0L;
        private final boolean handleErrors;
        // Attribute keys to flatten into the row, in the same order as attributeSchema's fields.
        private final List<String> attributes;
        // Name of the field holding all attributes as a map, or "" when no such field is wanted.
        private final String attributeMap;
        private final Schema errorSchema;
        // Output schema when attributes are added; empty when only the decoded payload is emitted.
        private final Schema attributeSchema;

        /**
         * Creates a fn that emits only the decoded payload, without attribute fields.
         *
         * @param name name of the error counter metric
         * @param valueMapper decodes the message payload bytes into a row
         * @param errorSchema schema of the records sent to the error output
         * @param handleErrors whether failures go to the error output instead of being rethrown
         */
        public ErrorFn(
                String name,
                SerializableFunction<byte[], Row> valueMapper,
                Schema errorSchema,
                boolean handleErrors) {
            this(
                    name,
                    valueMapper,
                    errorSchema,
                    Collections.<String>emptyList(),
                    "",
                    Schema.builder().build(),
                    handleErrors);
        }

        /**
         * Creates a fn that appends attribute fields to the decoded payload.
         *
         * @param name name of the error counter metric
         * @param valueMapper decodes the message payload bytes into a row
         * @param errorSchema schema of the records sent to the error output
         * @param attributes attribute keys to flatten into string fields; null is treated as empty
         * @param attributeMap name of the map field for all attributes; null or "" disables it
         * @param attributeSchema output schema: payload fields, then {@code attributes}, then the
         *     optional map field
         * @param handleErrors whether failures go to the error output instead of being rethrown
         */
        public ErrorFn(
                String name,
                SerializableFunction<byte[], Row> valueMapper,
                Schema errorSchema,
                List<String> attributes,
                String attributeMap,
                Schema attributeSchema,
                boolean handleErrors) {
            this.errorCounter = Metrics.counter(PubsubLiteReadSchemaTransformProvider.class, name);
            this.valueMapper = valueMapper;
            this.errorSchema = errorSchema;
            this.handleErrors = handleErrors;
            // Defensive copy into a serializable list, since DoFn instances are serialized.
            this.attributes =
                    attributes == null ? new ArrayList<String>() : new ArrayList<String>(attributes);
            this.attributeMap = Strings.nullToEmpty(attributeMap);
            this.attributeSchema = attributeSchema;
        }

        /** Decodes one message and emits it on the main or the error output. */
        @ProcessElement
        public void process(
                @Element SequencedMessage sequencedMessage, MultiOutputReceiver multiOutputReceiver) {
            Row row;
            try {
                row = toRow(sequencedMessage.getMessage());
            } catch (Exception e) {
                if (!handleErrors) {
                    throw new RuntimeException(e);
                }
                errorsInBundle++;
                LOG.warn("Error while parsing the element", e);
                multiOutputReceiver
                        .get(ERROR_TAG)
                        .output(
                                ErrorHandling.errorRecord(
                                        errorSchema,
                                        sequencedMessage.getMessage().getData().toByteArray(),
                                        e));
                return;
            }
            // Emission stays outside the try so downstream failures are not reported as parse errors.
            if (row != null) {
                multiOutputReceiver.get(OUTPUT_TAG).output(row);
            }
        }

        /**
         * Builds the output row for {@code message}: decoded payload fields, then the configured
         * attributes in {@link #attributes} order, then (if enabled) the map of all attributes.
         *
         * @throws IllegalArgumentException if a configured attribute is missing from the message
         */
        private Row toRow(PubSubMessage message) {
            Row payloadRow = valueMapper.apply(message.getData().toByteArray());
            if (attributes.isEmpty()
                    && attributeSchema.getFields().isEmpty()
                    && attributeMap.isEmpty()) {
                return payloadRow;
            }
            Row.Builder rowBuilder =
                    Row.withSchema(attributeSchema).addValues(payloadRow.getValues());
            Map<String, AttributeValues> messageAttributes = message.getAttributesMap();
            // Iterate the configured list (not the message map) so values match the schema order.
            for (String attributeName : attributes) {
                AttributeValues values = messageAttributes.get(attributeName);
                if (values == null) {
                    throw new IllegalArgumentException(
                            "Attribute " + attributeName + " is missing from the message.");
                }
                processAttribute(values, rowBuilder::addValue);
            }
            if (!attributeMap.isEmpty()) {
                Map<String, String> allAttributes = new HashMap<>();
                messageAttributes.forEach(
                        (key, values) -> processAttribute(values, value -> allAttributes.put(key, value)));
                // Always add the map (possibly empty): the schema declares this field unconditionally.
                rowBuilder.addValue(allAttributes);
            }
            return rowBuilder.build();
        }

        /** Publishes the bundle's error count to the metric and resets it. */
        @FinishBundle
        public void finish(FinishBundleContext finishBundleContext) {
            errorCounter.inc(errorsInBundle);
            errorsInBundle = 0L;
        }
    }

    /**
     * Configuration for {@link PubsubLiteReadSchemaTransformProvider}.
     *
     * <p>NOTE(review): {@code from} treats schema, project, errorHandling, attributes,
     * attributeMap and attributeId as optional, but these getters carry no {@code @Nullable}
     * annotation. AutoValue rejects nulls for unannotated properties, so these fields may currently
     * be required in practice. Confirm whether the annotations were dropped.
     */
    @DefaultSchema(AutoValueSchema.class)
    @AutoValue
    public abstract static class PubsubLiteReadSchemaTransformConfiguration {

        /** Builder for {@link PubsubLiteReadSchemaTransformConfiguration}. */
        @AutoValue.Builder
        public abstract static class Builder {
            public abstract Builder setFormat(String str);

            public abstract Builder setSchema(String str);

            public abstract Builder setProject(String str);

            public abstract Builder setSubscriptionName(String str);

            public abstract Builder setLocation(String str);

            public abstract Builder setErrorHandling(ErrorHandling errorHandling);

            public abstract Builder setAttributes(List<String> list);

            public abstract Builder setAttributeMap(String str);

            public abstract Builder setAttributeId(String str);

            public abstract PubsubLiteReadSchemaTransformConfiguration build();
        }

        @SchemaFieldDescription(
                "The encoding format for the data stored in Pubsub Lite. Valid options are: "
                        + VALID_FORMATS_STR)
        public abstract String getFormat();

        @SchemaFieldDescription(
                "The schema in which the data is encoded in the Pubsub Lite topic. For AVRO data, this is a schema defined with AVRO schema syntax (https://avro.apache.org/docs/1.10.2/spec.html#schemas). For JSON data, this is a schema defined with JSON-schema syntax (https://json-schema.org/).")
        public abstract String getSchema();

        @SchemaFieldDescription(
                "The GCP project where the Pubsub Lite reservation resides. This can be a project number or a project ID.")
        public abstract String getProject();

        @SchemaFieldDescription(
                "The name of the subscription to consume data. This will be concatenated with the project and location parameters to build a full subscription path.")
        public abstract String getSubscriptionName();

        @SchemaFieldDescription("The region or zone where the Pubsub Lite reservation resides.")
        public abstract String getLocation();

        @SchemaFieldDescription(
                "This option specifies whether and where to output unwritable rows.")
        public abstract ErrorHandling getErrorHandling();

        @SchemaFieldDescription(
                "List of attribute keys whose values will be flattened into the output message as additional fields.  For example, if the format is `RAW` and attributes is `[\"a\", \"b\"]` then this read will produce elements of the form `Row(payload=..., a=..., b=...)`")
        public abstract List<String> getAttributes();

        @SchemaFieldDescription(
                "Name of a field in which to store the full set of attributes associated with this message.  For example, if the format is `RAW` and `attribute_map` is set to `\"attrs\"` then this read will produce elements of the form `Row(payload=..., attrs=...)` where `attrs` is a Map type of string to string. If both `attributes` and `attribute_map` are set, the overlapping attribute values will be present in both the flattened structure and the attribute map.")
        public abstract String getAttributeMap();

        @SchemaFieldDescription(
                "The attribute on incoming Pubsub Lite messages to use as a unique record identifier. When specified, the value of this attribute (which can be any string that uniquely identifies the record) will be used for deduplication of messages. If not provided, we cannot guarantee that no duplicate data will be delivered on the Pub/Sub stream. In this case, deduplication of the stream will be strictly best effort.")
        public abstract String getAttributeId();

        /** Returns a new, empty builder. */
        public static Builder builder() {
            return new AutoValue_PubsubLiteReadSchemaTransformProvider_PubsubLiteReadSchemaTransformConfiguration
                    .Builder();
        }
    }

    @Override
    protected Class<PubsubLiteReadSchemaTransformConfiguration> configurationClass() {
        return PubsubLiteReadSchemaTransformConfiguration.class;
    }

    /**
     * Validates {@code configuration} and builds the read transform.
     *
     * @throws IllegalArgumentException if the format is unsupported, if a schema is missing for
     *     JSON/AVRO or present for RAW, or if an attribute name is null or empty
     */
    @Override
    public SchemaTransform from(final PubsubLiteReadSchemaTransformConfiguration configuration) {
        final String format = configuration.getFormat();
        if (!VALID_DATA_FORMATS.contains(format)) {
            throw new IllegalArgumentException(
                    String.format(
                            "Format %s not supported. Only supported formats are %s",
                            format, VALID_FORMATS_STR));
        }
        final boolean handleErrors = ErrorHandling.hasOutput(configuration.getErrorHandling());
        final String inputSchema = configuration.getSchema();

        final Schema payloadSchema;
        final SerializableFunction<byte[], Row> valueMapper;
        if ("RAW".equals(format)) {
            if (inputSchema != null) {
                throw new IllegalArgumentException(
                        "To read from PubSubLite in RAW format, you can't provide a schema.");
            }
            payloadSchema = Schema.builder().addField("payload", Schema.FieldType.BYTES).build();
            valueMapper = getRawBytesToRowFunction(payloadSchema);
        } else {
            if (inputSchema == null) {
                throw new IllegalArgumentException(
                        "To read from PubSubLite in JSON or AVRO format, you must provide a schema.");
            }
            if ("JSON".equals(format)) {
                payloadSchema = JsonUtils.beamSchemaFromJsonSchema(inputSchema);
                valueMapper = JsonUtils.getJsonBytesToRowFunction(payloadSchema);
            } else {
                payloadSchema =
                        AvroUtils.toBeamSchema(new org.apache.avro.Schema.Parser().parse(inputSchema));
                valueMapper = AvroUtils.getAvroBytesToRowFunction(payloadSchema);
            }
        }

        final List<String> attributeList =
                configuration.getAttributes() == null
                        ? Collections.<String>emptyList()
                        : configuration.getAttributes();
        final String attributeMapName = Strings.nullToEmpty(configuration.getAttributeMap());
        // Built here so invalid attribute names fail when the transform is configured.
        final Schema outputSchema =
                buildSchemaWithAttributes(payloadSchema, attributeList, attributeMapName);

        return new SchemaTransform() {
            @Override
            public PCollectionRowTuple expand(PCollectionRowTuple input) {
                String project = configuration.getProject();
                if (Strings.isNullOrEmpty(project)) {
                    project = input.getPipeline().getOptions().as(GcpOptions.class).getProject();
                }
                // An empty inferred project would otherwise fail later inside ProjectId.of.
                if (Strings.isNullOrEmpty(project)) {
                    throw new IllegalArgumentException(
                            "Unable to infer the project to read from Pubsub Lite. Please provide a project.");
                }
                Schema errorSchema = ErrorHandling.errorSchemaBytes();

                SubscriptionPath subscriptionPath =
                        SubscriptionPath.newBuilder()
                                .setLocation(CloudRegionOrZone.parse(configuration.getLocation()))
                                .setProject(ProjectId.of(project))
                                .setName(SubscriptionName.of(configuration.getSubscriptionName()))
                                .build();
                PCollection<SequencedMessage> messages =
                        input.getPipeline()
                                .apply(
                                        PubsubLiteIO.read(
                                                SubscriberOptions.newBuilder()
                                                        .setSubscriptionPath(subscriptionPath)
                                                        .build()));

                String attributeId = configuration.getAttributeId();
                if (!Strings.isNullOrEmpty(attributeId)) {
                    messages =
                            messages.apply(
                                    PubsubLiteIO.deduplicate(
                                            UuidDeduplicationOptions.newBuilder()
                                                    .setUuidExtractor(getUuidFromMessage(attributeId))
                                                    .build()));
                }

                PCollectionTuple outputs =
                        messages.apply(
                                ParDo.of(
                                                new ErrorFn(
                                                        "PubsubLite-read-error-counter",
                                                        valueMapper,
                                                        errorSchema,
                                                        attributeList,
                                                        attributeMapName,
                                                        outputSchema,
                                                        handleErrors))
                                        .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
                return PCollectionRowTuple.of(
                        "output",
                        outputs.get(OUTPUT_TAG).setRowSchema(outputSchema),
                        "errors",
                        outputs.get(ERROR_TAG).setRowSchema(errorSchema));
            }
        };
    }

    /**
     * Builds the output schema: the payload fields of {@code schema}, then one STRING field per
     * entry of {@code list} (in order), then an optional map-of-string field named {@code str}.
     *
     * @param schema schema of the decoded payload
     * @param list attribute keys to flatten; null is treated as empty
     * @param str name of the attribute map field; null or "" means no map field
     * @throws IllegalArgumentException if an attribute name is null or empty
     */
    public static Schema buildSchemaWithAttributes(Schema schema, List<String> list, String str) {
        Schema.Builder builder = Schema.builder();
        for (Schema.Field field : schema.getFields()) {
            builder.addField(field.getName(), field.getType());
        }
        if (list != null) {
            for (String attributeName : list) {
                if (Strings.isNullOrEmpty(attributeName)) {
                    throw new IllegalArgumentException(
                            "Attribute names in the attributes list must not be null or empty.");
                }
                builder.addField(attributeName, Schema.FieldType.STRING);
            }
        }
        if (!Strings.isNullOrEmpty(str)) {
            builder.addMapField(str, Schema.FieldType.STRING, Schema.FieldType.STRING);
        }
        return builder.build();
    }

    /**
     * Passes the single value of {@code attributeValues}, decoded as UTF-8, to {@code consumer}.
     * Does nothing when {@code attributeValues} is null.
     *
     * @throws RuntimeException if the attribute does not have exactly one value
     */
    public static void processAttribute(AttributeValues attributeValues, Consumer<String> consumer) {
        if (attributeValues == null) {
            return;
        }
        if (attributeValues.getValuesCount() != 1) {
            throw new RuntimeException(
                    "Received an unparseable message with multiple values for an attribute.");
        }
        consumer.accept(attributeValues.getValues(0).toStringUtf8());
    }

    /** Returns a function wrapping raw payload bytes into a single-field row of {@code schema}. */
    public static SerializableFunction<byte[], Row> getRawBytesToRowFunction(final Schema schema) {
        return new SimpleFunction<byte[], Row>() {
            @Override
            public Row apply(byte[] bytes) {
                return Row.withSchema(schema).addValue(bytes).build();
            }
        };
    }

    /**
     * Returns a function extracting the deduplication {@link Uuid} from attribute {@code str}.
     * The returned function throws if the attribute is missing or has other than one value.
     */
    public static SerializableFunction<SequencedMessage, Uuid> getUuidFromMessage(final String str) {
        return new SimpleFunction<SequencedMessage, Uuid>() {
            @Override
            public Uuid apply(SequencedMessage sequencedMessage) {
                AttributeValues attributeValues =
                        sequencedMessage.getMessage().getAttributesMap().get(str);
                if (attributeValues == null) {
                    throw new RuntimeException("Uuid attribute missing.");
                }
                if (attributeValues.getValuesCount() != 1) {
                    throw new RuntimeException(
                            "Received an unparseable message with multiple values for an attribute.");
                }
                return Uuid.of(attributeValues.getValues(0));
            }
        };
    }

    @Override
    public String identifier() {
        return "beam:schematransform:org.apache.beam:pubsublite_read:v1";
    }

    @Override
    public List<String> inputCollectionNames() {
        return Collections.emptyList();
    }

    @Override
    public List<String> outputCollectionNames() {
        return Arrays.asList("output", "errors");
    }
}
