/*
 * Decompiled with CFR 0.152.
 */
package io.castled.warehouses.connectors.redshift;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.castled.ObjectRegistry;
import io.castled.constants.ConnectorExecutionConstants;
import io.castled.exceptions.CastledRuntimeException;
import io.castled.filemanager.RawFileWriter;
import io.castled.filestorage.CastledS3Client;
import io.castled.filestorage.ObjectStoreException;
import io.castled.schema.models.FieldSchema;
import io.castled.schema.models.Tuple;
import io.castled.utils.FileUtils;
import io.castled.utils.JsonUtils;
import io.castled.utils.SizeUtils;
import io.castled.warehouses.S3BasedWarehouseSyncFailureListener;
import io.castled.warehouses.WarehouseConnectorConfig;
import io.castled.warehouses.connectors.redshift.RedshiftClient;
import io.castled.warehouses.connectors.redshift.RedshiftConnector;
import io.castled.warehouses.connectors.redshift.RedshiftCopySchemaMapper;
import io.castled.warehouses.connectors.redshift.RedshiftTableProperties;
import io.castled.warehouses.connectors.redshift.RedshiftUtils;
import io.castled.warehouses.connectors.redshift.RedshiftWarehouseConfig;
import io.castled.warehouses.connectors.redshift.models.RedshiftS3CopyManifest;
import io.castled.warehouses.models.WarehousePollContext;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sync failure listener for Redshift warehouses.
 *
 * <p>Each failed record is serialised to a JSON line and appended to local files via a
 * {@link RawFileWriter}. Once the buffered byte count passes the configured threshold the
 * local files are gzip-compressed and uploaded to S3. On flush, any remaining files are
 * uploaded, the uploaded records are copied into a failed-records table, matching rows are
 * deleted from the uncommitted snapshot, and the uncommitted snapshot is renamed to become
 * the committed snapshot.
 *
 * <p>NOTE(review): {@link #doWriteRecord(Tuple)} is {@code synchronized} while
 * {@link #doFlush()} is not; presumably the caller guarantees that no writes race with a
 * flush — confirm against the superclass/caller.
 */
public class RedshiftSyncFailureListener
extends S3BasedWarehouseSyncFailureListener {
    private static final Logger log = LoggerFactory.getLogger(RedshiftSyncFailureListener.class);

    /** Size (in MB) handed to the {@link RawFileWriter}; presumably the per-file size limit. */
    private static final long RAW_FILE_SIZE_MB = 50L;

    /** Object name of the COPY manifest written next to the uploaded failed-record files. */
    private static final String MANIFEST_FILE_NAME = "manifest.json";

    private final RedshiftConnector redshiftConnector;
    private final RawFileWriter rawFileWriter;
    private final RedshiftWarehouseConfig warehouseConfig;
    private final WarehousePollContext warehousePollContext;
    private final String s3UploadDir;
    private final CastledS3Client encryptedS3Client;
    private final CastledS3Client simpleS3Client;

    /**
     * Bytes written locally since the last S3 upload. Kept as a {@code long} because the
     * upload threshold is expressed in gigabytes and can exceed {@link Integer#MAX_VALUE}.
     */
    private long totalBytes = 0L;

    /** Total number of failed records written through this listener. */
    private long failedRecords = 0L;

    /**
     * Creates a listener bound to the given poll context.
     *
     * @param warehousePollContext poll context supplying the warehouse config, encryption key,
     *                             pipeline UUID and pipeline run id
     */
    public RedshiftSyncFailureListener(WarehousePollContext warehousePollContext) {
        super(warehousePollContext, RedshiftUtils.getS3Client(warehousePollContext.getWarehouseConfig(), warehousePollContext.getDataEncryptionKey()));
        this.warehouseConfig = (RedshiftWarehouseConfig) warehousePollContext.getWarehouseConfig();
        this.redshiftConnector = (RedshiftConnector) ObjectRegistry.getInstance(RedshiftConnector.class);
        this.rawFileWriter = new RawFileWriter(SizeUtils.convertMBToBytes(RAW_FILE_SIZE_MB), this.failureRecordsDirectory, () -> UUID.randomUUID().toString());
        this.warehousePollContext = warehousePollContext;
        this.encryptedS3Client = RedshiftUtils.getS3Client(this.warehouseConfig, warehousePollContext.getDataEncryptionKey());
        this.simpleS3Client = RedshiftUtils.getS3Client(this.warehouseConfig, null);
        this.s3UploadDir = this.getS3FailedRecordsDirectory(warehousePollContext.getPipelineUUID(), warehousePollContext.getPipelineRunId());
    }

    /**
     * Appends one failed record to the local buffer and uploads the buffered files to S3
     * once the configured failed-record buffer size is exceeded.
     *
     * @param tuple the failed record
     * @throws Exception if serialising, writing, compressing or uploading fails
     */
    @Override
    public synchronized void doWriteRecord(Tuple tuple) throws Exception {
        // Encode explicitly as UTF-8 so the uploaded files do not depend on the JVM's
        // platform default charset.
        byte[] recordBytes = this.getCopyableRecord(tuple).getBytes(StandardCharsets.UTF_8);
        this.rawFileWriter.writeRecord(recordBytes);
        this.totalBytes += recordBytes.length;
        ++this.failedRecords;

        WarehouseConnectorConfig connectorConfig = (WarehouseConnectorConfig) ObjectRegistry.getInstance(WarehouseConnectorConfig.class);
        long uploadThresholdBytes = SizeUtils.convertGBToBytes((long) connectorConfig.getFailedRecordBufferSize());
        if (this.totalBytes > uploadThresholdBytes) {
            this.rawFileWriter.close();
            this.uploadFilesToS3();
            // uploadFilesToS3 deletes the local directory; recreate it for subsequent writes.
            if (!Files.exists(this.failureRecordsDirectory)) {
                Files.createDirectory(this.failureRecordsDirectory);
            }
        }
    }

    /**
     * Serialises the trackable fields of a record into a JSON string.
     *
     * @param record the failed record
     * @return JSON object string keyed by field name, holding the mapper-transformed values
     * @throws Exception if value transformation or JSON serialisation fails
     */
    private String getCopyableRecord(Tuple record) throws Exception {
        RedshiftCopySchemaMapper copySchemaMapper = (RedshiftCopySchemaMapper) ObjectRegistry.getInstance(RedshiftCopySchemaMapper.class);
        Map<String, Object> copyableValues = Maps.newHashMap();
        for (FieldSchema field : this.warehousePollContext.getWarehouseSchema().getFieldSchemas()) {
            if (!this.trackableFields.contains(field.getName())) {
                continue;
            }
            copyableValues.put(field.getName(), copySchemaMapper.transformValue(record.getValue(field.getName()), field.getSchema()));
        }
        return JsonUtils.objectToString(copyableValues);
    }

    /**
     * Uploads any locally buffered records, removes all failed records from the uncommitted
     * snapshot (when there were any) and then commits the snapshot.
     *
     * @throws Exception if the upload or any warehouse operation fails
     */
    @Override
    public void doFlush() throws Exception {
        if (this.totalBytes > 0L) {
            this.rawFileWriter.close();
            this.uploadFilesToS3();
        }
        try (Connection connection = this.redshiftConnector.getConnection(this.warehouseConfig)) {
            if (this.failedRecords > 0L) {
                this.copyFailedRecords(connection);
                this.removeFailedRecordsFromSnapshot(connection);
            }
            this.commitSnapshot(connection);
        }
    }

    /**
     * Replaces the committed snapshot with the uncommitted one inside a single transaction.
     *
     * <p>On failure the transaction is rolled back and auto-commit is restored; failures of
     * those clean-up steps are attached to the original exception as suppressed exceptions
     * so they never hide the root cause.
     *
     * @param connection open warehouse connection
     * @throws SQLException            if toggling auto-commit fails outside the guarded section
     * @throws CastledRuntimeException if dropping, renaming or committing fails
     */
    private void commitSnapshot(Connection connection) throws SQLException {
        connection.setAutoCommit(false);
        String uncommittedSnapshot = ConnectorExecutionConstants.getQualifiedUncommittedSnapshot(this.warehousePollContext.getPipelineUUID());
        String committedSnapshot = ConnectorExecutionConstants.getQualifiedCommittedSnapshot(this.warehousePollContext.getPipelineUUID());
        try {
            try (Statement statement = connection.createStatement()) {
                statement.execute(String.format("drop table if exists %s", committedSnapshot));
                // NOTE(review): the rename target is the unqualified committed-snapshot name,
                // presumably because RENAME TO keeps the table in its schema — confirm.
                statement.execute(String.format("alter table %s rename to %s", uncommittedSnapshot, ConnectorExecutionConstants.getCommittedSnapshot(this.warehousePollContext.getPipelineUUID())));
            }
            connection.commit();
        }
        catch (Exception e) {
            try {
                connection.rollback();
            }
            catch (SQLException rollbackFailure) {
                e.addSuppressed(rollbackFailure);
            }
            try {
                connection.setAutoCommit(true);
            }
            catch (SQLException autoCommitFailure) {
                e.addSuppressed(autoCommitFailure);
            }
            log.error("Committing snapshot for pipeline {} failed", this.warehousePollContext.getPipelineUUID(), e);
            throw new CastledRuntimeException(e);
        }
        connection.setAutoCommit(true);
    }

    /**
     * Deletes from the uncommitted snapshot every row that matches a row of the
     * failed-records table on all trackable fields (two NULLs count as a match).
     *
     * @param connection open warehouse connection
     * @throws SQLException if the delete statement fails
     */
    private void removeFailedRecordsFromSnapshot(Connection connection) throws SQLException {
        String uncommittedSnapshot = ConnectorExecutionConstants.getQualifiedUncommittedSnapshot(this.warehousePollContext.getPipelineUUID());
        String failedRecordsTable = ConnectorExecutionConstants.getFailedRecordsTable(this.warehousePollContext.getPipelineUUID());
        StringBuilder deleteQuery = new StringBuilder(String.format("delete from %s using %s where 1 = 1", uncommittedSnapshot, failedRecordsTable));
        for (String trackableField : this.trackableFields) {
            // %1$s = failed-records table, %2$s = uncommitted snapshot, %3$s = field name.
            deleteQuery.append(String.format(" AND (%1$s.%3$s = %2$s.%3$s OR (%1$s.%3$s IS NULL and %2$s.%3$s IS NULL))", failedRecordsTable, uncommittedSnapshot, trackableField));
        }
        try (Statement statement = connection.createStatement()) {
            statement.execute(deleteQuery.toString());
        }
    }

    /**
     * Compresses every local failed-record file, uploads the directory to S3 with the
     * encrypting client, deletes the local directory and resets the byte counter.
     *
     * @throws IOException if listing, uploading or deleting local files fails
     */
    private void uploadFilesToS3() throws IOException {
        FileUtils.listFiles(this.failureRecordsDirectory).forEach(this::compressFile);
        this.encryptedS3Client.uploadDirectory(this.failureRecordsDirectory, this.s3UploadDir);
        FileUtils.deleteDirectory(this.failureRecordsDirectory);
        this.totalBytes = 0L;
    }

    /**
     * Compresses {@code inputFile} into a sibling file with a {@code .gzip} suffix and
     * removes the uncompressed original.
     *
     * @param inputFile file to compress
     * @throws CastledRuntimeException if compression or deletion fails
     */
    private void compressFile(Path inputFile) {
        try {
            Path compressedFile = Paths.get(inputFile.toString() + ".gzip");
            FileUtils.compressFile(inputFile, compressedFile);
            Files.deleteIfExists(inputFile);
        }
        catch (Exception e) {
            // Log the cause as well so the failure is diagnosable from this entry alone.
            log.error("File compressed failed for file {}", inputFile.toString(), e);
            throw new CastledRuntimeException(e);
        }
    }

    /**
     * Loads the uploaded failed-record files into the failed-records table: creates the
     * table, writes a manifest listing every uploaded object, issues the copy through
     * {@link RedshiftClient} and finally deletes the S3 upload directory.
     *
     * @param connection open warehouse connection
     * @throws SQLException         if a warehouse statement fails
     * @throws ObjectStoreException if an S3 operation fails
     */
    private void copyFailedRecords(Connection connection) throws SQLException, ObjectStoreException {
        this.createFailedRecordsTable(connection);

        RedshiftS3CopyManifest copyManifest = new RedshiftS3CopyManifest(
                this.encryptedS3Client.listObjectUrls(this.s3UploadDir).stream()
                        .map(s3Url -> new RedshiftS3CopyManifest.ManifestEntry(s3Url, true))
                        .collect(Collectors.toList()));

        // The manifest is uploaded through the non-encrypting client.
        this.simpleS3Client.uploadText(
                CastledS3Client.constructObjectKey(Lists.newArrayList(this.s3UploadDir, MANIFEST_FILE_NAME)),
                JsonUtils.objectToString(copyManifest));
        String manifestFileUrl = CastledS3Client.constructS3Path(this.encryptedS3Client.getBucket(), Lists.newArrayList(this.s3UploadDir, MANIFEST_FILE_NAME));

        RedshiftClient redshiftClient = (RedshiftClient) ObjectRegistry.getInstance(RedshiftClient.class);
        redshiftClient.copyFilesToTable(connection, ConnectorExecutionConstants.getFailedRecordsTable(this.warehousePollContext.getPipelineUUID()), manifestFileUrl, this.encryptedS3Client.getEncryptionKey(), this.warehouseConfig);

        // s3UploadDir was built in the constructor from the same pipeline UUID and run id.
        this.encryptedS3Client.deleteDirectory(this.s3UploadDir);
    }

    /**
     * Creates the failed-records table from a zero-row select of the trackable fields of
     * the uncommitted snapshot, so it carries those columns but no rows.
     *
     * @param connection open warehouse connection
     * @throws SQLException if table creation fails
     */
    private void createFailedRecordsTable(Connection connection) throws SQLException {
        RedshiftClient redshiftClient = (RedshiftClient) ObjectRegistry.getInstance(RedshiftClient.class);
        String failedRecordsCreateQuery = String.format("select %s from %s limit 0", String.join(",", this.trackableFields), ConnectorExecutionConstants.getQualifiedUncommittedSnapshot(this.warehousePollContext.getPipelineUUID()));
        String tableName = ConnectorExecutionConstants.getFailedRecordsTable(this.warehousePollContext.getPipelineUUID());
        RedshiftTableProperties redshiftTableProperties = (RedshiftTableProperties) this.redshiftConnector.getSnapshotTableProperties(this.warehousePollContext.getPrimaryKeys());
        redshiftClient.createTableFromQuery(connection, tableName, failedRecordsCreateQuery, redshiftTableProperties, true);
    }
}

