package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;

import com.clickhouse.client.ClickHouseNode;
import com.google.auto.service.AutoService;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ReaderOption;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseTable;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKAggCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;

@AutoService({SeaTunnelSink.class})
/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.class */
public class ClickhouseSink implements SeaTunnelSink<SeaTunnelRow, ClickhouseSinkState, CKCommitInfo, CKAggCommitInfo> {
    private ReaderOption option;

    public String getPluginName() {
        return "Clickhouse";
    }

    public void prepare(Config config) throws PrepareFailException {
        CheckResult checkAllExists = CheckConfigUtil.checkAllExists(config, new String[]{ClickhouseConfig.HOST.key(), ClickhouseConfig.DATABASE.key(), ClickhouseConfig.TABLE.key()});
        boolean z = config.hasPath(ClickhouseConfig.USERNAME.key()) || config.hasPath(ClickhouseConfig.PASSWORD.key());
        if (z) {
            checkAllExists = CheckConfigUtil.checkAllExists(config, new String[]{ClickhouseConfig.USERNAME.key(), ClickhouseConfig.PASSWORD.key()});
        }
        if (!checkAllExists.isSuccess()) {
            throw new ClickhouseConnectorException((SeaTunnelErrorCode) SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, String.format("PluginName: %s, PluginType: %s, Message: %s", getPluginName(), PluginType.SINK, checkAllExists.getMsg()));
        }
        Config withFallback = config.withFallback(ConfigFactory.parseMap(ImmutableMap.builder().put(ClickhouseConfig.BULK_SIZE.key(), ClickhouseConfig.BULK_SIZE.defaultValue()).put(ClickhouseConfig.SPLIT_MODE.key(), ClickhouseConfig.SPLIT_MODE.defaultValue()).build()));
        List<ClickHouseNode> createNodes = !z ? ClickhouseUtil.createNodes(withFallback.getString(ClickhouseConfig.HOST.key()), withFallback.getString(ClickhouseConfig.DATABASE.key()), null, null) : ClickhouseUtil.createNodes(withFallback.getString(ClickhouseConfig.HOST.key()), withFallback.getString(ClickhouseConfig.DATABASE.key()), withFallback.getString(ClickhouseConfig.USERNAME.key()), withFallback.getString(ClickhouseConfig.PASSWORD.key()));
        Properties properties = new Properties();
        if (CheckConfigUtil.isValidParam(withFallback, ClickhouseConfig.CLICKHOUSE_CONFIG.key())) {
            withFallback.getObject(ClickhouseConfig.CLICKHOUSE_CONFIG.key()).forEach((str, configValue) -> {
                properties.put(str, String.valueOf(configValue.unwrapped()));
            });
        }
        if (z) {
            properties.put("user", withFallback.getString(ClickhouseConfig.USERNAME.key()));
            properties.put("password", withFallback.getString(ClickhouseConfig.PASSWORD.key()));
        }
        ClickhouseProxy clickhouseProxy = new ClickhouseProxy(createNodes.get(0));
        Map<String, String> clickhouseTableSchema = clickhouseProxy.getClickhouseTableSchema(withFallback.getString(ClickhouseConfig.TABLE.key()));
        String str2 = null;
        String str3 = null;
        ClickhouseTable clickhouseTable = clickhouseProxy.getClickhouseTable(withFallback.getString(ClickhouseConfig.DATABASE.key()), withFallback.getString(ClickhouseConfig.TABLE.key()));
        if (withFallback.getBoolean(ClickhouseConfig.SPLIT_MODE.key())) {
            if (!"Distributed".equals(clickhouseTable.getEngine())) {
                throw new ClickhouseConnectorException((SeaTunnelErrorCode) CommonErrorCode.ILLEGAL_ARGUMENT, "split mode only support table which engine is 'Distributed' engine at now");
            }
            if (withFallback.hasPath(ClickhouseConfig.SHARDING_KEY.key())) {
                str2 = withFallback.getString(ClickhouseConfig.SHARDING_KEY.key());
                str3 = clickhouseTableSchema.get(str2);
            }
        }
        ShardMetadata shardMetadata = z ? new ShardMetadata(str2, str3, clickhouseTable.getSortingKey(), withFallback.getString(ClickhouseConfig.DATABASE.key()), withFallback.getString(ClickhouseConfig.TABLE.key()), clickhouseTable.getEngine(), withFallback.getBoolean(ClickhouseConfig.SPLIT_MODE.key()), new Shard(1, 1, createNodes.get(0)), withFallback.getString(ClickhouseConfig.USERNAME.key()), withFallback.getString(ClickhouseConfig.PASSWORD.key())) : new ShardMetadata(str2, str3, clickhouseTable.getSortingKey(), withFallback.getString(ClickhouseConfig.DATABASE.key()), withFallback.getString(ClickhouseConfig.TABLE.key()), clickhouseTable.getEngine(), withFallback.getBoolean(ClickhouseConfig.SPLIT_MODE.key()), new Shard(1, 1, createNodes.get(0)));
        clickhouseProxy.close();
        String[] strArr = null;
        if (withFallback.hasPath(ClickhouseConfig.PRIMARY_KEY.key())) {
            String string = withFallback.getString(ClickhouseConfig.PRIMARY_KEY.key());
            if (str2 != null && !Objects.equals(string, str2)) {
                throw new ClickhouseConnectorException((SeaTunnelErrorCode) CommonErrorCode.ILLEGAL_ARGUMENT, "sharding_key and primary_key must be consistent to ensure correct processing of cdc events");
            }
            strArr = new String[]{string};
        }
        boolean booleanValue = ((Boolean) ClickhouseConfig.SUPPORT_UPSERT.defaultValue()).booleanValue();
        if (withFallback.hasPath(ClickhouseConfig.SUPPORT_UPSERT.key())) {
            booleanValue = withFallback.getBoolean(ClickhouseConfig.SUPPORT_UPSERT.key());
        }
        boolean booleanValue2 = ((Boolean) ClickhouseConfig.ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE.defaultValue()).booleanValue();
        if (withFallback.hasPath(ClickhouseConfig.ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE.key())) {
            booleanValue2 = withFallback.getBoolean(ClickhouseConfig.ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE.key());
        }
        this.option = ReaderOption.builder().shardMetadata(shardMetadata).properties(properties).tableEngine(clickhouseTable.getEngine()).tableSchema(clickhouseTableSchema).bulkSize(withFallback.getInt(ClickhouseConfig.BULK_SIZE.key())).primaryKeys(strArr).supportUpsert(booleanValue).allowExperimentalLightweightDelete(booleanValue2).build();
    }

    public SinkWriter<SeaTunnelRow, CKCommitInfo, ClickhouseSinkState> createWriter(SinkWriter.Context context) throws IOException {
        return new ClickhouseSinkWriter(this.option, context);
    }

    public SinkWriter<SeaTunnelRow, CKCommitInfo, ClickhouseSinkState> restoreWriter(SinkWriter.Context context, List<ClickhouseSinkState> list) throws IOException {
        return super.restoreWriter(context, list);
    }

    public Optional<Serializer<ClickhouseSinkState>> getWriterStateSerializer() {
        return Optional.of(new DefaultSerializer());
    }

    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
        this.option.setSeaTunnelRowType(seaTunnelRowType);
    }

    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
        return this.option.getSeaTunnelRowType();
    }
}
