package org.apache.beam.sdk.io.gcp.spanner.changestreams;

import com.google.auto.value.AutoValue;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Dialect;
import com.google.cloud.spanner.Type;
import java.io.Serializable;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao;
import org.apache.beam.sdk.io.gcp.spanner.ReadSpannerSchema;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.SpannerSchema;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.AutoValue_SpannerChangestreamsReadSchemaTransformProvider_SpannerChangestreamsReadConfiguration;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.grpc.v1p48p1.com.google.gson.Gson;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.joda.time.DateTime;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangestreamsReadSchemaTransformProvider.class */
public class SpannerChangestreamsReadSchemaTransformProvider extends TypedSchemaTransformProvider<SpannerChangestreamsReadConfiguration> {
    private static final HashMap<String, SpannerSchema> TABLE_SCHEMAS = new HashMap<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.beam.sdk.io.gcp.spanner.changestreams.SpannerChangestreamsReadSchemaTransformProvider$3, reason: invalid class name */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangestreamsReadSchemaTransformProvider$3.class */
    public static /* synthetic */ class AnonymousClass3 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$beam$sdk$schemas$Schema$TypeName;
        static final /* synthetic */ int[] $SwitchMap$com$google$cloud$spanner$Type$Code = new int[Type.Code.values().length];

        static {
            try {
                $SwitchMap$com$google$cloud$spanner$Type$Code[Type.Code.BOOL.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$google$cloud$spanner$Type$Code[Type.Code.BYTES.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$google$cloud$spanner$Type$Code[Type.Code.STRING.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$com$google$cloud$spanner$Type$Code[Type.Code.INT64.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$com$google$cloud$spanner$Type$Code[Type.Code.NUMERIC.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$com$google$cloud$spanner$Type$Code[Type.Code.FLOAT64.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$com$google$cloud$spanner$Type$Code[Type.Code.TIMESTAMP.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$com$google$cloud$spanner$Type$Code[Type.Code.DATE.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$com$google$cloud$spanner$Type$Code[Type.Code.ARRAY.ordinal()] = 9;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$com$google$cloud$spanner$Type$Code[Type.Code.JSON.ordinal()] = 10;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$com$google$cloud$spanner$Type$Code[Type.Code.STRUCT.ordinal()] = 11;
            } catch (NoSuchFieldError e11) {
            }
            $SwitchMap$org$apache$beam$sdk$schemas$Schema$TypeName = new int[Schema.TypeName.values().length];
            try {
                $SwitchMap$org$apache$beam$sdk$schemas$Schema$TypeName[Schema.TypeName.STRING.ordinal()] = 1;
            } catch (NoSuchFieldError e12) {
            }
            try {
                $SwitchMap$org$apache$beam$sdk$schemas$Schema$TypeName[Schema.TypeName.INT64.ordinal()] = 2;
            } catch (NoSuchFieldError e13) {
            }
            try {
                $SwitchMap$org$apache$beam$sdk$schemas$Schema$TypeName[Schema.TypeName.INT16.ordinal()] = 3;
            } catch (NoSuchFieldError e14) {
            }
            try {
                $SwitchMap$org$apache$beam$sdk$schemas$Schema$TypeName[Schema.TypeName.INT32.ordinal()] = 4;
            } catch (NoSuchFieldError e15) {
            }
            try {
                $SwitchMap$org$apache$beam$sdk$schemas$Schema$TypeName[Schema.TypeName.FLOAT.ordinal()] = 5;
            } catch (NoSuchFieldError e16) {
            }
            try {
                $SwitchMap$org$apache$beam$sdk$schemas$Schema$TypeName[Schema.TypeName.DOUBLE.ordinal()] = 6;
            } catch (NoSuchFieldError e17) {
            }
            try {
                $SwitchMap$org$apache$beam$sdk$schemas$Schema$TypeName[Schema.TypeName.BOOLEAN.ordinal()] = 7;
            } catch (NoSuchFieldError e18) {
            }
            try {
                $SwitchMap$org$apache$beam$sdk$schemas$Schema$TypeName[Schema.TypeName.BYTES.ordinal()] = 8;
            } catch (NoSuchFieldError e19) {
            }
            try {
                $SwitchMap$org$apache$beam$sdk$schemas$Schema$TypeName[Schema.TypeName.DATETIME.ordinal()] = 9;
            } catch (NoSuchFieldError e20) {
            }
            try {
                $SwitchMap$org$apache$beam$sdk$schemas$Schema$TypeName[Schema.TypeName.DECIMAL.ordinal()] = 10;
            } catch (NoSuchFieldError e21) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangestreamsReadSchemaTransformProvider$DataChangeRecordToRow.class */
    public static final class DataChangeRecordToRow extends DoFn<DataChangeRecord, Row> {
        private final Schema tableChangeRecordSchema;
        private final String tableName;
        private transient Gson gson = new Gson();

        DataChangeRecordToRow(String str, Schema schema) {
            this.tableName = str;
            this.tableChangeRecordSchema = schema;
        }

        public Gson getGson() {
            if (this.gson == null) {
                this.gson = new Gson();
            }
            return this.gson;
        }

        @DoFn.ProcessElement
        public void process(@DoFn.Element DataChangeRecord dataChangeRecord, DoFn.OutputReceiver<Row> outputReceiver) {
            if (dataChangeRecord.getTableName().equalsIgnoreCase(this.tableName)) {
                Instant instant = new Instant(dataChangeRecord.getRecordTimestamp().toSqlTimestamp());
                for (Mod mod : dataChangeRecord.getMods()) {
                    Schema rowSchema = this.tableChangeRecordSchema.getField("rowValues").getType().getRowSchema();
                    if (rowSchema == null) {
                        throw new RuntimeException("Row schema for internal row is null and cannot be utilized.");
                    }
                    Row.FieldValueBuilder fromRow = Row.fromRow(Row.nullRow(rowSchema));
                    Map map = (Map) Optional.ofNullable(mod.getNewValuesJson()).map(str -> {
                        return (Map) getGson().fromJson(str, Map.class);
                    }).orElseGet(Collections::emptyMap);
                    Map map2 = (Map) Optional.ofNullable(mod.getKeysJson()).map(str2 -> {
                        return (Map) getGson().fromJson(str2, Map.class);
                    }).orElseGet(Collections::emptyMap);
                    for (Map.Entry entry : map.entrySet()) {
                        if (entry.getValue() != null) {
                            fromRow = fromRow.withFieldValue(((String) entry.getKey()).toLowerCase(), SpannerChangestreamsReadSchemaTransformProvider.stringToParsedValue(rowSchema.getField(((String) entry.getKey()).toLowerCase()).getType(), (String) entry.getValue()));
                        }
                    }
                    for (Map.Entry entry2 : map2.entrySet()) {
                        if (entry2.getValue() != null) {
                            fromRow = fromRow.withFieldValue(((String) entry2.getKey()).toLowerCase(), SpannerChangestreamsReadSchemaTransformProvider.stringToParsedValue(rowSchema.getField(((String) entry2.getKey()).toLowerCase()).getType(), (String) entry2.getValue()));
                        }
                    }
                    outputReceiver.outputWithTimestamp(Row.withSchema(this.tableChangeRecordSchema).addValue(dataChangeRecord.getModType().toString()).addValue(dataChangeRecord.getCommitTimestamp().toString()).addValue(Long.valueOf(Long.parseLong(dataChangeRecord.getRecordSequence()))).addValue(fromRow.build()).build(), instant);
                }
            }
        }
    }

    @DefaultSchema(AutoValueSchema.class)
    @AutoValue
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangestreamsReadSchemaTransformProvider$SpannerChangestreamsReadConfiguration.class */
    public static abstract class SpannerChangestreamsReadConfiguration implements Serializable {

        @AutoValue.Builder
        /* loaded from: input_file:org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangestreamsReadSchemaTransformProvider$SpannerChangestreamsReadConfiguration$Builder.class */
        public static abstract class Builder {
            public abstract Builder setDatabaseId(String str);

            public abstract Builder setProjectId(String str);

            public abstract Builder setInstanceId(String str);

            public abstract Builder setTable(String str);

            public abstract Builder setStartAtTimestamp(String str);

            public abstract Builder setEndAtTimestamp(String str);

            public abstract Builder setChangeStreamName(String str);

            public abstract SpannerChangestreamsReadConfiguration build();
        }

        public abstract String getDatabaseId();

        public abstract String getProjectId();

        public abstract String getInstanceId();

        public abstract String getTable();

        public abstract String getStartAtTimestamp();

        public abstract String getEndAtTimestamp();

        public abstract String getChangeStreamName();

        public static Builder builder() {
            return new AutoValue_SpannerChangestreamsReadSchemaTransformProvider_SpannerChangestreamsReadConfiguration.Builder();
        }
    }

    protected Class<SpannerChangestreamsReadConfiguration> configurationClass() {
        return SpannerChangestreamsReadConfiguration.class;
    }

    public SchemaTransform from(final SpannerChangestreamsReadConfiguration spannerChangestreamsReadConfiguration) {
        return new SchemaTransform() { // from class: org.apache.beam.sdk.io.gcp.spanner.changestreams.SpannerChangestreamsReadSchemaTransformProvider.1
            public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
                return new PTransform<PCollectionRowTuple, PCollectionRowTuple>() { // from class: org.apache.beam.sdk.io.gcp.spanner.changestreams.SpannerChangestreamsReadSchemaTransformProvider.1.1
                    public PCollectionRowTuple expand(PCollectionRowTuple pCollectionRowTuple) {
                        Pipeline pipeline = pCollectionRowTuple.getPipeline();
                        Schema tableSchema = SpannerChangestreamsReadSchemaTransformProvider.getTableSchema(spannerChangestreamsReadConfiguration);
                        SpannerIO.ReadChangeStream withInstanceId = SpannerIO.readChangeStream().withSpannerConfig(SpannerConfig.create().withProjectId(spannerChangestreamsReadConfiguration.getProjectId()).withInstanceId(spannerChangestreamsReadConfiguration.getInstanceId()).withDatabaseId(spannerChangestreamsReadConfiguration.getDatabaseId())).withChangeStreamName(spannerChangestreamsReadConfiguration.getChangeStreamName()).withInclusiveStartAt(Timestamp.parseTimestamp(spannerChangestreamsReadConfiguration.getStartAtTimestamp())).withDatabaseId(spannerChangestreamsReadConfiguration.getDatabaseId()).withProjectId(spannerChangestreamsReadConfiguration.getProjectId()).withInstanceId(spannerChangestreamsReadConfiguration.getInstanceId());
                        if (spannerChangestreamsReadConfiguration.getEndAtTimestamp() != null) {
                            withInstanceId = withInstanceId.withInclusiveEndAt(Timestamp.parseTimestamp((String) Objects.requireNonNull((String) Objects.requireNonNull(spannerChangestreamsReadConfiguration.getEndAtTimestamp()))));
                        }
                        return PCollectionRowTuple.of("output", pipeline.apply(withInstanceId).apply(ParDo.of(new DataChangeRecordToRow(spannerChangestreamsReadConfiguration.getTable(), tableSchema))).setRowSchema(tableSchema));
                    }
                };
            }
        };
    }

    public String identifier() {
        return "beam:schematransform:org.apache.beam:spanner_cdc_read:v1";
    }

    public List<String> inputCollectionNames() {
        return Collections.emptyList();
    }

    public List<String> outputCollectionNames() {
        return Collections.singletonList("output");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Schema getTableSchema(final SpannerChangestreamsReadConfiguration spannerChangestreamsReadConfiguration) {
        Pipeline create = Pipeline.create();
        PCollectionView apply = create.apply("Create Dialect", Create.of(Dialect.GOOGLE_STANDARD_SQL, new Dialect[0])).apply("Dialect to View", View.asSingleton());
        create.apply(Create.of((Void) null, new Void[0])).apply(ParDo.of(new ReadSpannerSchema(SpannerConfig.create().withDatabaseId(spannerChangestreamsReadConfiguration.getDatabaseId()).withInstanceId(spannerChangestreamsReadConfiguration.getInstanceId()).withProjectId(spannerChangestreamsReadConfiguration.getProjectId()), apply, Sets.newHashSet(new String[]{spannerChangestreamsReadConfiguration.getTable()}))).withSideInput("dialect", apply)).apply(ParDo.of(new DoFn<SpannerSchema, String>() { // from class: org.apache.beam.sdk.io.gcp.spanner.changestreams.SpannerChangestreamsReadSchemaTransformProvider.2
            @DoFn.ProcessElement
            public void process(@DoFn.Element SpannerSchema spannerSchema) {
                SpannerChangestreamsReadSchemaTransformProvider.TABLE_SCHEMAS.put(SpannerChangestreamsReadConfiguration.this.getTable(), spannerSchema);
            }
        })).setCoder(StringUtf8Coder.of());
        create.run().waitUntilFinish();
        SpannerSchema remove = TABLE_SCHEMAS.remove(spannerChangestreamsReadConfiguration.getTable());
        if (remove == null) {
            throw new RuntimeException(String.format("Could not get schema for configuration %s", spannerChangestreamsReadConfiguration));
        }
        return spannerSchemaToBeamSchema(remove, spannerChangestreamsReadConfiguration.getTable());
    }

    private static Schema spannerSchemaToBeamSchema(SpannerSchema spannerSchema, String str) {
        OptionalInt findAny = IntStream.range(0, spannerSchema.getTables().size()).filter(i -> {
            return spannerSchema.getTables().get(i).equalsIgnoreCase(str);
        }).findAny();
        if (!findAny.isPresent()) {
            throw new IllegalArgumentException(String.format("Unable to retrieve schema for table %s. Found only tables: [%s]", str, String.join(", ", spannerSchema.getTables())));
        }
        Schema.Builder builder = Schema.builder();
        String str2 = spannerSchema.getTables().get(findAny.getAsInt());
        for (SpannerSchema.Column column : spannerSchema.getColumns(str2)) {
            builder = builder.addNullableField(column.getName(), spannerTypeToBeamType(column.getType()));
        }
        return Schema.builder().addStringField("operation").addStringField("commitTimestamp").addInt64Field("recordSequence").addRowField("rowValues", builder.setOptions(Schema.Options.builder().setOption("primaryKeyColumns", Schema.FieldType.array(Schema.FieldType.STRING), spannerSchema.getKeyParts(str2).stream().map((v0) -> {
            return v0.getField();
        }).collect(Collectors.toList()))).build()).build();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Object stringToParsedValue(Schema.FieldType fieldType, String str) {
        switch (AnonymousClass3.$SwitchMap$org$apache$beam$sdk$schemas$Schema$TypeName[fieldType.getTypeName().ordinal()]) {
            case MetadataTableAdminDao.CURRENT_METADATA_TABLE_VERSION /* 1 */:
                return str;
            case 2:
                return Long.valueOf(str);
            case 3:
            case 4:
                return Integer.valueOf(str);
            case 5:
                return Float.valueOf(Float.parseFloat(str));
            case 6:
                return Double.valueOf(Double.parseDouble(str));
            case 7:
                return Boolean.valueOf(Boolean.parseBoolean(str));
            case 8:
                return str.getBytes(StandardCharsets.UTF_8);
            case 9:
                return new DateTime(str);
            case 10:
                return new BigDecimal(str);
            default:
                throw new IllegalArgumentException(String.format("Unable to parse field with type %s", fieldType));
        }
    }

    private static Schema.FieldType spannerTypeToBeamType(Type type) {
        switch (AnonymousClass3.$SwitchMap$com$google$cloud$spanner$Type$Code[type.getCode().ordinal()]) {
            case MetadataTableAdminDao.CURRENT_METADATA_TABLE_VERSION /* 1 */:
                return Schema.FieldType.BOOLEAN;
            case 2:
                return Schema.FieldType.BYTES;
            case 3:
                return Schema.FieldType.STRING;
            case 4:
                return Schema.FieldType.INT64;
            case 5:
                return Schema.FieldType.DECIMAL;
            case 6:
                return Schema.FieldType.DOUBLE;
            case 7:
            case 8:
                return Schema.FieldType.DATETIME;
            case 9:
                return Schema.FieldType.array(spannerTypeToBeamType(type.getArrayElementType()));
            case 10:
            case 11:
            default:
                throw new IllegalArgumentException(String.format("Unsupported spanner type: %s", type));
        }
    }
}
