package org.apache.beam.sdk.io.gcp.pubsub;

import com.google.api.client.util.Clock;
import com.google.auto.service.AutoService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubWriteSchemaTransformConfiguration;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.io.Providers;
import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializerProvider;
import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.Instant;

@Experimental(Experimental.Kind.SCHEMAS)
@Internal
@AutoService({SchemaTransformProvider.class})
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.class */
public class PubsubWriteSchemaTransformProvider extends TypedSchemaTransformProvider<PubsubWriteSchemaTransformConfiguration> {
    /** Identifier string returned by {@code identifier()} for this provider. */
    private static final String IDENTIFIER = "beam:schematransform:org.apache.beam:pubsub_write:v1";
    /** Tag of the single input collection that {@code expand} requires; also reported by {@code inputCollectionNames()}. */
    static final String INPUT_TAG = "input";
    /** Tag under which {@code expand} returns its error rows; also reported by {@code outputCollectionNames()}. */
    static final String ERROR_TAG = "error";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider$ConvertForRowToMessage.class */
    /**
     * Re-shapes each input {@link Row} into a row of {@code targetSchema}.
     *
     * <p>The output row always carries {@code $pubsub_attributes} and {@code $pubsub_event_timestamp}
     * values: an empty map and the current time by default, or the values of the configured source
     * fields when those are set. A configured payload field is copied to {@code $pubsub_payload}.
     * Every input field whose name also exists in the target schema is copied through unchanged.
     */
    public static class ConvertForRowToMessage extends DoFn<Row, Row> {
        private final Schema targetSchema;

        /** Time source for the default event timestamp; {@code null} means {@link Instant#now()}. */
        @Nullable
        private final Clock clock;

        /** Input field supplying the attributes, or {@code null} to use an empty map. */
        @Nullable
        private final String attributesFieldName;

        /** Input field supplying the event timestamp, or {@code null} to use the current time. */
        @Nullable
        private final String timestampFieldName;

        /** Input field supplying the payload, or {@code null} when no payload field is configured. */
        @Nullable
        private final String payloadFieldName;

        /**
         * @param targetSchema schema of the rows this function emits
         * @param clock optional time source for the default event timestamp
         * @param attributesFieldName optional input field holding the attributes
         * @param timestampFieldName optional input field holding the event timestamp
         * @param payloadFieldName optional input field holding the payload
         */
        ConvertForRowToMessage(
                Schema targetSchema,
                @Nullable Clock clock,
                @Nullable String attributesFieldName,
                @Nullable String timestampFieldName,
                @Nullable String payloadFieldName) {
            this.targetSchema = targetSchema;
            this.clock = clock;
            this.attributesFieldName = attributesFieldName;
            this.timestampFieldName = timestampFieldName;
            this.payloadFieldName = payloadFieldName;
        }

        /**
         * Builds one output row per input row.
         *
         * @param row the input element
         * @param outputReceiver receives the converted row
         * @throws IllegalStateException if the target schema lacks {@code $pubsub_attributes} or
         *     {@code $pubsub_event_timestamp}
         */
        @DoFn.ProcessElement
        public void process(@DoFn.Element Row row, DoFn.OutputReceiver<Row> outputReceiver) {
            Instant now = this.clock != null
                    ? Instant.ofEpochMilli(this.clock.currentTimeMillis())
                    : Instant.now();

            HashMap<String, Object> values = new HashMap<>();

            // Defaults, used unless a configured source field overrides them below.
            Preconditions.checkState(this.targetSchema.hasField("$pubsub_attributes"));
            values.put("$pubsub_attributes", ImmutableMap.of());
            Preconditions.checkState(this.targetSchema.hasField("$pubsub_event_timestamp"));
            values.put("$pubsub_event_timestamp", now);

            // Pass through every input field that the target schema also declares.
            for (String fieldName : row.getSchema().getFieldNames()) {
                if (this.targetSchema.hasField(fieldName)) {
                    values.put(fieldName, row.getValue(fieldName));
                }
            }

            // The overrides do not depend on the loop variable, so they are applied once after the
            // pass-through copy. They still win over same-named pass-through values, as before.
            // NOTE(review): for a row with zero fields the previous code skipped these overrides;
            // expand() rejects empty schemas, so that case should not reach this point.
            if (this.attributesFieldName != null) {
                values.put("$pubsub_attributes", row.getValue(this.attributesFieldName));
            }
            if (this.timestampFieldName != null) {
                values.put("$pubsub_event_timestamp", row.getValue(this.timestampFieldName));
            }
            if (this.payloadFieldName != null) {
                values.put("$pubsub_payload", row.getValue(this.payloadFieldName));
            }

            outputReceiver.output(Row.withSchema(this.targetSchema).withFieldValues(values).build());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider$PubsubWriteSchemaTransform.class */
    public static class PubsubWriteSchemaTransform extends PTransform<PCollectionRowTuple, PCollectionRowTuple> implements SchemaTransform {
        /** User-supplied configuration that drives every build/validate step of this transform. */
        private final PubsubWriteSchemaTransformConfiguration configuration;
        /**
         * Optional client factory override set through {@code withPubsubClientFactory}; when null,
         * {@code getPubsubClientFactory()} falls back to {@code PubsubGrpcClient.FACTORY} and
         * {@code buildPubsubWrite()} does not set a factory on the write.
         */
        private PubsubClient.PubsubClientFactory pubsubClientFactory;

        /**
         * Creates the transform for the given configuration.
         *
         * @param pubsubWriteSchemaTransformConfiguration configuration stored as-is; not validated here
         */
        PubsubWriteSchemaTransform(PubsubWriteSchemaTransformConfiguration pubsubWriteSchemaTransformConfiguration) {
            this.configuration = pubsubWriteSchemaTransformConfiguration;
        }

        /**
         * Overrides the Pub/Sub client factory used by this transform.
         *
         * <p>Mutates this instance (no copy is made) and returns it for chaining.
         *
         * @param pubsubClientFactory factory to use for schema lookup and for the Pub/Sub write
         * @return this transform
         */
        PubsubWriteSchemaTransform withPubsubClientFactory(PubsubClient.PubsubClientFactory pubsubClientFactory) {
            this.pubsubClientFactory = pubsubClientFactory;
            return this;
        }

        /**
         * Returns this object, which is itself the {@link PTransform} to apply.
         *
         * @return this transform
         */
        public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
            return this;
        }

        /**
         * Converts the {@code "input"} rows to Pub/Sub messages and writes them to the configured topic.
         *
         * <p>Steps: validate the input tuple, derive the target schema, convert each row with
         * {@link ConvertForRowToMessage}, check the payload schema against the topic's Pub/Sub schema,
         * map rows to messages, and apply the Pub/Sub write.
         *
         * @param pCollectionRowTuple must contain exactly one collection, tagged {@code "input"}
         * @return a tuple holding the rows that failed row-to-message conversion, tagged {@code "error"}
         * @throws IllegalArgumentException if the input tuple is not a single {@code "input"} collection
         *     or its schema has no fields
         */
        public PCollectionRowTuple expand(PCollectionRowTuple pCollectionRowTuple) {
            if (pCollectionRowTuple.getAll().size() != 1 || !pCollectionRowTuple.has(PubsubWriteSchemaTransformProvider.INPUT_TAG)) {
                throw new IllegalArgumentException(String.format("%s %s input is expected to contain a single %s tagged PCollection<Row>", pCollectionRowTuple.getClass().getSimpleName(), getClass().getSimpleName(), PubsubWriteSchemaTransformProvider.INPUT_TAG));
            }

            // Locals are parameterized: on a raw PCollection the generic apply(...) erases to its bound,
            // so the chained setRowSchema(...) and the PCollectionTuple assignment would not type-check.
            PCollection<Row> input = pCollectionRowTuple.get(PubsubWriteSchemaTransformProvider.INPUT_TAG);
            Schema sourceSchema = input.getSchema();
            if (sourceSchema.getFieldCount() == 0) {
                throw new IllegalArgumentException(String.format("empty Schema for %s", PubsubWriteSchemaTransformProvider.INPUT_TAG));
            }

            Schema targetSchema = buildTargetSchema(sourceSchema);
            PCollection<Row> converted = input
                    .apply(ConvertForRowToMessage.class.getSimpleName(), convertForRowToMessage(targetSchema))
                    .setRowSchema(targetSchema);

            // The payload schema is what remains once the attributes and timestamp fields are removed;
            // when the payload is itself a ROW field, its nested row schema is used instead.
            Schema payloadSchema = PubsubRowToMessage.removeFields(converted.getSchema(), "$pubsub_attributes", "$pubsub_event_timestamp");
            if (PubsubRowToMessage.FieldMatcher.of("$pubsub_payload", Schema.TypeName.ROW).match(payloadSchema)) {
                payloadSchema = payloadSchema.getField("$pubsub_payload").getType().getRowSchema();
            }
            validateTargetSchemaAgainstPubsubSchema(payloadSchema, pCollectionRowTuple.getPipeline().getOptions());

            PCollectionTuple messages = converted.apply(PubsubRowToMessage.class.getSimpleName(), buildPubsubRowToMessage(payloadSchema));
            messages.get(PubsubRowToMessage.OUTPUT).apply(PubsubIO.Write.class.getSimpleName(), buildPubsubWrite());
            return PCollectionRowTuple.of(PubsubWriteSchemaTransformProvider.ERROR_TAG, messages.get(PubsubRowToMessage.ERROR));
        }

        /**
         * Resolves the payload serializer for the configured format.
         *
         * @param schema schema handed to the serializer
         * @return the serializer for the configured format, or {@code null} when no format is configured
         * @throws IllegalArgumentException if the configured format has no registered
         *     {@link PayloadSerializerProvider}
         */
        PayloadSerializer getPayloadSerializer(Schema schema) {
            String format = this.configuration.getFormat();
            if (format == null) {
                return null;
            }
            Set<String> validFormats = Providers.loadProviders(PayloadSerializerProvider.class).keySet();
            if (!validFormats.contains(format)) {
                throw new IllegalArgumentException(String.format("%s is not among the valid formats: [%s]", format, String.join(",", validFormats)));
            }
            return PayloadSerializers.getSerializer(format, schema, ImmutableMap.of());
        }

        /**
         * Builds the row-to-message transform for the given payload schema.
         *
         * <p>The target timestamp attribute name is only set when a target is configured.
         *
         * @param schema payload schema passed to {@code getPayloadSerializer}
         * @return the configured {@link PubsubRowToMessage}
         */
        PubsubRowToMessage buildPubsubRowToMessage(Schema schema) {
            PubsubRowToMessage.Builder builder = PubsubRowToMessage.builder().setPayloadSerializer(getPayloadSerializer(schema));
            if (this.configuration.getTarget() == null) {
                return builder.build();
            }
            return builder
                    .setTargetTimestampAttributeName(this.configuration.getTarget().getTimestampAttributeKey())
                    .build();
        }

        /**
         * Builds the Pub/Sub write targeting the configured topic.
         *
         * <p>The id attribute and the client factory are applied only when they are set.
         *
         * @return the configured write transform
         */
        PubsubIO.Write<PubsubMessage> buildPubsubWrite() {
            PubsubIO.Write<PubsubMessage> pubsubWrite = PubsubIO.writeMessages().to(this.configuration.getTopic());

            String idAttribute = this.configuration.getIdAttribute();
            if (idAttribute != null) {
                pubsubWrite = pubsubWrite.withIdAttribute(idAttribute);
            }

            PubsubClient.PubsubClientFactory factoryOverride = this.pubsubClientFactory;
            if (factoryOverride != null) {
                pubsubWrite = pubsubWrite.withClientFactory(factoryOverride);
            }
            return pubsubWrite;
        }

        /**
         * Checks that the input schema is non-empty and consistent with the source configuration.
         *
         * <p>Each configured attributes, timestamp and payload field must exist with the expected
         * type. When a payload field is configured, the schema must hold no fields other than the
         * configured ones.
         *
         * @param schema schema of the input rows
         * @throws IllegalArgumentException if the schema is empty, a configured field is missing or
         *     mistyped, or extra fields accompany a configured payload field
         */
        void validateSourceSchemaAgainstConfiguration(Schema schema) {
            if (schema.getFieldCount() == 0) {
                throw new IllegalArgumentException(String.format("empty Schema for %s", PubsubWriteSchemaTransformProvider.INPUT_TAG));
            }

            PubsubWriteSchemaTransformConfiguration.SourceConfiguration source = this.configuration.getSource();
            if (source == null) {
                return;
            }

            String attributesField = source.getAttributesFieldName();
            if (attributesField != null) {
                Schema.FieldType attributesType = PubsubRowToMessage.ATTRIBUTES_FIELD_TYPE;
                boolean attributesPresent = PubsubRowToMessage.FieldMatcher.of(attributesField, attributesType).match(schema);
                Preconditions.checkArgument(attributesPresent, String.format("schema missing field: %s for type %s: ", attributesField, attributesType));
            }

            String timestampField = source.getTimestampFieldName();
            if (timestampField != null) {
                Schema.FieldType timestampType = PubsubRowToMessage.EVENT_TIMESTAMP_FIELD_TYPE;
                boolean timestampPresent = PubsubRowToMessage.FieldMatcher.of(timestampField, timestampType).match(schema);
                Preconditions.checkArgument(timestampPresent, String.format("schema missing field: %s for type: %s", timestampField, timestampType));
            }

            String payloadField = source.getPayloadFieldName();
            if (payloadField == null) {
                return;
            }
            boolean payloadPresent = PubsubRowToMessage.SchemaReflection.of(schema).matchesAny(
                    PubsubRowToMessage.FieldMatcher.of(payloadField, PubsubRowToMessage.PAYLOAD_BYTES_TYPE_NAME),
                    PubsubRowToMessage.FieldMatcher.of(payloadField, PubsubRowToMessage.PAYLOAD_ROW_TYPE_NAME));
            Preconditions.checkArgument(payloadPresent, String.format("schema missing field: %s for types %s or %s", payloadField, PubsubRowToMessage.PAYLOAD_BYTES_TYPE_NAME, PubsubRowToMessage.PAYLOAD_ROW_TYPE_NAME));

            // Anything left after removing the configured fields is a user field, which is rejected
            // when a payload field is configured.
            String[] configuredFields = Stream.of(attributesField, timestampField, payloadField)
                    .filter(Objects::nonNull)
                    .toArray(String[]::new);
            if (PubsubRowToMessage.removeFields(schema, configuredFields).getFieldCount() > 0) {
                throw new IllegalArgumentException(String.format("user fields incompatible with %s field", payloadField));
            }
        }

        /**
         * Verifies that the payload schema equals the schema attached to the configured topic.
         *
         * <p>Nothing is checked when the topic has no schema path or the path equals
         * {@code PubsubClient.SchemaPath.DELETED_SCHEMA}. The client opened for the lookup is always
         * closed.
         *
         * @param schema payload schema derived from the input rows
         * @param pipelineOptions pipeline options, viewed as {@code PubsubOptions} to create the client
         * @throws IllegalArgumentException if {@code pipelineOptions} is null
         * @throws IllegalStateException if the schemas differ, or if creating, querying or closing the
         *     client fails with an {@link IOException} (kept as the cause)
         */
        void validateTargetSchemaAgainstPubsubSchema(Schema schema, PipelineOptions pipelineOptions) {
            Preconditions.checkArgument(pipelineOptions != null);
            try (PubsubClient pubsubClient = getPubsubClient(pipelineOptions.as(PubsubOptions.class))) {
                PubsubClient.SchemaPath schemaPath = pubsubClient.getSchemaPath(PubsubClient.topicPathFromPath(this.configuration.getTopic()));
                if (schemaPath == null || schemaPath.equals(PubsubClient.SchemaPath.DELETED_SCHEMA)) {
                    // No schema on the topic, so there is nothing to compare against.
                    return;
                }
                Schema expectedSchema = pubsubClient.getSchema(schemaPath);
                Preconditions.checkState(schema.equals(expectedSchema), String.format("input schema mismatch with expected schema at path: %s\ninput schema: %s\nPub/Sub schema: %s", schemaPath, schema, expectedSchema));
            } catch (IOException e) {
                // Same message as before; the cause is now attached so the stack trace is not lost.
                throw new IllegalStateException(e.getMessage(), e);
            }
        }

        /**
         * Derives the schema of the rows handed to {@link PubsubRowToMessage}.
         *
         * <p>Validates the source schema first. The schema is then built from all source fields via
         * {@code inputSchemaFactory(...).buildSchema(...)}, and the configured attributes, timestamp
         * and payload source fields are removed from the result.
         *
         * @param schema schema of the input rows
         * @return the target schema
         * @throws IllegalArgumentException if the source schema fails validation
         */
        Schema buildTargetSchema(Schema schema) {
            validateSourceSchemaAgainstConfiguration(schema);

            Schema.FieldType payloadFieldType = null;
            List<String> fieldsToRemove = new ArrayList<>();

            PubsubWriteSchemaTransformConfiguration.SourceConfiguration source = this.configuration.getSource();
            if (source != null) {
                String attributesField = source.getAttributesFieldName();
                if (attributesField != null) {
                    fieldsToRemove.add(attributesField);
                }
                String timestampField = source.getTimestampFieldName();
                if (timestampField != null) {
                    fieldsToRemove.add(timestampField);
                }
                String payloadField = source.getPayloadFieldName();
                if (payloadField != null) {
                    // The payload type of the target schema follows the configured source field.
                    payloadFieldType = schema.getField(payloadField).getType();
                    fieldsToRemove.add(payloadField);
                }
            }

            Schema.Field[] sourceFields = schema.getFields().toArray(new Schema.Field[0]);
            Schema builtSchema = PubsubRowToMessage.builder().build().inputSchemaFactory(payloadFieldType).buildSchema(sourceFields);
            return PubsubRowToMessage.removeFields(builtSchema, fieldsToRemove.toArray(new String[0]));
        }

        /**
         * Returns the client factory to use.
         *
         * @return the factory set through {@code withPubsubClientFactory}, or
         *     {@code PubsubGrpcClient.FACTORY} when none was set
         */
        private PubsubClient.PubsubClientFactory getPubsubClientFactory() {
            if (this.pubsubClientFactory == null) {
                return PubsubGrpcClient.FACTORY;
            }
            return this.pubsubClientFactory;
        }

        /**
         * Creates a Pub/Sub client from the effective client factory.
         *
         * <p>The target configuration is optional ({@code buildPubsubRowToMessage} null-checks it
         * too), so the timestamp attribute is passed as {@code null} when no target is configured
         * instead of failing with a {@link NullPointerException}.
         * NOTE(review): assumes {@code newClient} accepts a null timestamp attribute, as it already
         * receives a possibly-null id attribute; confirm against the factory contract.
         *
         * @param pubsubOptions options handed to the factory
         * @return a new client; the caller is responsible for closing it
         * @throws IOException if the factory fails to create the client
         */
        private PubsubClient getPubsubClient(PubsubOptions pubsubOptions) throws IOException {
            String timestampAttribute = this.configuration.getTarget() != null
                    ? this.configuration.getTarget().getTimestampAttributeKey()
                    : null;
            return getPubsubClientFactory().newClient(timestampAttribute, this.configuration.getIdAttribute(), pubsubOptions);
        }

        /**
         * Builds the row-conversion step without a clock, so the default event timestamp comes
         * from {@code Instant.now()}.
         *
         * @param schema target schema of the converted rows
         * @return a {@link ParDo} wrapping {@link ConvertForRowToMessage}
         */
        ParDo.SingleOutput<Row, Row> convertForRowToMessage(Schema schema) {
            return convertForRowToMessage(schema, null);
        }

        /**
         * Builds the row-conversion step with an optional clock.
         *
         * <p>The attributes, timestamp and payload field names come from the source configuration;
         * all three are {@code null} when no source is configured.
         *
         * @param schema target schema of the converted rows
         * @param clock optional time source for the default event timestamp
         * @return a {@link ParDo} wrapping {@link ConvertForRowToMessage}
         */
        ParDo.SingleOutput<Row, Row> convertForRowToMessage(Schema schema, @Nullable Clock clock) {
            PubsubWriteSchemaTransformConfiguration.SourceConfiguration source = this.configuration.getSource();
            if (source == null) {
                return ParDo.of(new ConvertForRowToMessage(schema, clock, null, null, null));
            }
            return ParDo.of(
                    new ConvertForRowToMessage(
                            schema,
                            clock,
                            source.getAttributesFieldName(),
                            source.getTimestampFieldName(),
                            source.getPayloadFieldName()));
        }
    }

    /**
     * Returns the configuration class this provider is parameterized with.
     *
     * @return {@code PubsubWriteSchemaTransformConfiguration.class}
     */
    protected Class<PubsubWriteSchemaTransformConfiguration> configurationClass() {
        return PubsubWriteSchemaTransformConfiguration.class;
    }

    /**
     * Creates the write transform for the given configuration.
     *
     * @param pubsubWriteSchemaTransformConfiguration configuration passed to the transform unchanged
     * @return a new {@link PubsubWriteSchemaTransform}
     */
    public SchemaTransform from(PubsubWriteSchemaTransformConfiguration pubsubWriteSchemaTransformConfiguration) {
        return new PubsubWriteSchemaTransform(pubsubWriteSchemaTransformConfiguration);
    }

    /**
     * Returns the identifier of this provider.
     *
     * @return the {@code IDENTIFIER} constant
     */
    public String identifier() {
        return IDENTIFIER;
    }

    /**
     * Returns the names of the expected input collections.
     *
     * @return a single-element list containing {@code INPUT_TAG}
     */
    public List<String> inputCollectionNames() {
        return Collections.singletonList(INPUT_TAG);
    }

    /**
     * Returns the names of the produced output collections.
     *
     * @return a single-element list containing {@code ERROR_TAG}
     */
    public List<String> outputCollectionNames() {
        return Collections.singletonList(ERROR_TAG);
    }
}
