package org.apache.seatunnel.flink.clickhouse.sink;

import com.google.auto.service.AutoService;
import com.google.common.collect.ImmutableMap;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
import org.apache.seatunnel.flink.BaseFlinkSink;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.batch.FlinkBatchSink;
import org.apache.seatunnel.flink.clickhouse.ConfigKey;
import org.apache.seatunnel.flink.clickhouse.pojo.ClickhouseFileCopyMethod;
import org.apache.seatunnel.flink.clickhouse.pojo.IntHolder;
import org.apache.seatunnel.flink.clickhouse.pojo.Shard;
import org.apache.seatunnel.flink.clickhouse.pojo.ShardMetadata;
import org.apache.seatunnel.flink.clickhouse.sink.client.ClickhouseClient;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.sshd.client.auth.keyboard.UserInteraction;
import ru.yandex.clickhouse.ClickHouseConnectionImpl;

@AutoService({BaseFlinkSink.class})
/* loaded from: input_file:org/apache/seatunnel/flink/clickhouse/sink/ClickhouseFileBatchSink.class */
public class ClickhouseFileBatchSink implements FlinkBatchSink {
    private Config config;
    private ShardMetadata shardMetadata;
    private Map<String, String> tableSchema = new HashMap();
    private List<String> fields;

    public void setConfig(Config config) {
        this.config = config;
    }

    public Config getConfig() {
        return this.config;
    }

    public CheckResult checkConfig() {
        CheckResult checkAllExists = CheckConfigUtil.checkAllExists(this.config, new String[]{ConfigKey.HOST, ConfigKey.TABLE, ConfigKey.DATABASE, ConfigKey.USERNAME, "password", ConfigKey.CLICKHOUSE_LOCAL_PATH});
        if (!checkAllExists.isSuccess()) {
            return checkAllExists;
        }
        this.config = this.config.withFallback(ConfigFactory.parseMap(ImmutableMap.builder().put(ConfigKey.COPY_METHOD, ClickhouseFileCopyMethod.SCP.getName()).build()));
        return CheckResult.success();
    }

    public void prepare(FlinkEnvironment flinkEnvironment) {
        ClickhouseClient clickhouseClient = new ClickhouseClient(this.config);
        String string = this.config.getString(ConfigKey.TABLE);
        String string2 = this.config.getString(ConfigKey.DATABASE);
        String[] split = this.config.getString(ConfigKey.HOST).split(UserInteraction.DEFAULT_CHECK_INTERACTIVE_PASSWORD_DELIM);
        try {
            ClickHouseConnectionImpl clickhouseConnection = clickhouseClient.getClickhouseConnection();
            Throwable th = null;
            try {
                try {
                    this.tableSchema = clickhouseClient.getClickhouseTableSchema(clickhouseConnection, string);
                    String str = (String) TypesafeConfigUtils.getConfig(this.config, ConfigKey.SHARDING_KEY, "");
                    this.shardMetadata = new ShardMetadata(str, this.tableSchema.get(str), string2, string, false, new Shard(1, 1, 1, split[0], split[0], split[1], string2));
                    if (this.config.hasPath(ConfigKey.FIELDS)) {
                        this.fields = this.config.getStringList(ConfigKey.FIELDS);
                        for (String str2 : this.fields) {
                            if (!this.tableSchema.containsKey(str2)) {
                                throw new RuntimeException("Field " + str2 + " does not exist in table " + string);
                            }
                        }
                    }
                    if (clickhouseConnection != null) {
                        if (0 != 0) {
                            try {
                                clickhouseConnection.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            clickhouseConnection.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        } catch (SQLException e) {
            throw new RuntimeException("Failed to connect to clickhouse server", e);
        }
    }

    @Nullable
    public void outputBatch(FlinkEnvironment flinkEnvironment, DataSet<Row> dataSet) {
        String[] fieldNames = dataSet.getType().getFieldNames();
        final IntHolder intHolder = new IntHolder();
        int i = 0;
        while (true) {
            if (i >= fieldNames.length) {
                break;
            }
            if (fieldNames[i].equals(this.shardMetadata.getShardKey())) {
                intHolder.setValue(i);
                break;
            }
            i++;
        }
        dataSet.partitionCustom(new Partitioner<String>() { // from class: org.apache.seatunnel.flink.clickhouse.sink.ClickhouseFileBatchSink.2
            public int partition(String str, int i2) {
                return str.hashCode() % i2;
            }
        }, new KeySelector<Row, String>() { // from class: org.apache.seatunnel.flink.clickhouse.sink.ClickhouseFileBatchSink.3
            public String getKey(Row row) {
                return Objects.toString(row.getField(intHolder.getValue()));
            }
        }).mapPartition(new MapPartitionFunction<Row, Row>() { // from class: org.apache.seatunnel.flink.clickhouse.sink.ClickhouseFileBatchSink.1
            public void mapPartition(Iterable<Row> iterable, Collector<Row> collector) throws Exception {
                new ClickhouseFileOutputFormat(ClickhouseFileBatchSink.this.config, ClickhouseFileBatchSink.this.shardMetadata, ClickhouseFileBatchSink.this.fields).writeRecords(iterable);
            }
        }).output(new OutputFormat<Row>() { // from class: org.apache.seatunnel.flink.clickhouse.sink.ClickhouseFileBatchSink.4
            public void configure(Configuration configuration) {
            }

            public void open(int i2, int i3) {
            }

            public void writeRecord(Row row) {
            }

            public void close() {
            }
        });
    }

    public void close() {
    }

    public String getPluginName() {
        return "ClickhouseFile";
    }
}
