package co.cask.gcp.spanner.source;

import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.api.data.batch.Input;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.data.schema.UnsupportedTypeException;
import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.etl.api.Emitter;
import co.cask.cdap.etl.api.PipelineConfigurer;
import co.cask.cdap.etl.api.batch.BatchRuntimeContext;
import co.cask.cdap.etl.api.batch.BatchSource;
import co.cask.cdap.etl.api.batch.BatchSourceContext;
import co.cask.gcp.spanner.SpannerConstants;
import co.cask.gcp.spanner.common.SpannerUtil;
import co.cask.hydrator.common.LineageRecorder;
import co.cask.hydrator.common.SourceInputFormatProvider;
import com.google.cloud.Date;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.BatchReadOnlyTransaction;
import com.google.cloud.spanner.BatchTransactionId;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.PartitionOptions;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.TimestampBound;
import com.google.cloud.spanner.Type;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.ws.rs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Name("Spanner")
@Description("Batch source to read from Cloud Spanner. Cloud Spanner is a fully managed, mission-critical, relational database service that offers transactional consistency at global scale, schemas, SQL (ANSI 2011 with extensions), and automatic, synchronous replication for high availability.")
@Plugin(type = "batchsource")
/* loaded from: input_file:co/cask/gcp/spanner/source/SpannerSource.class */
public class SpannerSource extends BatchSource<NullWritable, ResultSet, StructuredRecord> {
    private static final Logger LOG = LoggerFactory.getLogger(SpannerSource.class);
    private static final String TABLE_NAME = "TableName";
    private static final Statement.Builder SCHEMA_STATEMENT_BUILDER = Statement.newBuilder(String.format("SELECT  t.column_name,t.spanner_type, t.is_nullable FROM information_schema.columns AS t WHERE   t.table_catalog = ''  AND  t.table_schema = '' AND t.table_name = @%s", TABLE_NAME));
    public static final String NAME = "Spanner";
    private final SpannerSourceConfig config;
    private Schema schema;
    private Spanner spanner;

    public SpannerSource(SpannerSourceConfig spannerSourceConfig) {
        this.config = spannerSourceConfig;
    }

    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        this.config.validate();
        if (this.config.containsMacro(SpannerConstants.SCHEMA)) {
            return;
        }
        pipelineConfigurer.getStageConfigurer().setOutputSchema(this.config.getSchema());
    }

    public void prepareRun(BatchSourceContext batchSourceContext) throws Exception {
        this.config.validate();
        String project = this.config.getProject();
        Configuration configuration = new Configuration();
        initializeConfig(configuration, project);
        this.spanner = SpannerUtil.getSpannerService(this.config.getServiceAccountFilePath(), project);
        BatchReadOnlyTransaction batchReadOnlyTransaction = this.spanner.getBatchClient(DatabaseId.of(project, this.config.instance, this.config.database)).batchReadOnlyTransaction(TimestampBound.ofReadTimestamp(Timestamp.ofTimeMicroseconds(TimeUnit.MILLISECONDS.toMicros(batchSourceContext.getLogicalStartTime()))));
        BatchTransactionId batchTransactionId = batchReadOnlyTransaction.getBatchTransactionId();
        ArrayList arrayList = new ArrayList(batchReadOnlyTransaction.partitionQuery(getPartitionOptions(), Statement.of(String.format("Select * from %s;", this.config.table)), new Options.QueryOption[0]));
        configuration.set(SpannerConstants.SPANNER_BATCH_TRANSACTION_ID, getSerializedObjectString(batchTransactionId));
        configuration.set(SpannerConstants.PARTITIONS_LIST, getSerializedObjectString(arrayList));
        LineageRecorder lineageRecorder = new LineageRecorder(batchSourceContext, this.config.referenceName);
        lineageRecorder.createExternalDataset(this.config.getSchema());
        batchSourceContext.setInput(Input.of(this.config.referenceName, new SourceInputFormatProvider((Class<? extends InputFormat>) SpannerInputFormat.class, configuration)));
        if (this.config.getSchema() == null || this.config.getSchema().getFields() == null) {
            return;
        }
        lineageRecorder.recordRead("Read", "Read from Spanner table.", (List) this.config.getSchema().getFields().stream().map((v0) -> {
            return v0.getName();
        }).collect(Collectors.toList()));
    }

    public void initialize(BatchRuntimeContext batchRuntimeContext) throws Exception {
        super.initialize(batchRuntimeContext);
        this.schema = this.config.getSchema();
    }

    public void transform(KeyValue<NullWritable, ResultSet> keyValue, Emitter<StructuredRecord> emitter) throws Exception {
        StructuredRecord.Builder builder = StructuredRecord.builder(this.schema);
        List fields = this.schema.getFields();
        ResultSet resultSet = (ResultSet) keyValue.getValue();
        Iterator it = fields.iterator();
        while (it.hasNext()) {
            String name = ((Schema.Field) it.next()).getName();
            if (resultSet.getColumnType(name) != null && !resultSet.isNull(name)) {
                switch (r0.getCode()) {
                    case BOOL:
                        builder.set(name, Boolean.valueOf(resultSet.getBoolean(name)));
                        break;
                    case INT64:
                        builder.set(name, Long.valueOf(resultSet.getLong(name)));
                        break;
                    case FLOAT64:
                        builder.set(name, Double.valueOf(resultSet.getDouble(name)));
                        break;
                    case STRING:
                        builder.set(name, resultSet.getString(name));
                        break;
                    case BYTES:
                        builder.set(name, resultSet.getBytes(name).toByteArray());
                        break;
                    case DATE:
                        Date date = resultSet.getDate(name);
                        builder.setDate(name, LocalDate.of(date.getYear(), date.getMonth(), date.getDayOfMonth()));
                        break;
                    case TIMESTAMP:
                        builder.setTimestamp(name, ZonedDateTime.ofInstant(Instant.ofEpochSecond(resultSet.getTimestamp(name).getSeconds()).plusNanos(r0.getNanos()), ZoneId.ofOffset("UTC", ZoneOffset.UTC)));
                        break;
                }
            }
        }
        emitter.emit(builder.build());
    }

    public void onRunFinish(boolean z, BatchSourceContext batchSourceContext) {
        LOG.info("Run finished, closing spanner client");
        if (this.spanner != null) {
            this.spanner.close();
        }
    }

    private void initializeConfig(Configuration configuration, String str) {
        setIfValueNotNull(configuration, SpannerConstants.PROJECT_ID, str);
        setIfValueNotNull(configuration, SpannerConstants.SERVICE_ACCOUNT_FILE_PATH, this.config.getServiceAccountFilePath());
        setIfValueNotNull(configuration, SpannerConstants.INSTANCE_ID, this.config.instance);
        setIfValueNotNull(configuration, SpannerConstants.DATABASE, this.config.database);
        setIfValueNotNull(configuration, SpannerConstants.QUERY, String.format("Select * from %s;", this.config.table));
    }

    private void setIfValueNotNull(Configuration configuration, String str, String str2) {
        if (str2 != null) {
            configuration.set(str, str2);
        }
    }

    private String getSerializedObjectString(Object obj) throws IOException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        Throwable th = null;
        try {
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
            Throwable th2 = null;
            try {
                try {
                    objectOutputStream.writeObject(obj);
                    objectOutputStream.flush();
                    String encodeToString = Base64.getEncoder().encodeToString(byteArrayOutputStream.toByteArray());
                    if (objectOutputStream != null) {
                        if (0 != 0) {
                            try {
                                objectOutputStream.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            objectOutputStream.close();
                        }
                    }
                    return encodeToString;
                } finally {
                }
            } catch (Throwable th4) {
                if (objectOutputStream != null) {
                    if (th2 != null) {
                        try {
                            objectOutputStream.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        objectOutputStream.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (byteArrayOutputStream != null) {
                if (0 != 0) {
                    try {
                        byteArrayOutputStream.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    byteArrayOutputStream.close();
                }
            }
        }
    }

    private PartitionOptions getPartitionOptions() {
        PartitionOptions.Builder newBuilder = PartitionOptions.newBuilder();
        if (this.config.partitionSizeMB != null) {
            newBuilder.setPartitionSizeBytes(this.config.partitionSizeMB.longValue() * 1024 * 1024);
        }
        if (this.config.maxPartitions != null) {
            newBuilder.setMaxPartitions(this.config.maxPartitions.longValue());
        }
        return newBuilder.build();
    }

    @Path("getSchema")
    public Schema getSchema(SpannerSourceConfig spannerSourceConfig) throws Exception {
        String project = spannerSourceConfig.getProject();
        Spanner spannerService = SpannerUtil.getSpannerService(spannerSourceConfig.getServiceAccountFilePath(), project);
        ResultSet executeQuery = spannerService.getDatabaseClient(DatabaseId.of(project, spannerSourceConfig.instance, spannerSourceConfig.database)).singleUse().executeQuery(SCHEMA_STATEMENT_BUILDER.bind(TABLE_NAME).to(spannerSourceConfig.table).build(), new Options.QueryOption[0]);
        Throwable th = null;
        try {
            try {
                ArrayList arrayList = new ArrayList();
                while (executeQuery.next()) {
                    String string = executeQuery.getString("column_name");
                    String string2 = executeQuery.getString("spanner_type");
                    boolean equals = "YES".equals(executeQuery.getString("is_nullable"));
                    Schema parseSchemaFromSpannerTypeString = parseSchemaFromSpannerTypeString(string2);
                    arrayList.add(Schema.Field.of(string, equals ? Schema.nullableOf(parseSchemaFromSpannerTypeString) : parseSchemaFromSpannerTypeString));
                }
                spannerService.close();
                Schema recordOf = Schema.recordOf("outputSchema", arrayList);
                if (executeQuery != null) {
                    if (0 != 0) {
                        try {
                            executeQuery.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        executeQuery.close();
                    }
                }
                return recordOf;
            } finally {
            }
        } catch (Throwable th3) {
            if (executeQuery != null) {
                if (th != null) {
                    try {
                        executeQuery.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    executeQuery.close();
                }
            }
            throw th3;
        }
    }

    private Schema parseSchemaFromSpannerTypeString(String str) throws UnsupportedTypeException {
        if (str.startsWith("ARRAY")) {
            throw new UnsupportedTypeException("Array Type is unsupported currently");
        }
        if (str.startsWith("STRING")) {
            return Schema.of(Schema.Type.STRING);
        }
        if (str.startsWith("BYTES")) {
            return Schema.of(Schema.Type.BYTES);
        }
        switch (Type.Code.valueOf(str)) {
            case BOOL:
                return Schema.of(Schema.Type.BOOLEAN);
            case INT64:
                return Schema.of(Schema.Type.LONG);
            case FLOAT64:
                return Schema.of(Schema.Type.DOUBLE);
            case STRING:
            case BYTES:
            default:
                throw new UnsupportedTypeException(String.format("Type : %s is unsupported currently", str));
            case DATE:
                return Schema.of(Schema.LogicalType.DATE);
            case TIMESTAMP:
                return Schema.of(Schema.LogicalType.TIMESTAMP_MICROS);
        }
    }

    public /* bridge */ /* synthetic */ void transform(Object obj, Emitter emitter) throws Exception {
        transform((KeyValue<NullWritable, ResultSet>) obj, (Emitter<StructuredRecord>) emitter);
    }
}
