package org.apache.shardingsphere.data.pipeline.core.importer.sink;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.sql.DataSource;
import lombok.Generated;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.GroupedDataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.common.ingest.IngestDataChangeType;
import org.apache.shardingsphere.data.pipeline.common.ingest.record.RecordUtils;
import org.apache.shardingsphere.data.pipeline.common.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineImportSQLBuilder;
import org.apache.shardingsphere.data.pipeline.common.util.PipelineJdbcUtils;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineImporterJobWriteException;
import org.apache.shardingsphere.data.pipeline.core.importer.DataRecordMerger;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.class */
/**
 * Pipeline sink that writes {@link DataRecord}s into the data source described by the importer configuration.
 *
 * <p>Records handed to {@link #write(String, List)} are grouped by {@link DataRecordMerger}. Each batch group
 * (deletes, inserts, updates) is written in its own transaction with retry on {@link SQLException}; records that
 * cannot be batched are written one by one on a single connection without retry.</p>
 *
 * <p>The statement currently being executed is published through an {@link AtomicReference} so that
 * {@link #close()} can ask for its cancellation.</p>
 */
public final class PipelineDataSourceSink implements PipelineSink {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(PipelineDataSourceSink.class);

    private static final DataRecordMerger MERGER = new DataRecordMerger();

    /** Query timeout applied to batch insert and batch delete statements, in seconds. */
    private static final int BATCH_QUERY_TIMEOUT_SECONDS = 30;

    /** Delay before the first retry, in milliseconds; doubled for every further retry. */
    private static final long BASE_RETRY_DELAY_MILLIS = 1000L;

    /** Upper bound of the delay between two flush attempts, in milliseconds. */
    private static final long MAX_RETRY_DELAY_MILLIS = 300000L;

    /** Largest shift applied to the base delay, so the doubling can never overflow before it is capped. */
    private static final int MAX_RETRY_DELAY_SHIFT = 20;

    private final ImporterConfiguration importerConfig;

    private final PipelineDataSourceManager dataSourceManager;

    /** May be {@code null}, in which case no rate limiting is applied. */
    private final JobRateLimitAlgorithm rateLimitAlgorithm;

    private final PipelineImportSQLBuilder importSQLBuilder;

    private final AtomicReference<Statement> batchInsertStatement = new AtomicReference<>();

    private final AtomicReference<Statement> updateStatement = new AtomicReference<>();

    private final AtomicReference<Statement> batchDeleteStatement = new AtomicReference<>();

    /**
     * Create a sink for the target described by the given importer configuration.
     *
     * @param importerConfig importer configuration; supplies the data source configuration, rate limit algorithm,
     *                       retry times, schema names and sharding columns
     * @param dataSourceManager manager used to look up the target data source on every write
     */
    public PipelineDataSourceSink(ImporterConfiguration importerConfig, PipelineDataSourceManager dataSourceManager) {
        this.importerConfig = importerConfig;
        this.dataSourceManager = dataSourceManager;
        this.rateLimitAlgorithm = importerConfig.getRateLimitAlgorithm();
        this.importSQLBuilder = new PipelineImportSQLBuilder(importerConfig.getDataSourceConfig().getDatabaseType());
    }

    /**
     * Not supported by this sink.
     *
     * @param identifier ignored
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override
    public boolean identifierMatched(Object identifier) {
        throw new UnsupportedOperationException();
    }

    /**
     * Write the given records to the target data source.
     *
     * @param ackId not used by this sink
     * @param records records to write; elements that are not {@link DataRecord}s are skipped
     * @return progress parameter carrying the number of INSERT records contained in {@code records}
     */
    @Override
    public PipelineJobProgressUpdatedParameter write(String ackId, List<Record> records) {
        DataSource dataSource = dataSourceManager.getDataSource(importerConfig.getDataSourceConfig());
        return flush(dataSource, records);
    }

    /**
     * Group the data records and write every group.
     *
     * <p>For each group the order is: batch deletes, batch inserts, batch updates, then the non-batch records.</p>
     *
     * @param dataSource target data source
     * @param records incoming records of any kind
     * @return progress parameter carrying the number of INSERT data records
     */
    private PipelineJobProgressUpdatedParameter flush(DataSource dataSource, List<Record> records) {
        List<DataRecord> dataRecords = records.stream()
                .filter(DataRecord.class::isInstance)
                .map(DataRecord.class::cast)
                .collect(Collectors.toList());
        if (dataRecords.isEmpty()) {
            return new PipelineJobProgressUpdatedParameter(0);
        }
        int insertRecordNumber = 0;
        for (DataRecord each : dataRecords) {
            if (IngestDataChangeType.INSERT.equals(each.getType())) {
                insertRecordNumber++;
            }
        }
        for (GroupedDataRecord each : MERGER.group(dataRecords)) {
            flushInternal(dataSource, each.getBatchDeleteDataRecords());
            flushInternal(dataSource, each.getBatchInsertDataRecords());
            flushInternal(dataSource, each.getBatchUpdateDataRecords());
            sequentialFlush(dataSource, each.getNonBatchRecords());
        }
        return new PipelineJobProgressUpdatedParameter(insertRecordNumber);
    }

    /**
     * Write one batch with retry; does nothing for a {@code null} or empty batch.
     *
     * @param dataSource target data source
     * @param dataRecords records of a single change type, may be {@code null}
     */
    private void flushInternal(DataSource dataSource, List<DataRecord> dataRecords) {
        if (null == dataRecords || dataRecords.isEmpty()) {
            return;
        }
        tryFlush(dataSource, dataRecords);
    }

    /**
     * Write one batch, retrying on {@link SQLException} up to the configured retry times with a capped,
     * doubling delay between attempts.
     *
     * <p>NOTE(review): when the thread is found interrupted by the loop condition, the method returns without
     * writing and the interrupt flag has been cleared by {@link Thread#interrupted()}. That is the pre-existing
     * behaviour and is kept as is; confirm it is intended.</p>
     *
     * @param dataSource target data source
     * @param dataRecords non-empty records of a single change type
     * @throws PipelineImporterJobWriteException if the last allowed attempt fails
     */
    private void tryFlush(DataSource dataSource, List<DataRecord> dataRecords) {
        int retryTimes = importerConfig.getRetryTimes();
        for (int i = 0; !Thread.interrupted() && i <= retryTimes; i++) {
            try {
                doFlush(dataSource, dataRecords);
                return;
            } catch (SQLException ex) {
                log.error("flush failed {}/{} times.", i, retryTimes, ex);
                if (i == retryTimes) {
                    throw new PipelineImporterJobWriteException(ex);
                }
                sleepBeforeRetry(i);
            }
        }
    }

    /**
     * Sleep before the next flush attempt.
     *
     * <p>The delay is computed in {@code long} arithmetic with a bounded shift, so a large attempt number cannot
     * overflow into a negative value (which {@link Thread#sleep(long)} rejects).</p>
     *
     * @param attempt zero-based number of the attempt that just failed
     */
    private static void sleepBeforeRetry(int attempt) {
        long delayMillis = Math.min(MAX_RETRY_DELAY_MILLIS, BASE_RETRY_DELAY_MILLIS << Math.min(attempt, MAX_RETRY_DELAY_SHIFT));
        try {
            Thread.sleep(delayMillis);
        } catch (InterruptedException ex) {
            // Propagate the InterruptedException itself, unwrapped, although no method on the call path declares it.
            PipelineDataSourceSink.<RuntimeException>sneakyThrow(ex);
        }
    }

    /**
     * Rethrow any throwable without wrapping it and without a {@code throws} declaration at the call site.
     *
     * @param throwable throwable to rethrow
     * @param <E> exception type the compiler is told about; erased at runtime
     * @throws E always, carrying {@code throwable} itself
     */
    @SuppressWarnings("unchecked")
    private static <E extends Throwable> void sneakyThrow(Throwable throwable) throws E {
        throw (E) throwable;
    }

    /**
     * Write one batch inside a single transaction on a fresh connection.
     *
     * <p>On failure the transaction is rolled back explicitly before the connection is closed, because closing a
     * connection with an active transaction has implementation-defined results in JDBC.</p>
     *
     * @param dataSource target data source
     * @param dataRecords non-empty records; the type of the first record selects the operation
     * @throws SQLException if obtaining the connection, executing the statements or committing fails
     */
    private void doFlush(DataSource dataSource, List<DataRecord> dataRecords) throws SQLException {
        try (Connection connection = dataSource.getConnection()) {
            connection.setAutoCommit(false);
            try {
                executeByType(connection, dataRecords);
                connection.commit();
            } catch (SQLException | RuntimeException ex) {
                rollbackQuietly(connection, ex);
                throw ex;
            }
        }
    }

    /**
     * Roll back the connection; a failure of the rollback is attached to {@code cause} instead of replacing it.
     *
     * @param connection connection with the failed transaction
     * @param cause failure that triggered the rollback
     */
    private static void rollbackQuietly(Connection connection, Exception cause) {
        try {
            connection.rollback();
        } catch (SQLException | RuntimeException rollbackEx) {
            cause.addSuppressed(rollbackEx);
        }
    }

    /**
     * Write the records one after another on the given connection.
     *
     * @param connection connection to write on
     * @param dataRecords records to write
     * @throws PipelineImporterJobWriteException if writing a record fails; the message names the record
     */
    private void doFlush(Connection connection, List<DataRecord> dataRecords) {
        for (DataRecord each : dataRecords) {
            try {
                doFlush(connection, each);
            } catch (SQLException ex) {
                throw new PipelineImporterJobWriteException(String.format("Write failed, record=%s", each), ex);
            }
        }
    }

    /**
     * Write a single record according to its change type; records of any other type are ignored.
     *
     * @param connection connection to write on
     * @param dataRecord record to write
     * @throws SQLException if the statement fails
     */
    private void doFlush(Connection connection, DataRecord dataRecord) throws SQLException {
        executeByType(connection, Collections.singletonList(dataRecord));
    }

    /**
     * Apply the rate limit (if any) and execute the statement matching the change type of the first record.
     * Nothing happens when that type is not INSERT, UPDATE or DELETE.
     *
     * @param connection connection to write on
     * @param dataRecords non-empty records sharing one change type
     * @throws SQLException if the statement fails
     */
    private void executeByType(Connection connection, List<DataRecord> dataRecords) throws SQLException {
        switch (dataRecords.get(0).getType()) {
            case IngestDataChangeType.INSERT:
                intercept(JobOperationType.INSERT);
                executeBatchInsert(connection, dataRecords);
                break;
            case IngestDataChangeType.UPDATE:
                intercept(JobOperationType.UPDATE);
                executeUpdate(connection, dataRecords);
                break;
            case IngestDataChangeType.DELETE:
                intercept(JobOperationType.DELETE);
                executeBatchDelete(connection, dataRecords);
                break;
            default:
                break;
        }
    }

    /**
     * Pass one operation of the given type to the rate limit algorithm, when one is configured.
     *
     * @param operationType type of the operation about to be executed
     */
    private void intercept(JobOperationType operationType) {
        if (null != rateLimitAlgorithm) {
            rateLimitAlgorithm.intercept(operationType, 1);
        }
    }

    /**
     * Insert all records with one batched prepared statement built from the first record.
     *
     * @param connection connection to write on
     * @param dataRecords non-empty INSERT records
     * @throws SQLException if preparing or executing the batch fails
     */
    private void executeBatchInsert(Connection connection, List<DataRecord> dataRecords) throws SQLException {
        DataRecord firstRecord = dataRecords.get(0);
        String insertSQL = importSQLBuilder.buildInsertSQL(getSchemaName(firstRecord.getTableName()), firstRecord);
        try (PreparedStatement preparedStatement = connection.prepareStatement(insertSQL)) {
            batchInsertStatement.set(preparedStatement);
            preparedStatement.setQueryTimeout(BATCH_QUERY_TIMEOUT_SECONDS);
            for (DataRecord each : dataRecords) {
                for (int i = 0; i < each.getColumnCount(); i++) {
                    preparedStatement.setObject(i + 1, each.getColumn(i).getValue());
                }
                preparedStatement.addBatch();
            }
            preparedStatement.executeBatch();
        } finally {
            batchInsertStatement.set(null);
        }
    }

    /**
     * Look up the schema name configured for a logic table.
     *
     * @param logicTableName logic table name
     * @return schema name as returned by the importer configuration
     */
    private String getSchemaName(String logicTableName) {
        return importerConfig.getSchemaName(new LogicTableName(logicTableName));
    }

    /**
     * Update the records one by one, in list order.
     *
     * @param connection connection to write on
     * @param dataRecords UPDATE records
     * @throws SQLException if an update statement fails
     */
    private void executeUpdate(Connection connection, List<DataRecord> dataRecords) throws SQLException {
        for (DataRecord each : dataRecords) {
            executeUpdate(connection, each);
        }
    }

    /**
     * Update a single record.
     *
     * <p>Updated column values are bound first, followed by the condition columns. A condition column is bound
     * with its old value, except for a sharding column whose old value is {@code null}, which is bound with its
     * current value. An update count other than 1 is only logged.</p>
     *
     * @param connection connection to write on
     * @param dataRecord UPDATE record
     * @throws SQLException if preparing or executing the statement fails
     */
    private void executeUpdate(Connection connection, DataRecord dataRecord) throws SQLException {
        Set<String> shardingColumns = importerConfig.getShardingColumns(dataRecord.getTableName());
        List<Column> conditionColumns = RecordUtils.extractConditionColumns(dataRecord, shardingColumns);
        List<Column> updatedColumns = dataRecord.getColumns().stream().filter(Column::isUpdated).collect(Collectors.toList());
        String updateSQL = importSQLBuilder.buildUpdateSQL(getSchemaName(dataRecord.getTableName()), dataRecord, conditionColumns);
        try (PreparedStatement preparedStatement = connection.prepareStatement(updateSQL)) {
            updateStatement.set(preparedStatement);
            for (int i = 0; i < updatedColumns.size(); i++) {
                preparedStatement.setObject(i + 1, updatedColumns.get(i).getValue());
            }
            for (int i = 0; i < conditionColumns.size(); i++) {
                Column conditionColumn = conditionColumns.get(i);
                int parameterIndex = updatedColumns.size() + i + 1;
                if (shardingColumns.contains(conditionColumn.getName()) && null == conditionColumn.getOldValue()) {
                    preparedStatement.setObject(parameterIndex, conditionColumn.getValue());
                } else {
                    preparedStatement.setObject(parameterIndex, conditionColumn.getOldValue());
                }
            }
            int updateCount = preparedStatement.executeUpdate();
            if (1 != updateCount) {
                log.warn("executeUpdate failed, updateCount={}, updateSql={}, updatedColumns={}, conditionColumns={}", updateCount, updateSQL, updatedColumns, conditionColumns);
            }
        } finally {
            updateStatement.set(null);
        }
    }

    /**
     * Delete all records with one batched prepared statement built from the first record.
     *
     * <p>Condition columns are bound with their old values; a {@code null} old value is logged and still bound.
     * The sharding columns are looked up once, using the table name of the first record.</p>
     *
     * @param connection connection to write on
     * @param dataRecords non-empty DELETE records
     * @throws SQLException if preparing or executing the batch fails
     */
    private void executeBatchDelete(Connection connection, List<DataRecord> dataRecords) throws SQLException {
        DataRecord firstRecord = dataRecords.get(0);
        Set<String> shardingColumns = importerConfig.getShardingColumns(firstRecord.getTableName());
        String deleteSQL = importSQLBuilder.buildDeleteSQL(getSchemaName(firstRecord.getTableName()), firstRecord,
                RecordUtils.extractConditionColumns(firstRecord, shardingColumns));
        try (PreparedStatement preparedStatement = connection.prepareStatement(deleteSQL)) {
            batchDeleteStatement.set(preparedStatement);
            preparedStatement.setQueryTimeout(BATCH_QUERY_TIMEOUT_SECONDS);
            for (DataRecord each : dataRecords) {
                List<Column> conditionColumns = RecordUtils.extractConditionColumns(each, shardingColumns);
                for (int i = 0; i < conditionColumns.size(); i++) {
                    Object oldValue = conditionColumns.get(i).getOldValue();
                    if (null == oldValue) {
                        log.warn("Record old value is null, record={}", each);
                    }
                    preparedStatement.setObject(i + 1, oldValue);
                }
                preparedStatement.addBatch();
            }
            preparedStatement.executeBatch();
        } finally {
            batchDeleteStatement.set(null);
        }
    }

    /**
     * Write non-batch records one by one on a single connection; does nothing for a {@code null} or empty list.
     * No transaction handling and no retry is applied here.
     *
     * @param dataSource target data source
     * @param dataRecords records to write in order
     * @throws PipelineImporterJobWriteException if obtaining or closing the connection, or writing a record, fails
     */
    private void sequentialFlush(DataSource dataSource, List<DataRecord> dataRecords) {
        if (null == dataRecords || dataRecords.isEmpty()) {
            return;
        }
        try (Connection connection = dataSource.getConnection()) {
            doFlush(connection, dataRecords);
        } catch (SQLException ex) {
            throw new PipelineImporterJobWriteException(ex);
        }
    }

    /**
     * Ask for cancellation of whichever insert, update and delete statements are currently published.
     * A reference may hold {@code null} when no such statement is running; it is passed to
     * {@code PipelineJdbcUtils.cancelStatement} unchanged.
     */
    @Override
    public void close() {
        PipelineJdbcUtils.cancelStatement(batchInsertStatement.get());
        PipelineJdbcUtils.cancelStatement(updateStatement.get());
        PipelineJdbcUtils.cancelStatement(batchDeleteStatement.get());
    }

    /**
     * Get the importer configuration this sink was created with.
     *
     * @return importer configuration
     */
    @Generated
    protected ImporterConfiguration getImporterConfig() {
        return importerConfig;
    }
}
