/*
 * Decompiled with CFR 0.152.
 */
package io.castled.warehouses.connectors.snowflake;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.castled.ObjectRegistry;
import io.castled.constants.ConnectorExecutionConstants;
import io.castled.exceptions.CastledRuntimeException;
import io.castled.filemanager.CsvFileWriter;
import io.castled.filestorage.CastledS3Client;
import io.castled.schema.models.FieldSchema;
import io.castled.schema.models.Tuple;
import io.castled.utils.CastledExceptionUtils;
import io.castled.utils.FileUtils;
import io.castled.utils.SizeUtils;
import io.castled.warehouses.S3BasedWarehouseSyncFailureListener;
import io.castled.warehouses.WarehouseConfig;
import io.castled.warehouses.WarehouseConnectorConfig;
import io.castled.warehouses.connectors.snowflake.SnowflakeClient;
import io.castled.warehouses.connectors.snowflake.SnowflakeConnector;
import io.castled.warehouses.connectors.snowflake.SnowflakeCopySchemaAdapter;
import io.castled.warehouses.connectors.snowflake.SnowflakeUtils;
import io.castled.warehouses.connectors.snowflake.SnowflakeWarehouseConfig;
import io.castled.warehouses.models.WarehousePollContext;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import net.snowflake.client.jdbc.SnowflakeSQLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SnowflakeSyncFailureListener
extends S3BasedWarehouseSyncFailureListener {
    private static final Logger log = LoggerFactory.getLogger(SnowflakeSyncFailureListener.class);
    private final SnowflakeConnector snowflakeConnector;
    private final CsvFileWriter csvFileWriter;
    private final WarehouseConfig warehouseConfig;
    private final WarehousePollContext warehousePollContext;
    private final String s3UploadDir;
    private long pendingRecords = 0L;
    private long failedRecords = 0L;

    public SnowflakeSyncFailureListener(WarehousePollContext warehousePollContext) throws IOException {
        super(warehousePollContext, SnowflakeUtils.getS3Client(warehousePollContext.getWarehouseConfig(), warehousePollContext.getDataEncryptionKey()));
        this.warehousePollContext = warehousePollContext;
        this.warehouseConfig = warehousePollContext.getWarehouseConfig();
        this.snowflakeConnector = (SnowflakeConnector)ObjectRegistry.getInstance(SnowflakeConnector.class);
        this.csvFileWriter = new CsvFileWriter(50000L, this.failureRecordsDirectory, () -> UUID.randomUUID().toString(), this.trackableFields);
        this.s3UploadDir = this.getS3FailedRecordsDirectory(warehousePollContext.getPipelineUUID(), warehousePollContext.getPipelineRunId());
    }

    @Override
    public void doFlush() throws Exception {
        if (this.pendingRecords > 0L) {
            this.csvFileWriter.close();
            this.uploadFilesToS3();
        }
        SnowflakeWarehouseConfig snowflakeWarehouseConfig = (SnowflakeWarehouseConfig)this.warehouseConfig;
        try (Connection connection = this.snowflakeConnector.getConnection(snowflakeWarehouseConfig);){
            if (this.failedRecords > 0L) {
                this.copyFailedRecords(connection, snowflakeWarehouseConfig);
                this.removeFailedRecordsFromSnapshot(connection);
            }
            this.commitSnapshot(connection);
        }
    }

    private void copyFailedRecords(Connection connection, SnowflakeWarehouseConfig snowflakeWarehouseConfig) throws SQLException {
        String failedRecordTable = this.createFailedRecordsTable(connection);
        ((SnowflakeClient)ObjectRegistry.getInstance(SnowflakeClient.class)).copyFilesToTable(connection, failedRecordTable, CastledS3Client.constructS3Path(this.castledS3Client.getBucket(), Lists.newArrayList((Object[])new String[]{this.s3UploadDir})), this.castledS3Client.getEncryptionKey(), snowflakeWarehouseConfig.getAccessKeyId(), snowflakeWarehouseConfig.getAccessKeySecret());
        this.castledS3Client.deleteDirectory(this.getS3FailedRecordsDirectory(this.warehousePollContext.getPipelineUUID(), this.warehousePollContext.getPipelineRunId()));
    }

    private void removeFailedRecordsFromSnapshot(Connection connection) throws SQLException {
        String uncommittedSnapshot = ConnectorExecutionConstants.getQualifiedUncommittedSnapshot(this.warehousePollContext.getPipelineUUID());
        String failedRecordsTable = ConnectorExecutionConstants.getFailedRecordsTable(this.warehousePollContext.getPipelineUUID());
        StringBuilder failedRecordsDeleteQuery = new StringBuilder(String.format("delete from %s using %s where 1 = 1", uncommittedSnapshot, failedRecordsTable));
        for (String trackableField : this.trackableFields) {
            failedRecordsDeleteQuery.append(String.format(" AND (%s.%s = %s.%s OR (%s.%s IS NULL and %s.%s IS NULL))", failedRecordsTable, trackableField, uncommittedSnapshot, trackableField, failedRecordsTable, trackableField, uncommittedSnapshot, trackableField));
        }
        try (Statement statement = connection.createStatement();){
            statement.execute(failedRecordsDeleteQuery.toString());
        }
    }

    private void commitSnapshot(Connection connection) {
        String uncommittedSnapshot = ConnectorExecutionConstants.getQualifiedUncommittedSnapshot(this.warehousePollContext.getPipelineUUID());
        String committedSnapshotBackup = ConnectorExecutionConstants.getQualifiedCommittedSnapshotBkp(this.warehousePollContext.getPipelineUUID());
        try {
            this.backupSnapshotTable(connection);
            try (Statement statement = connection.createStatement();){
                statement.execute(String.format("alter table %s rename to %s", uncommittedSnapshot, ConnectorExecutionConstants.getQualifiedCommittedSnapshot(this.warehousePollContext.getPipelineUUID())));
                statement.execute(String.format("drop table if exists %s", committedSnapshotBackup));
            }
        }
        catch (Exception e) {
            log.error("Committing snapshot for pipeline {} failed", (Object)this.warehousePollContext.getPipelineUUID(), (Object)e);
            throw new CastledRuntimeException((Throwable)e);
        }
    }

    private void backupSnapshotTable(Connection connection) throws SQLException {
        block8: {
            String committedSnapshot = ConnectorExecutionConstants.getQualifiedCommittedSnapshot(this.warehousePollContext.getPipelineUUID());
            try (Statement statement = connection.createStatement();){
                statement.execute(String.format("alter table %s rename to %s", committedSnapshot, ConnectorExecutionConstants.getQualifiedCommittedSnapshotBkp(this.warehousePollContext.getPipelineUUID())));
            }
            catch (SnowflakeSQLException e) {
                if (CastledExceptionUtils.hasMessage((Throwable)e, (String)"does not exist or not authorized")) break block8;
                throw e;
            }
        }
    }

    @Override
    public synchronized void doWriteRecord(Tuple record) throws Exception {
        List<Object> copyableValues = this.getCopyableValues(record);
        this.csvFileWriter.writeRecord(copyableValues);
        ++this.failedRecords;
        ++this.pendingRecords;
        if (this.pendingRecords > SizeUtils.convertGBToBytes((long)((WarehouseConnectorConfig)ObjectRegistry.getInstance(WarehouseConnectorConfig.class)).getFailedMaxRecordsCount())) {
            this.csvFileWriter.close();
            this.uploadFilesToS3();
            if (!Files.exists(this.failureRecordsDirectory, new LinkOption[0])) {
                Files.createDirectory(this.failureRecordsDirectory, new FileAttribute[0]);
            }
        }
    }

    private void compressFile(Path inputFile) {
        try {
            String compressedFile = inputFile.toString() + ".gzip";
            FileUtils.compressFile((Path)inputFile, (Path)Paths.get(compressedFile, new String[0]));
            Files.deleteIfExists(inputFile);
        }
        catch (Exception e) {
            log.error("File compressed failed for file {}", (Object)inputFile.toString());
            throw new CastledRuntimeException((Throwable)e);
        }
    }

    private void uploadFilesToS3() throws IOException {
        FileUtils.listFiles((Path)this.failureRecordsDirectory).forEach(this::compressFile);
        this.castledS3Client.uploadDirectory(this.failureRecordsDirectory, this.s3UploadDir);
        FileUtils.deleteDirectory((Path)this.failureRecordsDirectory);
        this.pendingRecords = 0L;
    }

    private List<Object> getCopyableValues(Tuple record) throws Exception {
        SnowflakeCopySchemaAdapter copySchemaAdapter = (SnowflakeCopySchemaAdapter)((Object)ObjectRegistry.getInstance(SnowflakeCopySchemaAdapter.class));
        HashMap copyableValues = Maps.newHashMap();
        for (FieldSchema fieldSchema : this.warehousePollContext.getWarehouseSchema().getFieldSchemas()) {
            if (!this.trackableFields.contains(fieldSchema.getName())) continue;
            copyableValues.put(fieldSchema.getName(), copySchemaAdapter.transformValue(record.getValue(fieldSchema.getName()), fieldSchema.getSchema()));
        }
        return this.trackableFields.stream().map(copyableValues::get).collect(Collectors.toList());
    }

    private String createFailedRecordsTable(Connection connection) throws SQLException {
        SnowflakeClient snowflakeClient = (SnowflakeClient)ObjectRegistry.getInstance(SnowflakeClient.class);
        String failedRecordsCreateQuery = String.format("select %s from %s limit 0", String.join((CharSequence)",", this.trackableFields), ConnectorExecutionConstants.getQualifiedUncommittedSnapshot(this.warehousePollContext.getPipelineUUID()));
        String tableName = ConnectorExecutionConstants.getFailedRecordsTable(this.warehousePollContext.getPipelineUUID());
        snowflakeClient.createTableFromQuery(connection, tableName, failedRecordsCreateQuery, true);
        return tableName;
    }
}

