/*
 * Decompiled with CFR 0.152.
 */
package io.castled.warehouses.connectors.bigquery;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.JobConfiguration;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.storage.Blob;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import io.castled.ObjectRegistry;
import io.castled.commons.models.FileFormat;
import io.castled.commons.models.FileStorageNamespace;
import io.castled.commons.streams.GcsFilesRecordInputStream;
import io.castled.commons.streams.RecordInputStream;
import io.castled.exceptions.CastledRuntimeException;
import io.castled.filestorage.GcsClient;
import io.castled.models.QueryMode;
import io.castled.schema.models.RecordSchema;
import io.castled.utils.FileUtils;
import io.castled.warehouses.WarehouseConfig;
import io.castled.warehouses.WarehouseDataPoller;
import io.castled.warehouses.connectors.bigquery.BQSnapshotTracker;
import io.castled.warehouses.connectors.bigquery.BigQueryConnector;
import io.castled.warehouses.connectors.bigquery.BigQueryExportJsonSchemaMapper;
import io.castled.warehouses.connectors.bigquery.BigQueryUtils;
import io.castled.warehouses.connectors.bigquery.BigQueryWarehouseConfig;
import io.castled.warehouses.connectors.bigquery.daos.BQSnapshotTrackerDAO;
import io.castled.warehouses.connectors.bigquery.gcp.GcpClientFactory;
import io.castled.warehouses.models.WarehousePollContext;
import io.castled.warehouses.models.WarehousePollResult;
import java.nio.file.Path;
import java.util.List;
import java.util.Optional;
import org.apache.commons.collections4.CollectionUtils;
import org.jdbi.v3.core.Jdbi;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class BigQueryDataPoller
implements WarehouseDataPoller {
    private static final Logger log = LoggerFactory.getLogger(BigQueryDataPoller.class);
    private final GcpClientFactory gcpClientFactory;
    private final BigQueryConnector bigQueryConnector;
    private final BigQueryExportJsonSchemaMapper bigQueryExportJsonSchemaMapper;
    private final BQSnapshotTrackerDAO bqSnapshotTrackerDAO;

    @Inject
    public BigQueryDataPoller(GcpClientFactory gcpClientFactory, BigQueryConnector bigQueryConnector, BigQueryExportJsonSchemaMapper bigQueryExportJsonSchemaMapper, Jdbi jdbi) {
        this.gcpClientFactory = gcpClientFactory;
        this.bigQueryConnector = bigQueryConnector;
        this.bigQueryExportJsonSchemaMapper = bigQueryExportJsonSchemaMapper;
        this.bqSnapshotTrackerDAO = (BQSnapshotTrackerDAO)jdbi.onDemand(BQSnapshotTrackerDAO.class);
    }

    @Override
    public WarehousePollResult pollRecords(WarehousePollContext warehousePollContext) {
        try {
            BigQueryWarehouseConfig bigQueryWarehouseConfig = (BigQueryWarehouseConfig)warehousePollContext.getWarehouseConfig();
            BigQuery bigQuery = this.gcpClientFactory.getBigQuery(bigQueryWarehouseConfig.getServiceAccount(), bigQueryWarehouseConfig.getProjectId());
            if (warehousePollContext.getQueryMode() == QueryMode.FULL_LOAD) {
                return this.doFullLoad(bigQuery, warehousePollContext);
            }
            List<String> bookKeepingTables = BigQueryUtils.listTables("castled", bigQuery);
            if (CollectionUtils.isEmpty(bookKeepingTables)) {
                BigQueryUtils.getOrCreateDataset("castled", bigQuery, bigQueryWarehouseConfig.getLocation());
            }
            BQSnapshotTracker bqSnapshotTracker = this.getOrCreateSnapshotTracker(warehousePollContext.getPipelineUUID());
            this.dropOrphanedTables(bookKeepingTables, bqSnapshotTracker, warehousePollContext.getPipelineUUID(), bigQuery);
            String uncommittedSnapshot = this.createUncommittedSnapshot(bigQuery, warehousePollContext);
            bqSnapshotTracker.setUncommittedSnapshot(uncommittedSnapshot);
            RecordSchema querySchema = this.bigQueryConnector.getQuerySchema(bigQueryWarehouseConfig, String.format("select * from %s.%s", "castled", uncommittedSnapshot));
            return WarehousePollResult.builder().recordInputStream(this.createRecordStream(bigQuery, bigQueryWarehouseConfig, warehousePollContext, this.getDataFetchQuery(bookKeepingTables, bqSnapshotTracker), querySchema)).warehouseSchema(querySchema).build();
        }
        catch (Exception e) {
            log.error("Data poll failed for pipeline {} and pipeline run {}", (Object)warehousePollContext.getPipelineUUID(), (Object)warehousePollContext.getPipelineRunId());
            throw new CastledRuntimeException((Throwable)e);
        }
    }

    public WarehousePollResult doFullLoad(BigQuery bigQuery, WarehousePollContext warehousePollContext) throws Exception {
        BigQueryWarehouseConfig bigQueryWarehouseConfig = (BigQueryWarehouseConfig)warehousePollContext.getWarehouseConfig();
        RecordSchema querySchema = this.bigQueryConnector.getQuerySchema(bigQueryWarehouseConfig, warehousePollContext.getQuery());
        return WarehousePollResult.builder().recordInputStream(this.createRecordStream(bigQuery, bigQueryWarehouseConfig, warehousePollContext, warehousePollContext.getQuery(), querySchema)).warehouseSchema(querySchema).build();
    }

    @Override
    public WarehousePollResult resumePoll(WarehousePollContext warehousePollContext) {
        try {
            BigQueryWarehouseConfig bigQueryWarehouseConfig = (BigQueryWarehouseConfig)warehousePollContext.getWarehouseConfig();
            GcsClient gcsClient = this.gcpClientFactory.getGcsClient(bigQueryWarehouseConfig.getServiceAccount(), bigQueryWarehouseConfig.getProjectId());
            List<Blob> blobs = gcsClient.listObjects(bigQueryWarehouseConfig.getBucketName(), this.getPipelineRunGcsUnloadDir(warehousePollContext.getPipelineUUID(), warehousePollContext.getPipelineRunId()));
            if (CollectionUtils.isEmpty(blobs)) {
                return this.pollRecords(warehousePollContext);
            }
            String dataFetchQuery = warehousePollContext.getQueryMode() == QueryMode.FULL_LOAD ? warehousePollContext.getQuery() : String.format("select * from %s.%s", "castled", this.bqSnapshotTrackerDAO.getSnapshotTracker(warehousePollContext.getPipelineUUID()).getUncommittedSnapshot());
            RecordSchema querySchema = this.bigQueryConnector.getQuerySchema(bigQueryWarehouseConfig, dataFetchQuery);
            GcsFilesRecordInputStream gcsFilesRecordInputStream = new GcsFilesRecordInputStream(querySchema, this.bigQueryExportJsonSchemaMapper, blobs, FileFormat.JSON, this.getPipelineRunUnloadDirectory(warehousePollContext.getPipelineUUID(), warehousePollContext.getPipelineRunId()), gcsClient, 20, true);
            return WarehousePollResult.builder().recordInputStream(gcsFilesRecordInputStream).warehouseSchema(querySchema).resumed(true).build();
        }
        catch (Exception e) {
            log.error("Data poll resume failed for pipeline {} and pipeline run {}", (Object)warehousePollContext.getPipelineUUID(), (Object)warehousePollContext.getPipelineRunId());
            return this.pollRecords(warehousePollContext);
        }
    }

    private void dropOrphanedTables(List<String> bookKeepingTables, BQSnapshotTracker bqSnapshotTracker, String pipelineUUID, BigQuery bigQuery) {
        for (String bookKeepingTable : bookKeepingTables) {
            if (!bookKeepingTable.startsWith(pipelineUUID) || bookKeepingTable.equals(bqSnapshotTracker.getCommittedSnapshot())) continue;
            bigQuery.delete(TableId.of((String)"castled", (String)bookKeepingTable));
        }
    }

    private BQSnapshotTracker getOrCreateSnapshotTracker(String pipelineUUID) {
        BQSnapshotTracker bqSnapshotTracker = this.bqSnapshotTrackerDAO.getSnapshotTracker(pipelineUUID);
        if (bqSnapshotTracker != null) {
            return bqSnapshotTracker;
        }
        Long snapshotTrackerId = this.bqSnapshotTrackerDAO.createPipelineSnapshot(pipelineUUID, null, null);
        return BQSnapshotTracker.builder().id(snapshotTrackerId).pipelineUUID(pipelineUUID).build();
    }

    private String createUncommittedSnapshot(BigQuery bigQuery, WarehousePollContext warehousePollContext) throws Exception {
        TableId uncommittedSnapshotTable = TableId.of((String)"castled", (String)String.format("%s_snapshot_%d", warehousePollContext.getPipelineUUID(), System.currentTimeMillis()));
        bigQuery.query(QueryJobConfiguration.newBuilder((String)warehousePollContext.getQuery()).setDestinationTable(uncommittedSnapshotTable).setWriteDisposition(JobInfo.WriteDisposition.WRITE_TRUNCATE).build(), new BigQuery.JobOption[0]);
        this.bqSnapshotTrackerDAO.updateUncommittedSnapshot(warehousePollContext.getPipelineUUID(), uncommittedSnapshotTable.getTable());
        return uncommittedSnapshotTable.getTable();
    }

    private RecordInputStream createRecordStream(BigQuery bigQuery, BigQueryWarehouseConfig bigQueryWarehouseConfig, WarehousePollContext warehousePollContext, String dataFetchQuery, RecordSchema querySchema) throws Exception {
        GcsClient gcsClient = this.gcpClientFactory.getGcsClient(bigQueryWarehouseConfig.getServiceAccount(), bigQueryWarehouseConfig.getProjectId());
        String unloadDirectoryPath = GcsClient.constructGcsPath(bigQueryWarehouseConfig.getBucketName(), Lists.newArrayList((Object[])new String[]{FileStorageNamespace.PIPELINE_UNLOADS.getNamespace(), warehousePollContext.getPipelineUUID(), String.valueOf(warehousePollContext.getPipelineRunId()), "*.json.gz"}));
        String unloadQuery = String.format("EXPORT DATA OPTIONS (uri = '%s',compression = 'GZIP', format='JSON', overwrite=true) AS %s", unloadDirectoryPath, dataFetchQuery);
        QueryJobConfiguration jobConfiguration = QueryJobConfiguration.newBuilder((String)unloadQuery).build();
        BigQueryUtils.runJob(bigQuery.create(JobInfo.newBuilder((JobConfiguration)jobConfiguration).build(), new BigQuery.JobOption[0]));
        List<Blob> blobs = gcsClient.listObjects(bigQueryWarehouseConfig.getBucketName(), this.getPipelineRunGcsUnloadDir(warehousePollContext.getPipelineUUID(), warehousePollContext.getPipelineRunId()));
        return new GcsFilesRecordInputStream(querySchema, this.bigQueryExportJsonSchemaMapper, blobs, FileFormat.JSON, this.getPipelineRunUnloadDirectory(warehousePollContext.getPipelineUUID(), warehousePollContext.getPipelineRunId()), gcsClient, 20, true);
    }

    private String getDataFetchQuery(List<String> bookKeepingTables, BQSnapshotTracker bqSnapshotTracker) {
        String committedSnapshot = bqSnapshotTracker.getCommittedSnapshot();
        String uncommittedSnapshot = bqSnapshotTracker.getUncommittedSnapshot();
        if (Optional.ofNullable(committedSnapshot).filter(bookKeepingTables::contains).isPresent()) {
            return String.format("select * from %s.%s except distinct select * from %s.%s", "castled", uncommittedSnapshot, "castled", committedSnapshot);
        }
        return String.format("select * from %s.%s", "castled", uncommittedSnapshot);
    }

    @Override
    public void cleanupPipelineRunResources(WarehousePollContext warehousePollContext) {
        BigQueryWarehouseConfig bigQueryWarehouseConfig = (BigQueryWarehouseConfig)warehousePollContext.getWarehouseConfig();
        GcsClient gcsClient = ((GcpClientFactory)ObjectRegistry.getInstance(GcpClientFactory.class)).getGcsClient(bigQueryWarehouseConfig.getServiceAccount(), bigQueryWarehouseConfig.getProjectId());
        gcsClient.deleteDirectory(bigQueryWarehouseConfig.getBucketName(), this.getPipelineRunGcsUnloadDir(warehousePollContext.getPipelineUUID(), warehousePollContext.getPipelineRunId()));
        FileUtils.deleteDirectory((Path)this.getPipelineRunUnloadDirectory(warehousePollContext.getPipelineUUID(), warehousePollContext.getPipelineRunId()));
    }

    public String getPipelineGcsUnloadDir(String pipelineUUID) {
        return GcsClient.constructObjectKey(Lists.newArrayList((Object[])new String[]{FileStorageNamespace.PIPELINE_UNLOADS.getNamespace(), pipelineUUID}));
    }

    public String getPipelineRunGcsUnloadDir(String pipelineUUID, Long pipelineRunId) {
        return GcsClient.constructObjectKey(Lists.newArrayList((Object[])new String[]{FileStorageNamespace.PIPELINE_UNLOADS.getNamespace(), pipelineUUID, String.valueOf(pipelineRunId)}));
    }

    @Override
    public void cleanupPipelineResources(String pipelineUUID, WarehouseConfig warehouseConfig) {
        BigQueryWarehouseConfig bigQueryWarehouseConfig = (BigQueryWarehouseConfig)warehouseConfig;
        GcsClient gcsClient = ((GcpClientFactory)ObjectRegistry.getInstance(GcpClientFactory.class)).getGcsClient(bigQueryWarehouseConfig.getServiceAccount(), bigQueryWarehouseConfig.getProjectId());
        gcsClient.deleteDirectory(bigQueryWarehouseConfig.getBucketName(), this.getPipelineGcsUnloadDir(pipelineUUID));
        BigQuery bigQuery = ((GcpClientFactory)ObjectRegistry.getInstance(GcpClientFactory.class)).getBigQuery(bigQueryWarehouseConfig.getServiceAccount(), bigQueryWarehouseConfig.getProjectId());
        for (String internalTable : BigQueryUtils.listTables("castled", bigQuery)) {
            bigQuery.delete(TableId.of((String)"castled", (String)internalTable));
        }
    }
}

