/*
 * Decompiled with CFR 0.152.
 */
package io.castled.warehouses.connectors.postgres;

import com.google.common.collect.Lists;
import io.castled.ObjectRegistry;
import io.castled.constants.ConnectorExecutionConstants;
import io.castled.exceptions.CastledRuntimeException;
import io.castled.schema.models.Field;
import io.castled.schema.models.SchemaType;
import io.castled.schema.models.Tuple;
import io.castled.utils.StringUtils;
import io.castled.warehouses.WarehouseConfig;
import io.castled.warehouses.WarehouseSyncFailureListener;
import io.castled.warehouses.connectors.postgres.PostgresClient;
import io.castled.warehouses.connectors.postgres.PostgresTableProperties;
import io.castled.warehouses.connectors.postgres.PostgresWarehouseConfig;
import io.castled.warehouses.connectors.postgres.PostgresWarehouseConnector;
import io.castled.warehouses.models.WarehousePollContext;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PostgresSyncFailureListener
extends WarehouseSyncFailureListener {
    private static final Logger log = LoggerFactory.getLogger(PostgresSyncFailureListener.class);
    private static final int MAX_BUFFERED_RECORDS = 100;
    private final List<Tuple> bufferedRecords = Lists.newArrayList();
    private final Connection connection;
    private long failedRecords = 0L;
    private boolean failedRecordsTableCreated = false;

    public PostgresSyncFailureListener(WarehousePollContext warehousePollContext) throws Exception {
        super(warehousePollContext);
        this.connection = ((PostgresWarehouseConnector)ObjectRegistry.getInstance(PostgresWarehouseConnector.class)).getConnection((PostgresWarehouseConfig)warehousePollContext.getWarehouseConfig());
    }

    @Override
    public synchronized void doWriteRecord(Tuple record) throws Exception {
        this.bufferedRecords.add(record);
        ++this.failedRecords;
        if (this.bufferedRecords.size() >= 100) {
            this.insertBufferedRecords(this.bufferedRecords);
            this.bufferedRecords.clear();
        }
    }

    private void commitSnapshot() throws SQLException {
        this.connection.setAutoCommit(false);
        boolean autoCommit = this.connection.getAutoCommit();
        if (autoCommit) {
            this.connection.setAutoCommit(false);
        }
        String uncommittedSnapshot = ConnectorExecutionConstants.getQualifiedUncommittedSnapshot(this.warehousePollContext.getPipelineUUID());
        String committedSnapshot = ConnectorExecutionConstants.getQualifiedCommittedSnapshot(this.warehousePollContext.getPipelineUUID());
        try {
            try (Statement statement = this.connection.createStatement();){
                statement.execute(String.format("drop table if exists %s", committedSnapshot));
                statement.execute(String.format("alter table %s rename to %s", uncommittedSnapshot, ConnectorExecutionConstants.getCommittedSnapshot(this.warehousePollContext.getPipelineUUID())));
            }
            this.connection.commit();
        }
        catch (Exception e) {
            this.connection.rollback();
            log.error("Committing snapshot for pipeline {} failed", (Object)this.warehousePollContext.getPipelineUUID(), (Object)e);
            throw new CastledRuntimeException((Throwable)e);
        }
        if (autoCommit) {
            this.connection.setAutoCommit(true);
        }
    }

    private void createFailedRecordsTable() throws SQLException {
        PostgresClient postgresClient = (PostgresClient)ObjectRegistry.getInstance(PostgresClient.class);
        String failedRecordsCreateQuery = String.format("select %s from %s limit 0", String.join((CharSequence)",", this.trackableFields), ConnectorExecutionConstants.getQualifiedUncommittedSnapshot(this.warehousePollContext.getPipelineUUID()));
        String tableName = ConnectorExecutionConstants.getFailedRecordsTable(this.warehousePollContext.getPipelineUUID());
        PostgresTableProperties postgresTableProperties = (PostgresTableProperties)((PostgresWarehouseConnector)ObjectRegistry.getInstance(PostgresWarehouseConnector.class)).getSnapshotTableProperties(this.warehousePollContext.getPrimaryKeys());
        postgresClient.createTableFromQuery(this.connection, tableName, failedRecordsCreateQuery, true, postgresTableProperties);
    }

    @Override
    public void cleanupResources(String pipelineUUID, Long pipelineRunId, WarehouseConfig warehouseConfig) {
        try {
            if (!this.connection.isClosed()) {
                this.connection.close();
            }
        }
        catch (Exception e) {
            log.error("Failed to close postgres connection for pipeline {}", (Object)pipelineUUID);
        }
    }

    @Override
    public void doFlush() throws Exception {
        if (this.bufferedRecords.size() > 0) {
            this.insertBufferedRecords(this.bufferedRecords);
        }
        if (this.failedRecords > 0L) {
            this.removeFailedRecordsFromSnapshot();
        }
        this.commitSnapshot();
        this.connection.close();
    }

    private void insertBufferedRecords(List<Tuple> bufferedRecords) throws SQLException {
        if (!this.failedRecordsTableCreated) {
            this.createFailedRecordsTable();
            this.failedRecordsTableCreated = true;
        }
        String failedRecordsTable = ConnectorExecutionConstants.getFailedRecordsTable(this.warehousePollContext.getPipelineUUID());
        try (Statement statement = this.connection.createStatement();){
            for (Tuple record : bufferedRecords) {
                String insertRecordQuery = String.format("insert into %s(", failedRecordsTable) + record.getFields().stream().map(Field::getName).collect(Collectors.joining(",")) + ") values(" + record.getFields().stream().map(this::getQueryValue).collect(Collectors.joining(",")) + ")";
                statement.execute(insertRecordQuery);
            }
            statement.executeBatch();
        }
    }

    private String getQueryValue(Field field) {
        if (field.getValue() == null) {
            return "null";
        }
        if (field.getSchema().getType().equals((Object)SchemaType.STRING)) {
            return StringUtils.singleQuote((String)((String)field.getValue()));
        }
        return field.getValue().toString();
    }

    private void removeFailedRecordsFromSnapshot() throws SQLException {
        String uncommittedSnapshot = ConnectorExecutionConstants.getQualifiedUncommittedSnapshot(this.warehousePollContext.getPipelineUUID());
        String failedRecordsTable = ConnectorExecutionConstants.getFailedRecordsTable(this.warehousePollContext.getPipelineUUID());
        StringBuilder failedRecordsDeleteQuery = new StringBuilder(String.format("delete from %s using %s where 1 = 1", uncommittedSnapshot, failedRecordsTable));
        for (String trackableField : this.trackableFields) {
            failedRecordsDeleteQuery.append(String.format(" AND (%s.%s = %s.%s)", failedRecordsTable, trackableField, uncommittedSnapshot, trackableField));
        }
        try (Statement statement = this.connection.createStatement();){
            statement.execute(failedRecordsDeleteQuery.toString());
        }
    }
}

