package co.cask.gcp.bigquery;

import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.api.data.batch.Input;
import co.cask.cdap.api.data.batch.InputFormatProvider;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.data.schema.UnsupportedTypeException;
import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.etl.api.Emitter;
import co.cask.cdap.etl.api.PipelineConfigurer;
import co.cask.cdap.etl.api.batch.BatchRuntimeContext;
import co.cask.cdap.etl.api.batch.BatchSource;
import co.cask.cdap.etl.api.batch.BatchSourceContext;
import co.cask.gcp.spanner.SpannerConstants;
import co.cask.hydrator.common.LineageRecorder;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FieldList;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.Table;
import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase;
import com.google.cloud.hadoop.io.bigquery.AvroBigQueryInputFormat;
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Batch source plugin that reads a BigQuery table and emits its rows as {@link StructuredRecord}s.
 *
 * <p>{@code prepareRun} configures {@link AvroBigQueryInputFormat} with a temporary Google Cloud
 * Storage directory, and {@code transform} converts each Avro record it receives using the
 * configured output schema.
 */
@Name("BigQueryTable")
@Description("This source reads the entire contents of a BigQuery table. BigQuery is Google's serverless, highly scalable, enterprise data warehouse.Data is first written to a temporary location on Google Cloud Storage, then read into the pipeline from there.")
@Plugin(type = "batchsource")
/* loaded from: input_file:co/cask/gcp/bigquery/BigQuerySource.class */
public final class BigQuerySource extends BatchSource<LongWritable, GenericData.Record, StructuredRecord> {
    private static final Logger LOG = LoggerFactory.getLogger(BigQuerySource.class);
    /** Plugin name; same value as the {@code @Name} annotation on this class. */
    public static final String NAME = "BigQueryTable";
    /** Plugin configuration; never assigned in this class (presumably injected by the framework — confirm). */
    private BigQuerySourceConfig config;
    /** Schema of emitted records; assigned in {@code configurePipeline} and {@code initialize}. */
    private Schema outputSchema;
    /** Hadoop configuration built in {@code prepareRun}; also read by {@code onRunFinish}. */
    private Configuration configuration;
    /** Converts Avro records read from BigQuery into structured records. */
    private final BigQueryAvroToStructuredTransformer transformer = new BigQueryAvroToStructuredTransformer();
    /** Per-run identifier generated in {@code prepareRun}; names the temporary directory and, when no bucket is configured, the bucket. */
    private UUID uuid;

    /**
     * Validates the plugin configuration and, when the schema is not supplied through a macro,
     * registers it as the output schema of this stage.
     *
     * @param pipelineConfigurer configurer used to publish the stage's output schema
     */
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        super.configurePipeline(pipelineConfigurer);
        this.config.validate();
        // NOTE(review): the property key comes from SpannerConstants in a BigQuery class —
        // presumably a decompiler artifact of an inlined string constant; confirm.
        boolean schemaIsMacro = this.config.containsMacro(SpannerConstants.SCHEMA);
        if (!schemaIsMacro) {
            // The schema is only known at configure time when it is not a macro.
            this.outputSchema = this.config.getSchema();
            pipelineConfigurer.getStageConfigurer().setOutputSchema(this.outputSchema);
        }
    }

    /**
     * Prepares a pipeline run: validates the output schema against the BigQuery table, builds the
     * Hadoop configuration for {@link AvroBigQueryInputFormat}, records lineage and registers the
     * input format with the context.
     *
     * <p>When no bucket is configured, the run's random UUID is used as the bucket name and bucket
     * deletion is enabled in the configuration.
     *
     * @param batchSourceContext context of the run being prepared
     * @throws Exception if schema validation or building the configuration fails
     */
    public void prepareRun(BatchSourceContext batchSourceContext) throws Exception {
        validateOutputSchema();
        this.uuid = UUID.randomUUID();
        this.configuration = BigQueryUtils.getBigQueryConfig(this.config.getServiceAccountFilePath(), this.config.getProject());

        String bucket = this.config.getBucket();
        if (bucket == null) {
            // No bucket configured: use a per-run bucket named after the UUID and allow its deletion.
            bucket = this.uuid.toString();
            this.configuration.setBoolean(GoogleHadoopFileSystemBase.GCE_BUCKET_DELETE_ENABLE_KEY, true);
        }
        this.configuration.set(GoogleHadoopFileSystemBase.GCS_SYSTEM_BUCKET_KEY, bucket);
        this.configuration.setBoolean("fs.gs.impl.disable.cache", true);
        this.configuration.setBoolean("fs.gs.metadata.cache.enable", false);

        String temporaryDirectory = String.format("gs://%s/hadoop/input/%s", bucket, this.uuid);
        AvroBigQueryInputFormat.setTemporaryCloudStorageDirectory(this.configuration, temporaryDirectory);
        AvroBigQueryInputFormat.setEnableShardedExport(this.configuration, false);
        BigQueryConfiguration.configureBigQueryInput(this.configuration, this.config.getDatasetProject(), this.config.getDataset(), this.config.getTable());

        Job job = Job.getInstance(this.configuration);
        job.setOutputKeyClass(LongWritable.class);
        // The original called setOutputKeyClass twice, so Text silently replaced LongWritable as
        // the key class; the second call was presumably meant to set the value class.
        // NOTE(review): `job` is not used after this point in this method — confirm whether
        // these settings are needed at all.
        job.setOutputValueClass(Text.class);

        emitLineage(batchSourceContext);
        setInputFormat(batchSourceContext);
    }

    /**
     * Initializes the source at runtime and caches the configured output schema for use by
     * {@code transform}.
     *
     * @param batchRuntimeContext runtime context passed to the superclass initializer
     * @throws Exception if the superclass initialization fails
     */
    public void initialize(BatchRuntimeContext batchRuntimeContext) throws Exception {
        super.initialize(batchRuntimeContext);
        Schema configuredSchema = this.config.getSchema();
        this.outputSchema = configuredSchema;
    }

    /**
     * Converts the Avro record carried by the key/value pair into a structured record using the
     * cached output schema, and emits it. The key is not used.
     *
     * @param keyValue pair whose value is the Avro record to convert
     * @param emitter receives the converted record
     * @throws Exception if the conversion fails
     */
    public void transform(KeyValue<LongWritable, GenericData.Record> keyValue, Emitter<StructuredRecord> emitter) throws Exception {
        GenericRecord avroRecord = keyValue.getValue();
        StructuredRecord converted = this.transformer.transform(avroRecord, this.outputSchema);
        emitter.emit(converted);
    }

    /**
     * Cleans up after a run: when no bucket was configured (so the run used a bucket named after
     * its UUID), deletes that bucket if it exists. Failures are logged and not rethrown.
     *
     * @param z unused here (presumably whether the run succeeded — confirm against the base class)
     * @param batchSourceContext context of the finished run; unused here
     */
    public void onRunFinish(boolean z, BatchSourceContext batchSourceContext) {
        if (this.config.getBucket() != null) {
            // A user-supplied bucket is never deleted.
            return;
        }
        if (this.uuid == null || this.configuration == null) {
            // prepareRun did not get far enough to choose a temporary bucket; nothing to clean up.
            return;
        }
        Path bucketPath = new Path(String.format("gs://%s", this.uuid));
        try {
            FileSystem fileSystem = bucketPath.getFileSystem(this.configuration);
            if (fileSystem.exists(bucketPath)) {
                fileSystem.delete(bucketPath, true);
            }
        } catch (IOException e) {
            // Log the full URI (the bucket name lives in the authority, not the path component)
            // and keep the exception so the cause is not lost.
            LOG.warn("Failed to delete bucket {}", bucketPath, e);
        }
    }

    /**
     * Looks up the BigQuery table named by the given configuration and converts its schema into a
     * record schema named "output".
     *
     * @param bigQuerySourceConfig configuration supplying the service account file path, project, dataset and table
     * @return record schema built from the table's columns
     * @throws IllegalArgumentException if the table does not exist or has no schema
     * @throws Exception if the lookup fails or a column has an unsupported type or mode
     */
    @javax.ws.rs.Path("getSchema")
    public Schema getSchema(BigQuerySourceConfig bigQuerySourceConfig) throws Exception {
        String dataset = bigQuerySourceConfig.getDataset();
        String tableName = bigQuerySourceConfig.getTable();
        String datasetProject = bigQuerySourceConfig.getDatasetProject();
        Table bigQueryTable = BigQueryUtils.getBigQueryTable(bigQuerySourceConfig.getServiceAccountFilePath(), datasetProject, dataset, tableName);
        if (bigQueryTable == null) {
            throw new IllegalArgumentException(String.format("BigQuery table '%s:%s.%s' does not exist", datasetProject, dataset, tableName));
        }
        com.google.cloud.bigquery.Schema schema = bigQueryTable.getDefinition().getSchema();
        if (schema == null) {
            // Report the table *name*; the original formatted the Table object itself here.
            throw new IllegalArgumentException(String.format("Cannot read from table '%s:%s.%s' because it has no schema.", datasetProject, dataset, tableName));
        }
        return Schema.recordOf("output", getSchemaFields(schema));
    }

    /**
     * Checks the configured output schema against the BigQuery table's schema: the table must
     * exist and have a schema, every output field must be present in the table, and every output
     * field must pass the simple-type and field-schema checks.
     *
     * @throws IllegalArgumentException if the table is missing, has no schema, or the output schema does not match it
     * @throws IOException if looking up the table fails
     */
    private void validateOutputSchema() throws IOException {
        String dataset = this.config.getDataset();
        String tableName = this.config.getTable();
        String datasetProject = this.config.getDatasetProject();
        Table bigQueryTable = BigQueryUtils.getBigQueryTable(this.config.getServiceAccountFilePath(), datasetProject, dataset, tableName);
        if (bigQueryTable == null) {
            throw new IllegalArgumentException(String.format("BigQuery table '%s:%s.%s' does not exist.", datasetProject, dataset, tableName));
        }
        com.google.cloud.bigquery.Schema tableSchema = bigQueryTable.getDefinition().getSchema();
        if (tableSchema == null) {
            // Report the table *name*; the original formatted the Table object itself here.
            throw new IllegalArgumentException(String.format("Cannot read from table '%s:%s.%s' because it has no schema.", datasetProject, dataset, tableName));
        }

        FieldList tableFields = tableSchema.getFields();
        List<Schema.Field> outputFields = this.config.getSchema().getFields();

        List<String> schemaMinusBqFields = BigQueryUtils.getSchemaMinusBqFields(outputFields, tableFields);
        if (!schemaMinusBqFields.isEmpty()) {
            // Same fix as above: the message names the table rather than printing the Table object.
            throw new IllegalArgumentException(String.format("Output schema has field(s) '%s' which are not present in table '%s:%s.%s' schema.", schemaMinusBqFields, datasetProject, dataset, tableName));
        }

        for (Schema.Field field : outputFields) {
            validateSimpleTypes(field);
            BigQueryUtils.validateFieldSchemaMatches(tableFields.get(field.getName()), field, dataset, tableName);
        }
    }

    /**
     * Rejects output fields whose non-nullable type is not a simple type, and plain (non-logical)
     * {@code int} or {@code float} fields, which must be declared as {@code long} or {@code double}.
     *
     * @param field output schema field to check
     * @throws IllegalArgumentException if the field's type is not supported
     */
    private void validateSimpleTypes(Schema.Field field) {
        String fieldName = field.getName();
        Schema fieldSchema = BigQueryUtils.getNonNullableSchema(field.getSchema());
        Schema.Type fieldType = fieldSchema.getType();
        if (!fieldType.isSimpleType()) {
            throw new IllegalArgumentException(String.format("Field '%s' is of unsupported type '%s'.", fieldName, fieldType));
        }
        if (fieldSchema.getLogicalType() != null) {
            // The int/float restrictions below apply only to fields without a logical type.
            return;
        }
        if (fieldType == Schema.Type.INT) {
            throw new IllegalArgumentException(String.format("Field '%s' is of unsupported type 'int'. It should be 'long'", fieldName));
        }
        if (fieldType == Schema.Type.FLOAT) {
            throw new IllegalArgumentException(String.format("Field '%s' is of unsupported type 'float'. It should be 'double'", fieldName));
        }
    }

    /**
     * Maps each column of a BigQuery schema to a record schema field.
     *
     * <p>FLOAT64, BOOL, INT64, STRING and BYTES map to double, boolean, long, string and bytes;
     * TIME and DATE map to the time-micros and date logical types; TIMESTAMP and DATETIME map to
     * timestamp-micros. Columns whose mode is absent or NULLABLE become nullable fields.
     *
     * @param schema BigQuery table schema to convert
     * @return one field per column, in column order
     * @throws UnsupportedTypeException if a column has an unsupported type or is REPEATED
     */
    private List<Schema.Field> getSchemaFields(com.google.cloud.bigquery.Schema schema) throws UnsupportedTypeException {
        List<Schema.Field> converted = new ArrayList<>();
        for (Field column : schema.getFields()) {
            StandardSQLTypeName sqlType = column.getType().getStandardType();

            // Identity comparisons (not a switch) so an absent type falls through to the
            // "unsupported type" error instead of a NullPointerException.
            Schema columnSchema;
            if (sqlType == StandardSQLTypeName.TIMESTAMP || sqlType == StandardSQLTypeName.DATETIME) {
                columnSchema = Schema.of(Schema.LogicalType.TIMESTAMP_MICROS);
            } else if (sqlType == StandardSQLTypeName.DATE) {
                columnSchema = Schema.of(Schema.LogicalType.DATE);
            } else if (sqlType == StandardSQLTypeName.TIME) {
                columnSchema = Schema.of(Schema.LogicalType.TIME_MICROS);
            } else if (sqlType == StandardSQLTypeName.BYTES) {
                columnSchema = Schema.of(Schema.Type.BYTES);
            } else if (sqlType == StandardSQLTypeName.STRING) {
                columnSchema = Schema.of(Schema.Type.STRING);
            } else if (sqlType == StandardSQLTypeName.INT64) {
                columnSchema = Schema.of(Schema.Type.LONG);
            } else if (sqlType == StandardSQLTypeName.BOOL) {
                columnSchema = Schema.of(Schema.Type.BOOLEAN);
            } else if (sqlType == StandardSQLTypeName.FLOAT64) {
                columnSchema = Schema.of(Schema.Type.DOUBLE);
            } else {
                throw new UnsupportedTypeException(String.format("BigQuery column '%s' is of unsupported type '%s'.", column.getName(), sqlType));
            }

            Field.Mode mode = column.getMode();
            if (mode == Field.Mode.REPEATED) {
                throw new UnsupportedTypeException(String.format("BigQuery column '%s' is of unsupported mode 'repeated'.", column.getName()));
            }
            if (mode == Field.Mode.REQUIRED) {
                converted.add(Schema.Field.of(column.getName(), columnSchema));
            } else if (mode == null || mode == Field.Mode.NULLABLE) {
                // A column without an explicit mode is treated the same as NULLABLE.
                converted.add(Schema.Field.of(column.getName(), Schema.nullableOf(columnSchema)));
            }
        }
        return converted;
    }

    /**
     * Registers {@link AvroBigQueryInputFormat} as the input of this source under the configured
     * reference name. The provider hands over a copy of every entry of this source's Hadoop
     * configuration, read at the time the provider is asked for it.
     *
     * @param batchSourceContext context on which the input is set
     */
    private void setInputFormat(BatchSourceContext batchSourceContext) {
        InputFormatProvider provider = new InputFormatProvider() {
            public String getInputFormatClassName() {
                return AvroBigQueryInputFormat.class.getName();
            }

            public Map<String, String> getInputFormatConfiguration() {
                Map<String, String> settings = new HashMap<>();
                for (Map.Entry<String, String> entry : BigQuerySource.this.configuration) {
                    settings.put(entry.getKey(), entry.getValue());
                }
                return settings;
            }
        };
        batchSourceContext.setInput(Input.of(this.config.referenceName, provider));
    }

    /**
     * Records lineage for this source: creates the external dataset for the configured schema
     * and, when the schema has fields, records a "Read" operation listing the field names.
     *
     * @param batchSourceContext context the lineage recorder is created from
     */
    private void emitLineage(BatchSourceContext batchSourceContext) {
        LineageRecorder recorder = new LineageRecorder(batchSourceContext, this.config.referenceName);
        Schema schema = this.config.getSchema();
        recorder.createExternalDataset(schema);

        List<Schema.Field> fields = schema.getFields();
        if (fields == null) {
            return;
        }
        List<String> fieldNames = fields.stream()
            .map(Schema.Field::getName)
            .collect(Collectors.toList());
        recorder.recordRead("Read", "Read from BigQuery table.", fieldNames);
    }

    /**
     * Untyped overload marked bridge/synthetic by the decompiler; it casts its arguments and
     * delegates to the typed {@code transform(KeyValue, Emitter)} above.
     * NOTE(review): presumably compiler-generated and not present in the original source —
     * confirm before keeping it in hand-maintained code.
     */
    public /* bridge */ /* synthetic */ void transform(Object obj, Emitter emitter) throws Exception {
        transform((KeyValue<LongWritable, GenericData.Record>) obj, (Emitter<StructuredRecord>) emitter);
    }
}
