package org.apache.beam.sdk.io.gcp.pubsub;

import com.google.auto.value.AutoValue;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.gcp.pubsub.AutoValue_PubsubRowToMessage;
import org.apache.beam.sdk.io.gcp.pubsub.AutoValue_PubsubRowToMessage_InputSchemaFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
import org.joda.time.DateTime;
import org.joda.time.Instant;
import org.joda.time.ReadableDateTime;
import org.joda.time.format.ISODateTimeFormat;

/* JADX INFO: Access modifiers changed from: package-private */
@Experimental
@Internal
@AutoValue
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubRowToMessage.class */
public abstract class PubsubRowToMessage extends PTransform<PCollection<Row>, PCollectionTuple> {
    static final String ERROR_DATA_FIELD_NAME = "data";
    static final String DEFAULT_KEY_PREFIX = "$";
    static final String ATTRIBUTES_KEY_NAME = "pubsub_attributes";
    static final String EVENT_TIMESTAMP_KEY_NAME = "pubsub_event_timestamp";
    static final String PAYLOAD_KEY_NAME = "pubsub_payload";
    static final String DEFAULT_ATTRIBUTES_KEY_NAME = "$pubsub_attributes";
    static final String DEFAULT_EVENT_TIMESTAMP_KEY_NAME = "$pubsub_event_timestamp";
    static final String DEFAULT_PAYLOAD_KEY_NAME = "$pubsub_payload";
    static final TupleTag<PubsubMessage> OUTPUT = new TupleTag<PubsubMessage>() { // from class: org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.1
    };
    static final TupleTag<Row> ERROR = new TupleTag<Row>() { // from class: org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.2
    };
    static final Schema.Field ERROR_MESSAGE_FIELD = Schema.Field.of("error_message", Schema.FieldType.STRING);
    static final Schema.Field ERROR_STACK_TRACE_FIELD = Schema.Field.of("error_stack_trace", Schema.FieldType.STRING);
    static final Schema.FieldType ATTRIBUTES_FIELD_TYPE = Schema.FieldType.map(Schema.FieldType.STRING, Schema.FieldType.STRING);
    static final Schema.FieldType EVENT_TIMESTAMP_FIELD_TYPE = Schema.FieldType.DATETIME;
    static final Schema.TypeName PAYLOAD_BYTES_TYPE_NAME = Schema.TypeName.BYTES;
    static final Schema.TypeName PAYLOAD_ROW_TYPE_NAME = Schema.TypeName.ROW;

    /* JADX INFO: Access modifiers changed from: package-private */
    @AutoValue.Builder
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubRowToMessage$Builder.class */
    public static abstract class Builder {
        abstract Builder setKeyPrefix(String str);

        abstract Optional<String> getKeyPrefix();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Builder setPayloadSerializer(PayloadSerializer payloadSerializer);

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Builder setTargetTimestampAttributeName(String str);

        abstract Builder setMockInstant(Instant instant);

        abstract PubsubRowToMessage autoBuild();

        /* JADX INFO: Access modifiers changed from: package-private */
        public final PubsubRowToMessage build() {
            if (!getKeyPrefix().isPresent()) {
                setKeyPrefix(PubsubRowToMessage.DEFAULT_KEY_PREFIX);
            }
            return autoBuild();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubRowToMessage$FieldMatcher.class */
    public static class FieldMatcher {
        private final String name;

        @Nullable
        private final Schema.TypeName typeName;

        @Nullable
        private final Schema.FieldType fieldType;

        static FieldMatcher of(String str) {
            return new FieldMatcher(str);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public static FieldMatcher of(String str, Schema.TypeName typeName) {
            return new FieldMatcher(str, typeName);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public static FieldMatcher of(String str, Schema.FieldType fieldType) {
            return new FieldMatcher(str, fieldType);
        }

        private FieldMatcher(String str, @Nullable Schema.TypeName typeName, @Nullable Schema.FieldType fieldType) {
            this.name = str;
            this.typeName = typeName;
            this.fieldType = fieldType;
        }

        private FieldMatcher(String str) {
            this(str, null, null);
        }

        private FieldMatcher(String str, Schema.TypeName typeName) {
            this(str, typeName, null);
        }

        private FieldMatcher(String str, Schema.FieldType fieldType) {
            this(str, null, fieldType);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public boolean match(Schema schema) {
            if (!schema.hasField(this.name)) {
                return false;
            }
            if (this.typeName == null && this.fieldType == null) {
                return true;
            }
            Schema.Field field = schema.getField(this.name);
            return this.typeName != null ? field.getType().getTypeName().equals(this.typeName) : this.fieldType.equals(field.getType());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @AutoValue
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubRowToMessage$InputSchemaFactory.class */
    public static abstract class InputSchemaFactory {

        /* JADX INFO: Access modifiers changed from: package-private */
        @AutoValue.Builder
        /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubRowToMessage$InputSchemaFactory$Builder.class */
        public static abstract class Builder {
            abstract Builder setAttributesField(Schema.Field field);

            abstract Builder setTimestampField(Schema.Field field);

            abstract Builder setPayloadField(Schema.Field field);

            abstract InputSchemaFactory build();
        }

        static Builder builder() {
            return new AutoValue_PubsubRowToMessage_InputSchemaFactory.Builder();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Schema.Field getAttributesField();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Schema.Field getTimestampField();

        /* JADX INFO: Access modifiers changed from: package-private */
        @Nullable
        public abstract Schema.Field getPayloadField();

        /* JADX INFO: Access modifiers changed from: package-private */
        public Schema buildSchema(Schema.Field... fieldArr) {
            Schema.Builder addField = Schema.builder().addField(getAttributesField()).addField(getTimestampField());
            if (getPayloadField() != null) {
                addField = addField.addField(getPayloadField());
            }
            for (Schema.Field field : fieldArr) {
                addField = addField.addField(field);
            }
            return addField.build();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubRowToMessage$PubsubRowToMessageDoFn.class */
    public static class PubsubRowToMessageDoFn extends DoFn<Row, PubsubMessage> {
        private final String attributesKeyName;
        private final String sourceTimestampKeyName;
        private final String payloadKeyName;
        private final Schema errorSchema;

        @Nullable
        private final String targetTimestampKeyName;

        @Nullable
        private final PayloadSerializer payloadSerializer;

        @Nullable
        private final Instant mockInstant;

        PubsubRowToMessageDoFn(String str, String str2, String str3, Schema schema, @Nullable String str4, @Nullable Instant instant, @Nullable PayloadSerializer payloadSerializer) {
            this.attributesKeyName = str;
            this.sourceTimestampKeyName = str2;
            this.payloadKeyName = str3;
            this.errorSchema = schema;
            this.targetTimestampKeyName = str4;
            this.payloadSerializer = payloadSerializer;
            this.mockInstant = instant;
        }

        @DoFn.ProcessElement
        public void process(@DoFn.Element Row row, DoFn.MultiOutputReceiver multiOutputReceiver) {
            try {
                Map<String, String> attributesWithoutTimestamp = attributesWithoutTimestamp(row);
                String timestampAsString = timestampAsString(row);
                String str = this.sourceTimestampKeyName;
                if (this.targetTimestampKeyName != null) {
                    str = this.targetTimestampKeyName;
                }
                byte[] payload = payload(row);
                HashMap hashMap = new HashMap(attributesWithoutTimestamp);
                hashMap.put(str, timestampAsString);
                multiOutputReceiver.get(PubsubRowToMessage.OUTPUT).output(new PubsubMessage(payload, hashMap));
            } catch (Exception e) {
                String message = e.getMessage();
                multiOutputReceiver.get(PubsubRowToMessage.ERROR).output(Row.withSchema(this.errorSchema).withFieldValue(PubsubRowToMessage.ERROR_DATA_FIELD_NAME, row).withFieldValue(PubsubRowToMessage.ERROR_MESSAGE_FIELD.getName(), message).withFieldValue(PubsubRowToMessage.ERROR_STACK_TRACE_FIELD.getName(), Throwables.getStackTraceAsString(e)).build());
            }
        }

        Map<String, String> attributesWithoutTimestamp(Row row) {
            return !row.getSchema().hasField(this.attributesKeyName) ? new HashMap() : row.getMap(this.attributesKeyName);
        }

        String timestampAsString(Row row) {
            return ISODateTimeFormat.dateTime().print(timestamp(row));
        }

        ReadableDateTime timestamp(Row row) {
            if (row.getSchema().hasField(this.sourceTimestampKeyName)) {
                return row.getDateTime(this.sourceTimestampKeyName);
            }
            Instant now = Instant.now();
            if (this.mockInstant != null) {
                now = this.mockInstant;
            }
            return new DateTime(now).withZone(now.getZone());
        }

        byte[] payload(Row row) {
            return SchemaReflection.of(row.getSchema()).matchesAll(FieldMatcher.of(this.payloadKeyName, PubsubRowToMessage.PAYLOAD_BYTES_TYPE_NAME)) ? row.getBytes(this.payloadKeyName) : ((PayloadSerializer) Objects.requireNonNull(this.payloadSerializer)).serialize(serializableRow(row));
        }

        Row serializableRow(Row row) {
            SchemaReflection of = SchemaReflection.of(row.getSchema());
            if (of.matchesAll(FieldMatcher.of(this.payloadKeyName, PubsubRowToMessage.PAYLOAD_BYTES_TYPE_NAME))) {
                throw new IllegalArgumentException(String.format("serializable Row does not exist for payload of type: %s", PubsubRowToMessage.PAYLOAD_BYTES_TYPE_NAME));
            }
            if (of.matchesAll(FieldMatcher.of(this.payloadKeyName, PubsubRowToMessage.PAYLOAD_ROW_TYPE_NAME))) {
                return row.getRow(this.payloadKeyName);
            }
            Schema removeFields = PubsubRowToMessage.removeFields(row.getSchema(), this.attributesKeyName, this.sourceTimestampKeyName);
            HashMap hashMap = new HashMap();
            for (String str : removeFields.getFieldNames()) {
                hashMap.put(str, row.getValue(str));
            }
            return Row.withSchema(removeFields).withFieldValues(hashMap).build();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubRowToMessage$SchemaReflection.class */
    public static class SchemaReflection {
        private final Schema schema;

        /* JADX INFO: Access modifiers changed from: package-private */
        public static SchemaReflection of(Schema schema) {
            return new SchemaReflection(schema);
        }

        private SchemaReflection(Schema schema) {
            this.schema = schema;
        }

        boolean matchesAll(FieldMatcher... fieldMatcherArr) {
            for (FieldMatcher fieldMatcher : fieldMatcherArr) {
                if (!fieldMatcher.match(this.schema)) {
                    return false;
                }
            }
            return true;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public boolean matchesAny(FieldMatcher... fieldMatcherArr) {
            for (FieldMatcher fieldMatcher : fieldMatcherArr) {
                if (fieldMatcher.match(this.schema)) {
                    return true;
                }
            }
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Builder builder() {
        return new AutoValue_PubsubRowToMessage.Builder();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public abstract String getKeyPrefix();

    /* JADX INFO: Access modifiers changed from: package-private */
    @Nullable
    public abstract PayloadSerializer getPayloadSerializer();

    /* JADX INFO: Access modifiers changed from: package-private */
    @Nullable
    public abstract String getTargetTimestampAttributeName();

    /* JADX INFO: Access modifiers changed from: package-private */
    @Nullable
    public abstract Instant getMockInstant();

    static Schema errorSchema(Schema schema) {
        return Schema.of(new Schema.Field[]{Schema.Field.of(ERROR_DATA_FIELD_NAME, Schema.FieldType.row(schema)), ERROR_MESSAGE_FIELD, ERROR_STACK_TRACE_FIELD});
    }

    InputSchemaFactory inputSchemaFactory() {
        return inputSchemaFactory(null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public InputSchemaFactory inputSchemaFactory(@Nullable Schema.FieldType fieldType) {
        InputSchemaFactory.Builder timestampField = InputSchemaFactory.builder().setAttributesField(Schema.Field.of(getAttributesKeyName(), ATTRIBUTES_FIELD_TYPE)).setTimestampField(Schema.Field.of(getSourceEventTimestampKeyName(), EVENT_TIMESTAMP_FIELD_TYPE));
        if (fieldType != null) {
            timestampField = timestampField.setPayloadField(Schema.Field.of(getPayloadKeyName(), fieldType));
        }
        return timestampField.build();
    }

    public PCollectionTuple expand(PCollection<Row> pCollection) {
        Schema schema = pCollection.getSchema();
        validate(schema);
        Schema errorSchema = errorSchema(schema);
        PCollectionTuple apply = pCollection.apply(PubsubRowToMessage.class.getSimpleName(), ParDo.of(new PubsubRowToMessageDoFn(getAttributesKeyName(), getSourceEventTimestampKeyName(), getPayloadKeyName(), errorSchema, getTargetTimestampAttributeName(), getMockInstant(), getPayloadSerializer())).withOutputTags(OUTPUT, TupleTagList.of(ERROR)));
        PCollection pCollection2 = apply.get(OUTPUT);
        return PCollectionTuple.of(OUTPUT, pCollection2).and(ERROR, apply.get(ERROR).setRowSchema(errorSchema));
    }

    String getAttributesKeyName() {
        return getKeyPrefix() + ATTRIBUTES_KEY_NAME;
    }

    String getSourceEventTimestampKeyName() {
        return getKeyPrefix() + EVENT_TIMESTAMP_KEY_NAME;
    }

    String getPayloadKeyName() {
        return getKeyPrefix() + PAYLOAD_KEY_NAME;
    }

    void validate(Schema schema) {
        if (schema.getFieldCount() == 0) {
            throw new IllegalArgumentException(String.format("Schema must contain at least one field. Schema: %s", schema));
        }
        validateAttributesField(schema);
        validateSourceEventTimeStampField(schema);
        validateSerializableFields(schema);
    }

    void validateAttributesField(Schema schema) {
        String attributesKeyName = getAttributesKeyName();
        if (schema.hasField(attributesKeyName)) {
            Preconditions.checkArgument(SchemaReflection.of(schema).matchesAll(FieldMatcher.of(attributesKeyName, ATTRIBUTES_FIELD_TYPE)));
        }
    }

    void validateSourceEventTimeStampField(Schema schema) {
        String sourceEventTimestampKeyName = getSourceEventTimestampKeyName();
        if (schema.hasField(sourceEventTimestampKeyName)) {
            Preconditions.checkArgument(SchemaReflection.of(schema).matchesAll(FieldMatcher.of(sourceEventTimestampKeyName, EVENT_TIMESTAMP_FIELD_TYPE)));
        }
    }

    void validateSerializableFields(Schema schema) {
        String attributesKeyName = getAttributesKeyName();
        String sourceEventTimestampKeyName = getSourceEventTimestampKeyName();
        String payloadKeyName = getPayloadKeyName();
        Schema removeFields = removeFields(schema, attributesKeyName, sourceEventTimestampKeyName, payloadKeyName);
        boolean z = removeFields.getFieldCount() > 0;
        String join = String.join(", ", removeFields.getFieldNames());
        SchemaReflection of = SchemaReflection.of(schema);
        boolean matchesAll = of.matchesAll(FieldMatcher.of(payloadKeyName));
        boolean matchesAll2 = of.matchesAll(FieldMatcher.of(payloadKeyName, PAYLOAD_ROW_TYPE_NAME));
        boolean matchesAll3 = of.matchesAll(FieldMatcher.of(payloadKeyName, PAYLOAD_BYTES_TYPE_NAME));
        boolean z2 = z && matchesAll;
        Preconditions.checkArgument(z || matchesAll, String.format("schema must have either a %s field or user fields i.e. not %s, %s or %s", payloadKeyName, attributesKeyName, sourceEventTimestampKeyName, payloadKeyName));
        Preconditions.checkArgument(!z2, String.format("schema field: %s incompatible with %s fields", payloadKeyName, join));
        if (matchesAll3) {
            Preconditions.checkArgument(getPayloadSerializer() == null, String.format("schema field: %s of type: %s with a %s is incompatible", payloadKeyName, PAYLOAD_BYTES_TYPE_NAME, PayloadSerializer.class.getName()));
        }
        if (matchesAll2) {
            Preconditions.checkArgument(getPayloadSerializer() != null, String.format("schema field: %s of type: %s requires a %s", payloadKeyName, PAYLOAD_ROW_TYPE_NAME, PayloadSerializer.class.getName()));
        }
        if (z) {
            Preconditions.checkArgument(getPayloadSerializer() != null, String.format("specifying schema fields: %s requires a %s", join, PayloadSerializer.class.getName()));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Schema removeFields(Schema schema, String... strArr) {
        List list = (List) Arrays.stream(strArr).collect(Collectors.toList());
        Schema.Builder builder = Schema.builder();
        for (Schema.Field field : schema.getFields()) {
            if (!list.contains(field.getName())) {
                builder.addField(field);
            }
        }
        return builder.build();
    }
}
