package co.cask.gcp.spanner.sink;

import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.api.data.batch.Output;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.etl.api.Emitter;
import co.cask.cdap.etl.api.PipelineConfigurer;
import co.cask.cdap.etl.api.batch.BatchRuntimeContext;
import co.cask.cdap.etl.api.batch.BatchSink;
import co.cask.cdap.etl.api.batch.BatchSinkContext;
import co.cask.gcp.spanner.common.SpannerUtil;
import co.cask.hydrator.common.LineageRecorder;
import co.cask.hydrator.common.batch.sink.SinkOutputFormatProvider;
import com.google.cloud.RetryOption;
import com.google.cloud.spanner.Database;
import com.google.cloud.spanner.DatabaseAdminClient;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.Statement;
import com.google.common.base.Strings;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Name("Spanner")
@Description("Batch sink to write to Cloud Spanner. Cloud Spanner is a fully managed, mission-critical, relational database service that offers transactional consistency at global scale, schemas, SQL (ANSI 2011 with extensions), and automatic, synchronous replication for high availability.")
@Plugin(type = "batchsink")
/* loaded from: input_file:co/cask/gcp/spanner/sink/SpannerSink.class */
public final class SpannerSink extends BatchSink<StructuredRecord, NullWritable, StructuredRecord> {
    private static final Logger LOG = LoggerFactory.getLogger(SpannerSink.class);
    public static final String NAME = "Spanner";
    private static final String TABLE_NAME = "tablename";
    private final SpannerSinkConfig config;

    public SpannerSink(SpannerSinkConfig spannerSinkConfig) {
        this.config = spannerSinkConfig;
    }

    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        super.configurePipeline(pipelineConfigurer);
        this.config.validate();
    }

    public void prepareRun(BatchSinkContext batchSinkContext) {
        this.config.validate();
        Spanner spanner = null;
        try {
            try {
                spanner = SpannerUtil.getSpannerService(this.config.getServiceAccountFilePath(), this.config.getProject());
                createTableIfNotPresent(spanner.getDatabaseClient(DatabaseId.of(this.config.getProject(), this.config.getInstance(), this.config.getDatabase())), getOrCreateDatabase(spanner.getDatabaseAdminClient()));
                if (spanner != null) {
                    spanner.close();
                }
                Configuration configuration = new Configuration();
                LineageRecorder lineageRecorder = new LineageRecorder(batchSinkContext, this.config.getReferenceName());
                lineageRecorder.createExternalDataset(this.config.getSchema());
                SpannerOutputFormat.configure(configuration, this.config);
                batchSinkContext.addOutput(Output.of(this.config.getReferenceName(), new SinkOutputFormatProvider((Class<? extends OutputFormat>) SpannerOutputFormat.class, configuration)));
                List fields = this.config.getSchema().getFields();
                if (fields == null || fields.isEmpty()) {
                    return;
                }
                lineageRecorder.recordWrite("Write", "Wrote to Spanner table.", (List) fields.stream().map((v0) -> {
                    return v0.getName();
                }).collect(Collectors.toList()));
            } catch (IOException e) {
                throw new RuntimeException("Exception while trying to get Spanner service. ", e);
            }
        } catch (Throwable th) {
            if (spanner != null) {
                spanner.close();
            }
            throw th;
        }
    }

    private void createTableIfNotPresent(DatabaseClient databaseClient, Database database) {
        if (isTablePresent(databaseClient)) {
            return;
        }
        if (Strings.isNullOrEmpty(this.config.getKeys())) {
            throw new IllegalArgumentException(String.format("Spanner table %s does not exist. To create it from the pipeline, primary keys must be provided", this.config.getTable()));
        }
        String convertSchemaToCreateStatement = SpannerUtil.convertSchemaToCreateStatement(this.config.getTable(), this.config.getKeys(), this.config.getSchema());
        LOG.debug("Creating table with create statement: {} in database {} of instance {}", new Object[]{convertSchemaToCreateStatement, this.config.getDatabase(), this.config.getInstance()});
        database.updateDdl(Collections.singletonList(convertSchemaToCreateStatement), null).waitFor(new RetryOption[0]).getResult();
    }

    private boolean isTablePresent(DatabaseClient databaseClient) {
        ResultSet executeQuery = databaseClient.singleUse().executeQuery(Statement.newBuilder(String.format("SELECT\n    t.table_name\nFROM\n    information_schema.tables AS t\nWHERE\n    t.table_catalog = '' AND t.table_schema = '' AND\n    t.table_name = @%s", TABLE_NAME)).bind(TABLE_NAME).to(this.config.getTable()).build(), new Options.QueryOption[0]);
        boolean z = false;
        if (executeQuery.next()) {
            z = true;
        }
        executeQuery.close();
        return z;
    }

    private Database getOrCreateDatabase(DatabaseAdminClient databaseAdminClient) {
        Database databaseIfPresent = getDatabaseIfPresent(databaseAdminClient);
        if (databaseIfPresent == null) {
            LOG.debug("Database not found. Creating database {} in instance {}.", this.config.getDatabase(), this.config.getInstance());
            databaseIfPresent = databaseAdminClient.createDatabase(this.config.getInstance(), this.config.getDatabase(), Collections.emptyList()).waitFor(new RetryOption[0]).getResult();
        }
        return databaseIfPresent;
    }

    @Nullable
    private Database getDatabaseIfPresent(DatabaseAdminClient databaseAdminClient) {
        Database database = null;
        try {
            database = databaseAdminClient.getDatabase(this.config.getInstance(), this.config.getDatabase());
        } catch (SpannerException e) {
            if (e.getErrorCode() != ErrorCode.NOT_FOUND) {
                throw e;
            }
        }
        return database;
    }

    public void initialize(BatchRuntimeContext batchRuntimeContext) throws Exception {
        super.initialize(batchRuntimeContext);
        this.config.validate();
    }

    public void transform(StructuredRecord structuredRecord, Emitter<KeyValue<NullWritable, StructuredRecord>> emitter) {
        emitter.emit(new KeyValue((Object) null, structuredRecord));
    }

    public void destroy() {
        super.destroy();
    }

    public /* bridge */ /* synthetic */ void transform(Object obj, Emitter emitter) throws Exception {
        transform((StructuredRecord) obj, (Emitter<KeyValue<NullWritable, StructuredRecord>>) emitter);
    }
}
