/*
 * Decompiled with CFR 0.152.
 */
package io.castled.warehouses.connectors.bigquery;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.FieldValue;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableResult;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.Storage;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import io.castled.ObjectRegistry;
import io.castled.exceptions.connect.ConnectException;
import io.castled.exceptions.connect.ConnectionError;
import io.castled.models.QueryResults;
import io.castled.schema.models.RecordSchema;
import io.castled.warehouses.BaseWarehouseConnector;
import io.castled.warehouses.TableProperties;
import io.castled.warehouses.WarehouseDataPoller;
import io.castled.warehouses.WarehouseSyncFailureListener;
import io.castled.warehouses.connectors.bigquery.BQSnapshotTracker;
import io.castled.warehouses.connectors.bigquery.BQSyncFailureListener;
import io.castled.warehouses.connectors.bigquery.BigQueryDataPoller;
import io.castled.warehouses.connectors.bigquery.BigQueryUtils;
import io.castled.warehouses.connectors.bigquery.BigQueryWarehouseConfig;
import io.castled.warehouses.connectors.bigquery.daos.BQSnapshotTrackerDAO;
import io.castled.warehouses.connectors.bigquery.gcp.GcpClientFactory;
import io.castled.warehouses.models.WarehousePollContext;
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import org.jdbi.v3.core.Jdbi;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class BigQueryConnector
extends BaseWarehouseConnector<BigQueryWarehouseConfig> {
    private static final Logger log = LoggerFactory.getLogger(BigQueryConnector.class);
    private final GcpClientFactory gcpClientFactory;
    private final BQSnapshotTrackerDAO bqSnapshotTrackerDAO;

    @Inject
    public BigQueryConnector(GcpClientFactory gcpClientFactory, Jdbi jdbi) {
        this.gcpClientFactory = gcpClientFactory;
        this.bqSnapshotTrackerDAO = (BQSnapshotTrackerDAO)jdbi.onDemand(BQSnapshotTrackerDAO.class);
    }

    @Override
    public void testConnectionForDataPoll(BigQueryWarehouseConfig config) throws ConnectException {
        BigQuery bigQuery = this.gcpClientFactory.getBigQuery(config.getServiceAccount(), config.getProjectId());
        String blobName = "castled_" + UUID.randomUUID();
        Bucket bucket = null;
        try {
            BigQueryUtils.getOrCreateDataset("castled", bigQuery, config.getLocation());
            BigQueryUtils.listTables("castled", bigQuery);
            Storage storage = this.gcpClientFactory.getGcsClient(config.getServiceAccount(), config.getProjectId()).getStorage();
            bucket = storage.get(config.getBucketName(), new Storage.BucketGetOption[0]);
            if (bucket == null) {
                throw new ConnectException(ConnectionError.INVALID_STORAGE, String.format("Bucket %s not found", config.getBucketName()));
            }
            if (!bucket.getLocation().equalsIgnoreCase(config.getLocation())) {
                throw new ConnectException(ConnectionError.INVALID_STORAGE, String.format("GCS bucket %s needs to be created on the configured location %s", config.getBucketName(), config.getLocation()));
            }
            bucket.create(blobName, "test".getBytes(StandardCharsets.UTF_8), new Bucket.BlobTargetOption[0]);
            for (Blob blob : bucket.list(new Storage.BlobListOption[]{Storage.BlobListOption.prefix((String)blobName)}).getValues()) {
                if (!blob.getName().equals(blobName)) continue;
                blob.downloadTo((OutputStream)new ByteArrayOutputStream(), new Blob.BlobSourceOption[0]);
            }
            Optional.ofNullable(storage.get(config.getBucketName(), new Storage.BucketGetOption[0])).orElseThrow(() -> new ConnectException(ConnectionError.INVALID_STORAGE, "Bucket not found"));
        }
        catch (Exception e) {
            if (bucket != null) {
                this.tryDeleteBlob(blobName, bucket);
            }
            log.warn("Test connection failed for Big query service account {}", (Object)config.getServiceAccount(), (Object)e);
            throw new ConnectException(ConnectionError.UNKNOWN, e.getMessage());
        }
    }

    private void tryDeleteBlob(String blobName, Bucket bucket) {
        try {
            for (Blob blob : bucket.list(new Storage.BlobListOption[]{Storage.BlobListOption.prefix((String)blobName)}).getValues()) {
                if (!blob.getName().equals(blobName)) continue;
                blob.downloadTo((OutputStream)new ByteArrayOutputStream(), new Blob.BlobSourceOption[0]);
            }
        }
        catch (Exception e) {
            log.error("Failed to delete blob {}", (Object)blobName, (Object)e);
        }
    }

    @Override
    public WarehouseDataPoller getDataPoller() {
        return (WarehouseDataPoller)ObjectRegistry.getInstance(BigQueryDataPoller.class);
    }

    @Override
    public RecordSchema getQuerySchema(BigQueryWarehouseConfig bigQueryWarehouseConfig, String query) throws Exception {
        String limitedQuery = String.format("select * from (%s) limit 0", query);
        BigQuery bigQuery = this.gcpClientFactory.getBigQuery(bigQueryWarehouseConfig.getServiceAccount(), bigQueryWarehouseConfig.getProjectId());
        TableResult tableResult = bigQuery.query(QueryJobConfiguration.newBuilder((String)limitedQuery).build(), new BigQuery.JobOption[0]);
        return BigQueryUtils.bqSchemaToConnectSchema(tableResult.getSchema());
    }

    @Override
    public WarehouseSyncFailureListener syncFailureListener(WarehousePollContext warehousePollContext) throws Exception {
        return new BQSyncFailureListener(warehousePollContext);
    }

    @Override
    public TableProperties getSnapshotTableProperties(List<String> recordIdKeys) {
        return null;
    }

    @Override
    public void restartPoll(String pipelineUUID, BigQueryWarehouseConfig config) {
        BigQuery bigQuery = this.gcpClientFactory.getBigQuery(config.getServiceAccount(), config.getProjectId());
        BQSnapshotTracker bqSnapshotTracker = this.bqSnapshotTrackerDAO.getSnapshotTracker(pipelineUUID);
        if (bqSnapshotTracker.getCommittedSnapshot() != null) {
            bigQuery.delete(TableId.of((String)"castled", (String)bqSnapshotTracker.getCommittedSnapshot()));
        }
    }

    @Override
    public QueryResults previewQuery(String query, BigQueryWarehouseConfig bigQueryWarehouseConfig, int maxRows) throws Exception {
        String limitedQuery = String.format("select * from (%s) limit %d", query, maxRows);
        BigQuery bigQuery = this.gcpClientFactory.getBigQuery(bigQueryWarehouseConfig.getServiceAccount(), bigQueryWarehouseConfig.getProjectId());
        TableResult tableResult = bigQuery.query(QueryJobConfiguration.newBuilder((String)limitedQuery).build(), new BigQuery.JobOption[0]);
        List<String> headers = BigQueryUtils.fieldNames(tableResult.getSchema());
        ArrayList rows = Lists.newArrayList();
        do {
            for (FieldValueList fieldValueList : tableResult.getValues()) {
                int index = 0;
                ArrayList row = Lists.newArrayList();
                for (FieldValue fieldValue : fieldValueList) {
                    if (fieldValue.getValue() == null) {
                        row.add(null);
                    } else {
                        row.add(BigQueryUtils.parseFieldValue(fieldValue.getValue(), tableResult.getSchema().getFields().get(index).getType()));
                    }
                    ++index;
                }
                rows.add(row);
            }
        } while ((tableResult = tableResult.getNextPage()) != null);
        return new QueryResults(headers, (List)rows);
    }

    @Override
    public Class<BigQueryWarehouseConfig> getConfigType() {
        return BigQueryWarehouseConfig.class;
    }

    @Override
    public BigQueryWarehouseConfig filterRestrictedConfigDetails(BigQueryWarehouseConfig bigQueryWarehouseConfig) {
        return bigQueryWarehouseConfig;
    }
}

