package org.apache.beam.sdk.io.gcp.pubsub;

import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao;
import org.apache.beam.sdk.io.gcp.pubsub.AutoValue_PubsubMessageToRow;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.joda.time.Instant;

/* JADX INFO: Access modifiers changed from: package-private */
@Internal
@AutoValue
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageToRow.class */
public abstract class PubsubMessageToRow extends PTransform<PCollection<PubsubMessage>, PCollectionTuple> implements Serializable {
    /** Name of the schema field that the conversion DoFns fill with the element timestamp. */
    static final String TIMESTAMP_FIELD = "event_timestamp";
    /** Name of the nested-schema field that receives the message's attribute map. */
    static final String ATTRIBUTES_FIELD = "attributes";
    /** Name of the nested-schema field that receives the (optionally deserialized) message payload. */
    static final String PAYLOAD_FIELD = "payload";
    /** Output tag to which the original message is emitted when conversion fails and the DLQ is enabled. */
    static final TupleTag<PubsubMessage> DLQ_TAG = new TupleTag<PubsubMessage>() { // from class: org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.1
    };
    /** Output tag for the successfully converted rows. */
    static final TupleTag<Row> MAIN_TAG = new TupleTag<Row>() { // from class: org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.2
    };

    /* JADX INFO: Access modifiers changed from: package-private */
    /** AutoValue builder for {@link PubsubMessageToRow}; obtain an instance via {@link PubsubMessageToRow#builder()}. */
    @AutoValue.Builder
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageToRow$Builder.class */
    public static abstract class Builder {
        /** Sets the schema of the rows produced on the main output. */
        public abstract Builder messageSchema(Schema schema);

        /**
         * Sets whether messages that fail conversion are emitted to {@code DLQ_TAG} ({@code true})
         * or cause a {@link RuntimeException} ({@code false}).
         */
        public abstract Builder useDlq(boolean z);

        /** Sets whether the flat-schema conversion ({@code true}) or the nested-schema conversion ({@code false}) is used. */
        public abstract Builder useFlatSchema(boolean z);

        /** Sets the factory that creates the payload serializer for a given payload schema. */
        public abstract Builder serializerProvider(SerializerProvider serializerProvider);

        /** Builds the configured transform. */
        public abstract PubsubMessageToRow build();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Converts each {@link PubsubMessage} into a {@link Row} using a flat schema.
     *
     * <p>The message payload is deserialized with a serializer created for the message schema
     * without the {@code event_timestamp} field. The output row then takes the element timestamp
     * for {@code event_timestamp} and, for every other field, the value of the same-named field of
     * the deserialized payload.
     *
     * <p>If anything fails during conversion, the original message is emitted to {@code DLQ_TAG}
     * when the DLQ is enabled; otherwise the failure is rethrown wrapped in a
     * {@link RuntimeException}.
     */
    @Internal
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageToRow$FlatSchemaPubsubMessageToRow.class */
    public static class FlatSchemaPubsubMessageToRow extends DoFn<PubsubMessage, Row> {
        private final Schema messageSchema;
        private final boolean useDlq;
        private final PayloadSerializer payloadSerializer;

        /**
         * @param messageSchema schema of the rows to produce
         * @param useDlq whether failed messages go to {@code DLQ_TAG} instead of failing the bundle
         * @param serializerProvider factory for the payload serializer; must not be null
         * @throws NullPointerException if {@code serializerProvider} is null
         */
        protected FlatSchemaPubsubMessageToRow(Schema messageSchema, boolean useDlq, SerializerProvider serializerProvider) {
            // The enclosing transform declares the provider as nullable, so fail with a clear
            // message here rather than with a bare NPE further down.
            Objects.requireNonNull(serializerProvider, "serializerProvider is required for flat-schema conversion");
            this.messageSchema = messageSchema;
            this.useDlq = useDlq;
            // The payload itself never carries the timestamp field; it is filled in from the
            // element timestamp when the row is assembled.
            List<Schema.Field> payloadFields = messageSchema.getFields().stream()
                    .filter(field -> !field.getName().equals(PubsubMessageToRow.TIMESTAMP_FIELD))
                    .collect(Collectors.toList());
            this.payloadSerializer = serializerProvider.apply(new Schema(payloadFields));
        }

        /** Returns the element timestamp for the timestamp field, otherwise the payload's value for that field. */
        private Object getValueForFieldFlatSchema(Schema.Field field, Instant timestamp, Row payload) {
            String fieldName = field.getName();
            if (PubsubMessageToRow.TIMESTAMP_FIELD.equals(fieldName)) {
                return timestamp;
            }
            return payload.getValue(fieldName);
        }

        /**
         * Deserializes the payload and emits the resulting row to {@code MAIN_TAG}.
         *
         * @throws RuntimeException wrapping the original failure when conversion fails and the DLQ is disabled
         */
        @DoFn.ProcessElement
        public void processElement(@DoFn.Element PubsubMessage message, @DoFn.Timestamp Instant timestamp, DoFn.MultiOutputReceiver receiver) {
            try {
                Row payload = this.payloadSerializer.deserialize(message.getPayload());
                List<Object> values = this.messageSchema.getFields().stream()
                        .map(field -> getValueForFieldFlatSchema(field, timestamp, payload))
                        .collect(Collectors.toList());
                receiver.get(PubsubMessageToRow.MAIN_TAG)
                        .output(Row.withSchema(this.messageSchema).addValues(values).build());
            } catch (Exception e) {
                if (!this.useDlq) {
                    throw new RuntimeException(e);
                }
                // DLQ enabled: route the untouched original message to the dead-letter output.
                receiver.get(PubsubMessageToRow.DLQ_TAG).output(message);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    @Internal
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageToRow$NestedSchemaPubsubMessageToRow.class */
    public static class NestedSchemaPubsubMessageToRow extends DoFn<PubsubMessage, Row> {
        private final Schema messageSchema;
        private final boolean useDlq;

        @Nullable
        private final PayloadSerializer payloadSerializer;

        private NestedSchemaPubsubMessageToRow(Schema schema, boolean z, @Nullable SerializerProvider serializerProvider) {
            this.messageSchema = schema;
            this.useDlq = z;
            if (serializerProvider == null) {
                Preconditions.checkArgument(schema.getField(PubsubMessageToRow.PAYLOAD_FIELD).getType().getTypeName().equals(Schema.TypeName.BYTES));
                this.payloadSerializer = null;
            } else {
                Preconditions.checkArgument(schema.getField(PubsubMessageToRow.PAYLOAD_FIELD).getType().getTypeName().equals(Schema.TypeName.ROW));
                this.payloadSerializer = (PayloadSerializer) serializerProvider.apply(schema.getField(PubsubMessageToRow.PAYLOAD_FIELD).getType().getRowSchema());
            }
        }

        private Object maybeDeserialize(byte[] bArr) {
            return this.payloadSerializer == null ? bArr : this.payloadSerializer.deserialize(bArr);
        }

        private Object handleAttributes(Map<String, String> map) {
            if (this.messageSchema.getField(PubsubMessageToRow.ATTRIBUTES_FIELD).getType().getTypeName().isMapType()) {
                return map;
            }
            ImmutableList.Builder builder = ImmutableList.builder();
            map.forEach((str, str2) -> {
                builder.add(Row.withSchema(PubsubSchemaIOProvider.ATTRIBUTE_ARRAY_ENTRY_SCHEMA).attachValues(new Object[]{str, str2}));
            });
            return builder.build();
        }

        private Object getValueForFieldNestedSchema(Schema.Field field, Instant instant, Map<String, String> map, byte[] bArr) {
            String name = field.getName();
            boolean z = -1;
            switch (name.hashCode()) {
                case -786701938:
                    if (name.equals(PubsubMessageToRow.PAYLOAD_FIELD)) {
                        z = 2;
                        break;
                    }
                    break;
                case 405645655:
                    if (name.equals(PubsubMessageToRow.ATTRIBUTES_FIELD)) {
                        z = true;
                        break;
                    }
                    break;
                case 436051377:
                    if (name.equals(PubsubMessageToRow.TIMESTAMP_FIELD)) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    return instant;
                case MetadataTableAdminDao.CURRENT_METADATA_TABLE_VERSION /* 1 */:
                    return handleAttributes(map);
                case true:
                    return maybeDeserialize(bArr);
                default:
                    throw new IllegalArgumentException("Unexpected field '" + field.getName() + "' in top level schema for Pubsub message. Top level schema should only contain 'timestamp', 'attributes', and 'payload' fields");
            }
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element PubsubMessage pubsubMessage, @DoFn.Timestamp Instant instant, DoFn.MultiOutputReceiver multiOutputReceiver) {
            try {
                multiOutputReceiver.get(PubsubMessageToRow.MAIN_TAG).output(Row.withSchema(this.messageSchema).addValues((List) this.messageSchema.getFields().stream().map(field -> {
                    return getValueForFieldNestedSchema(field, instant, pubsubMessage.getAttributeMap(), pubsubMessage.getPayload());
                }).collect(Collectors.toList())).build());
            } catch (Exception e) {
                if (!this.useDlq) {
                    throw new RuntimeException(e);
                }
                multiOutputReceiver.get(PubsubMessageToRow.DLQ_TAG).output(pubsubMessage);
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageToRow$ParseException.class */
    /** Unchecked exception that wraps the underlying cause of a message parsing failure. */
    public static class ParseException extends RuntimeException {
        private static final String MESSAGE = "Error parsing message";

        /**
         * @param th the failure that prevented the message from being parsed
         */
        ParseException(Throwable th) {
            super(MESSAGE, th);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageToRow$SerializerProvider.class */
    /**
     * Serializable factory that, given a payload {@link Schema}, supplies the
     * {@link PayloadSerializer} used to deserialize message payloads.
     */
    public interface SerializerProvider extends SerializableFunction<Schema, PayloadSerializer> {
    }

    /** Schema of the rows produced on the main output; also set as the row schema of that output in {@code expand}. */
    public abstract Schema messageSchema();

    /** Whether messages that fail conversion are emitted to {@code DLQ_TAG} rather than failing with an exception. */
    public abstract boolean useDlq();

    /** Whether the flat-schema conversion is used ({@code true}) or the nested-schema conversion ({@code false}). */
    public abstract boolean useFlatSchema();

    /** Factory for the payload serializer; may be null, in which case the nested conversion keeps the payload as bytes. */
    @Nullable
    public abstract SerializerProvider serializerProvider();

    /** Creates a new, empty {@link Builder} backed by the AutoValue-generated implementation. */
    public static Builder builder() {
        Builder autoValueBuilder = new AutoValue_PubsubMessageToRow.Builder();
        return autoValueBuilder;
    }

    /**
     * Applies the flat- or nested-schema conversion DoFn (chosen by {@code useFlatSchema()}) to
     * the input messages.
     *
     * <p>The resulting tuple always contains {@code MAIN_TAG}, whose row schema is set to
     * {@code messageSchema()}; {@code DLQ_TAG} is registered as an additional output only when
     * {@code useDlq()} is true.
     *
     * @param pCollection the Pubsub messages to convert
     * @return the tuple holding the converted rows and, if enabled, the dead-letter output
     */
    public PCollectionTuple expand(PCollection<PubsubMessage> pCollection) {
        TupleTagList additionalOutputs;
        if (useDlq()) {
            additionalOutputs = TupleTagList.of(DLQ_TAG);
        } else {
            additionalOutputs = TupleTagList.empty();
        }

        PCollectionTuple converted = pCollection.apply(
                ParDo.of(
                                useFlatSchema()
                                        ? new FlatSchemaPubsubMessageToRow(messageSchema(), useDlq(), serializerProvider())
                                        : new NestedSchemaPubsubMessageToRow(messageSchema(), useDlq(), serializerProvider()))
                        .withOutputTags(MAIN_TAG, additionalOutputs));

        converted.get(MAIN_TAG).setRowSchema(messageSchema());
        return converted;
    }
}
