/*
 * Decompiled with CFR 0.152.
 */
package io.castled.warehouses.connectors.postgres;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import io.castled.ObjectRegistry;
import io.castled.commons.streams.JdbcRecordInputStream;
import io.castled.commons.streams.RecordInputStream;
import io.castled.constants.ConnectorExecutionConstants;
import io.castled.exceptions.CastledRuntimeException;
import io.castled.schema.models.RecordSchema;
import io.castled.warehouses.WarehouseConfig;
import io.castled.warehouses.WarehouseDataPoller;
import io.castled.warehouses.connectors.postgres.PostgresClient;
import io.castled.warehouses.connectors.postgres.PostgresResultSetSchemaMapper;
import io.castled.warehouses.connectors.postgres.PostgresTableProperties;
import io.castled.warehouses.connectors.postgres.PostgresWarehouseConfig;
import io.castled.warehouses.connectors.postgres.PostgresWarehouseConnector;
import io.castled.warehouses.models.WarehousePollContext;
import io.castled.warehouses.models.WarehousePollResult;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class PostgresDataPoller
implements WarehouseDataPoller {
    private static final Logger log = LoggerFactory.getLogger(PostgresDataPoller.class);
    private final PostgresClient postgresClient;
    private final PostgresResultSetSchemaMapper resultSetSchemaMapper;
    private final PostgresWarehouseConnector postgresWarehouseConnector;

    @Inject
    public PostgresDataPoller(PostgresClient postgresClient, PostgresResultSetSchemaMapper resultSetSchemaMapper, PostgresWarehouseConnector postgresWarehouseConnector) {
        this.postgresClient = postgresClient;
        this.resultSetSchemaMapper = resultSetSchemaMapper;
        this.postgresWarehouseConnector = postgresWarehouseConnector;
    }

    @Override
    public WarehousePollResult pollRecords(WarehousePollContext warehousePollContext) {
        WarehousePollResult warehousePollResult;
        block8: {
            PostgresWarehouseConfig postgresWarehouseConfig = (PostgresWarehouseConfig)warehousePollContext.getWarehouseConfig();
            Connection connection = this.postgresWarehouseConnector.getConnection(postgresWarehouseConfig);
            try {
                List<String> bookKeepingTables = this.postgresClient.listTables(connection, "castled");
                this.createUncommittedSnapshot(connection, warehousePollContext, bookKeepingTables);
                RecordSchema querySchema = this.getSchemaFromQuery(connection, String.format("select * from %s", ConnectorExecutionConstants.getQualifiedUncommittedSnapshot(warehousePollContext.getPipelineUUID())));
                warehousePollResult = WarehousePollResult.builder().recordInputStream(this.createRecordStream(connection, warehousePollContext, bookKeepingTables, querySchema)).warehouseSchema(querySchema).build();
                if (connection == null) break block8;
            }
            catch (Throwable throwable) {
                try {
                    if (connection != null) {
                        try {
                            connection.close();
                        }
                        catch (Throwable throwable2) {
                            throwable.addSuppressed(throwable2);
                        }
                    }
                    throw throwable;
                }
                catch (Exception e) {
                    log.error("Poll records from warehouse {} failed", (Object)warehousePollContext.getWarehouseConfig().getType(), (Object)e);
                    throw new CastledRuntimeException((Throwable)e);
                }
            }
            connection.close();
        }
        return warehousePollResult;
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public WarehousePollResult resumePoll(WarehousePollContext warehousePollContext) {
        try (Connection connection = this.postgresWarehouseConnector.getConnection((PostgresWarehouseConfig)warehousePollContext.getWarehouseConfig());){
            List<String> bookKeepingTables = this.postgresClient.listTables(connection, "castled");
            if (!bookKeepingTables.contains(ConnectorExecutionConstants.getUncommittedSnapshot(warehousePollContext.getPipelineUUID()))) {
                WarehousePollResult warehousePollResult2 = this.pollRecords(warehousePollContext);
                return warehousePollResult2;
            }
            RecordSchema querySchema = this.getSchemaFromQuery(connection, String.format("select * from %s", ConnectorExecutionConstants.getQualifiedUncommittedSnapshot(warehousePollContext.getPipelineUUID())));
            WarehousePollResult warehousePollResult = WarehousePollResult.builder().recordInputStream(this.createRecordStream(connection, warehousePollContext, bookKeepingTables, querySchema)).warehouseSchema(querySchema).resumed(true).build();
            return warehousePollResult;
        }
        catch (Exception e) {
            log.error("Resume poll from warehouse {} failed", (Object)warehousePollContext.getWarehouseConfig().getType(), (Object)e);
            return this.pollRecords(warehousePollContext);
        }
    }

    private RecordInputStream createRecordStream(Connection connection, WarehousePollContext warehousePollContext, List<String> bookKeepingTables, RecordSchema querySchema) {
        return new JdbcRecordInputStream(connection, this.getDataFetchQuery(warehousePollContext, bookKeepingTables), querySchema);
    }

    private String getDataFetchQuery(WarehousePollContext warehousePollRequest, List<String> bookKeepingTables) {
        String committedSnapshot = ConnectorExecutionConstants.getQualifiedCommittedSnapshot(warehousePollRequest.getPipelineUUID());
        String uncommittedSnapshot = ConnectorExecutionConstants.getQualifiedUncommittedSnapshot(warehousePollRequest.getPipelineUUID());
        if (bookKeepingTables.contains(ConnectorExecutionConstants.getCommittedSnapshot(warehousePollRequest.getPipelineUUID()))) {
            return String.format("select * from %s except select * from %s", uncommittedSnapshot, committedSnapshot);
        }
        return String.format("select * from %s", uncommittedSnapshot);
    }

    private void createUncommittedSnapshot(Connection connection, WarehousePollContext warehousePollContext, List<String> internalTables) throws SQLException {
        if (internalTables.contains(ConnectorExecutionConstants.getUncommittedSnapshot(warehousePollContext.getPipelineUUID()))) {
            try (Statement statement = connection.createStatement();){
                statement.execute(String.format("drop table if exists %s", ConnectorExecutionConstants.getQualifiedUncommittedSnapshot(warehousePollContext.getPipelineUUID())));
            }
        }
        this.postgresClient.createTableFromQuery(connection, ConnectorExecutionConstants.getQualifiedUncommittedSnapshot(warehousePollContext.getPipelineUUID()), warehousePollContext.getQuery(), false, (PostgresTableProperties)((PostgresWarehouseConnector)ObjectRegistry.getInstance(PostgresWarehouseConnector.class)).getSnapshotTableProperties(warehousePollContext.getPrimaryKeys()));
    }

    private RecordSchema getSchemaFromQuery(Connection connection, String query) throws SQLException {
        try (PreparedStatement preparedStatement = connection.prepareStatement(query);){
            RecordSchema recordSchema = this.resultSetSchemaMapper.getSchema(preparedStatement.getMetaData());
            return recordSchema;
        }
    }

    @Override
    public void cleanupPipelineRunResources(WarehousePollContext warehousePollContext) {
    }

    @Override
    public void cleanupPipelineResources(String pipelineUUID, WarehouseConfig warehouseConfig) {
        try {
            PostgresWarehouseConnector postgresWarehouseConnector = (PostgresWarehouseConnector)ObjectRegistry.getInstance(PostgresWarehouseConnector.class);
            try (Connection connection = postgresWarehouseConnector.getConnection((PostgresWarehouseConfig)warehouseConfig);
                 Statement statement = connection.createStatement();){
                statement.execute(String.format("drop table if exists %s", ConnectorExecutionConstants.getQualifiedCommittedSnapshot(pipelineUUID)));
                statement.execute(String.format("drop table if exists %s", ConnectorExecutionConstants.getQualifiedUncommittedSnapshot(pipelineUUID)));
            }
        }
        catch (SQLException e) {
            log.error("Cleanup pipeline resources failed for pipeline {}", (Object)pipelineUUID);
            throw new CastledRuntimeException((Throwable)e);
        }
    }
}

