package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;

import com.clickhouse.jdbc.internal.ClickHouseConnectionImpl;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ReaderOption;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.ArrayInjectFunction;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.BigDecimalInjectFunction;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.ClickhouseFieldInjectFunction;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.DateInjectFunction;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.DateTimeInjectFunction;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.DoubleInjectFunction;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.FloatInjectFunction;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.IntInjectFunction;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.LongInjectFunction;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.MapInjectFunction;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.StringInjectFunction;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.tool.IntHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.class */
/**
 * Sink writer that buffers {@link SeaTunnelRow}s into one JDBC batch per ClickHouse shard and
 * executes a batch once it holds {@code option.getBulkSize()} rows.
 *
 * <p>Each row is routed to a shard by {@link ShardRouter} using the value of the configured shard
 * key (or {@code null} when no shard key is configured). Every shard owns its own connection,
 * prepared {@code INSERT} statement and pending-row counter, held in a
 * {@link ClickhouseBatchStatement}.
 *
 * <p>This class performs no synchronization of its own.
 */
public class ClickhouseSinkWriter implements SinkWriter<SeaTunnelRow, CKCommitInfo, ClickhouseSinkState> {
    private static final Logger log = LoggerFactory.getLogger(ClickhouseSinkWriter.class);
    /** Fallback used when no specialised inject function recognises a column type. */
    private static final ClickhouseFieldInjectFunction DEFAULT_INJECT_FUNCTION = new StringInjectFunction();
    private static final Pattern NULLABLE = Pattern.compile("Nullable\\((.*)\\)");
    private static final Pattern LOW_CARDINALITY = Pattern.compile("LowCardinality\\((.*)\\)");

    private final SinkWriter.Context context;
    private final ReaderOption option;
    private final ShardRouter shardRouter;
    private final transient ClickhouseProxy proxy;
    /** Inject function per ClickHouse column type string, as declared in the table schema. */
    private final Map<String, ClickhouseFieldInjectFunction> fieldInjectFunctionMap;
    private final String prepareSql;
    private final Map<Shard, ClickhouseBatchStatement> statementMap;

    /**
     * Creates the writer and eagerly opens one connection and prepared statement per shard.
     *
     * @param readerOption sink configuration (fields, table schema, shard metadata, bulk size)
     * @param context      writer context supplied by the engine
     * @throws RuntimeException if a shard connection or prepared statement cannot be created
     */
    public ClickhouseSinkWriter(ReaderOption readerOption, SinkWriter.Context context) {
        this.option = readerOption;
        this.context = context;
        this.proxy = new ClickhouseProxy(readerOption.getShardMetadata().getDefaultShard().getNode());
        this.shardRouter = new ShardRouter(this.proxy, readerOption.getShardMetadata());
        // The init* helpers read option/shardRouter (and initStatementMap reads prepareSql), so
        // they must run only after those fields are assigned; field initialisers would run
        // before this constructor body and see nulls.
        this.fieldInjectFunctionMap = initFieldInjectFunctionMap();
        this.prepareSql = initPrepareSQL();
        this.statementMap = initStatementMap();
    }

    /**
     * Adds one row to the batch of the shard it routes to and flushes that batch when the number
     * of pending rows reaches the configured bulk size.
     *
     * @param seaTunnelRow row to write
     * @throws IOException declared by the writer contract; failures here surface as
     *                     {@link RuntimeException}
     */
    public void write(SeaTunnelRow seaTunnelRow) throws IOException {
        Object shardValue = null;
        String shardKey = this.option.getShardMetadata().getShardKey();
        if (StringUtils.isNotEmpty(shardKey)) {
            shardValue = seaTunnelRow.getField(this.option.getSeaTunnelRowType().indexOf(shardKey));
        }
        ClickhouseBatchStatement batch = this.statementMap.get(this.shardRouter.getShard(shardValue));
        PreparedStatement statement = batch.getPreparedStatement();
        IntHolder pendingRows = batch.getIntHolder();

        addIntoBatch(seaTunnelRow, statement);
        pendingRows.setValue(pendingRows.getValue() + 1);
        if (pendingRows.getValue() >= this.option.getBulkSize()) {
            flush(statement);
            pendingRows.setValue(0);
        }
    }

    /**
     * This writer produces no commit information.
     *
     * @return always {@link Optional#empty()}
     */
    public Optional<CKCommitInfo> prepareCommit() throws IOException {
        return Optional.empty();
    }

    /** No-op: there is no prepared commit state to abort. */
    public void abortPrepare() {
    }

    /**
     * Closes the proxy, then for every shard flushes any pending rows and closes its prepared
     * statement and connection.
     *
     * <p>All shards are attempted even if one fails; the first failure is rethrown afterwards with
     * later failures attached as suppressed exceptions.
     *
     * @throws RuntimeException if flushing or closing any shard's resources fails
     */
    public void close() throws IOException {
        this.proxy.close();
        RuntimeException firstFailure = null;
        for (ClickhouseBatchStatement batch : this.statementMap.values()) {
            try {
                flushAndClose(batch);
            } catch (RuntimeException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /** Flushes the pending rows of one shard and closes its statement and connection. */
    private void flushAndClose(ClickhouseBatchStatement batch) {
        // try-with-resources skips null resources and closes the statement before the connection,
        // including when the flush throws.
        try (ClickHouseConnectionImpl connection = batch.getClickHouseConnection();
             PreparedStatement statement = batch.getPreparedStatement()) {
            IntHolder pendingRows = batch.getIntHolder();
            if (pendingRows.getValue() > 0) {
                flush(statement);
                pendingRows.setValue(0);
            }
        } catch (SQLException e) {
            throw new RuntimeException("Failed to close prepared statement.", e);
        }
    }

    /**
     * Binds every configured field of the row to the statement (1-based parameter index, in
     * configured field order) and appends the row to the statement's batch.
     *
     * @throws RuntimeException wrapping any {@link SQLException} raised while binding or batching
     */
    private void addIntoBatch(SeaTunnelRow seaTunnelRow, PreparedStatement preparedStatement) {
        try {
            for (int i = 0; i < this.option.getFields().size(); i++) {
                String fieldName = this.option.getFields().get(i);
                Object value = seaTunnelRow.getField(this.option.getSeaTunnelRowType().indexOf(fieldName));
                if (value == null) {
                    preparedStatement.setObject(i + 1, null);
                } else {
                    // The inject-function map is keyed by the column's ClickHouse type string.
                    String fieldType = this.option.getTableSchema().get(fieldName);
                    this.fieldInjectFunctionMap
                            .getOrDefault(fieldType, DEFAULT_INJECT_FUNCTION)
                            .injectFields(preparedStatement, i + 1, value);
                }
            }
            preparedStatement.addBatch();
        } catch (SQLException e) {
            throw new RuntimeException("Add row data into batch error", e);
        }
    }

    /**
     * Executes the statement's accumulated batch.
     *
     * @throws RuntimeException wrapping any failure from {@code executeBatch()}
     */
    private void flush(PreparedStatement preparedStatement) {
        try {
            preparedStatement.executeBatch();
        } catch (Exception e) {
            throw new RuntimeException("Clickhouse execute batch statement error", e);
        }
    }

    /**
     * Opens a connection per shard known to the router and prepares the insert statement on it.
     *
     * @return batch state (connection, statement, zeroed row counter) keyed by shard
     * @throws RuntimeException if connecting or preparing the statement fails
     */
    private Map<Shard, ClickhouseBatchStatement> initStatementMap() {
        Map<Shard, ClickhouseBatchStatement> statements = new HashMap<>(16);
        this.shardRouter.getShards().forEach((shardNum, shard) -> {
            try {
                ClickHouseConnectionImpl connection =
                        new ClickHouseConnectionImpl(shard.getJdbcUrl(), this.option.getProperties());
                PreparedStatement statement = connection.prepareStatement(this.prepareSql);
                statements.put(shard, new ClickhouseBatchStatement(connection, statement, new IntHolder()));
            } catch (SQLException e) {
                throw new RuntimeException("Clickhouse prepare statement error: " + e.getMessage(), e);
            }
        });
        return statements;
    }

    /**
     * Builds the parameterised insert statement for the router's shard table, with one
     * {@code ?} placeholder per configured field.
     */
    private String initPrepareSQL() {
        String[] placeholders = new String[this.option.getFields().size()];
        Arrays.fill(placeholders, "?");
        return String.format(
                "INSERT INTO %s (%s) VALUES (%s)",
                this.shardRouter.getShardTable(),
                String.join(",", this.option.getFields()),
                String.join(",", placeholders));
    }

    /**
     * Chooses an inject function for the ClickHouse type of every configured field.
     *
     * <p>Candidates are tried in declaration order and the first one that accepts the unwrapped
     * type wins; a string inject function is used when none accepts it.
     *
     * @return inject functions keyed by the raw (still wrapped) column type string
     */
    private Map<String, ClickhouseFieldInjectFunction> initFieldInjectFunctionMap() {
        Map<String, ClickhouseFieldInjectFunction> functionsByType = new HashMap<>(16);
        // Order matters: the first candidate whose isCurrentFieldType accepts the type is chosen.
        ArrayList<ClickhouseFieldInjectFunction> candidates = Lists.newArrayList(
                new ArrayInjectFunction(),
                new MapInjectFunction(),
                new BigDecimalInjectFunction(),
                new DateInjectFunction(),
                new DateTimeInjectFunction(),
                new LongInjectFunction(),
                new DoubleInjectFunction(),
                new FloatInjectFunction(),
                new IntInjectFunction(),
                new StringInjectFunction());
        ClickhouseFieldInjectFunction fallback = new StringInjectFunction();

        for (String fieldName : this.option.getFields()) {
            String fieldType = this.option.getTableSchema().get(fieldName);
            String unwrappedType = unwrapCommonPrefix(fieldType);
            ClickhouseFieldInjectFunction chosen = fallback;
            for (ClickhouseFieldInjectFunction candidate : candidates) {
                if (candidate.isCurrentFieldType(unwrappedType)) {
                    chosen = candidate;
                    break;
                }
            }
            functionsByType.put(fieldType, chosen);
        }
        return functionsByType;
    }

    /**
     * Strips enclosing {@code Nullable(...)} and {@code LowCardinality(...)} wrappers from a
     * ClickHouse type string, including nested combinations such as
     * {@code LowCardinality(Nullable(String))}.
     *
     * @param str ClickHouse type string
     * @return the innermost type, or {@code str} itself when it carries neither wrapper
     */
    private String unwrapCommonPrefix(String str) {
        String current = str;
        // Every successful match yields a strictly shorter string, so the loop terminates.
        while (true) {
            Matcher nullable = NULLABLE.matcher(current);
            if (nullable.matches()) {
                current = nullable.group(1);
                continue;
            }
            Matcher lowCardinality = LOW_CARDINALITY.matcher(current);
            if (lowCardinality.matches()) {
                current = lowCardinality.group(1);
                continue;
            }
            return current;
        }
    }
}
