package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;

import com.clickhouse.client.ClickHouseException;
import com.clickhouse.client.ClickHouseRequest;
import com.clickhouse.client.ClickHouseValues;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.FileReaderOption;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseProxy;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ShardRouter;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState;
import org.apache.sshd.common.util.OsUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.class */
public class ClickhouseFileSinkWriter implements SinkWriter<SeaTunnelRow, CKCommitInfo, ClickhouseSinkState> {
    private static final Logger LOGGER = LoggerFactory.getLogger(ClickhouseFileSinkWriter.class);
    private static final String CLICKHOUSE_LOCAL_FILE_PREFIX = "/tmp/clickhouse-local/seatunnel-file";
    private static final int UUID_LENGTH = 10;
    private final FileReaderOption readerOption;
    private final ShardRouter shardRouter;
    private final ClickhouseProxy proxy;
    private final ClickhouseTable clickhouseTable;
    private final Map<Shard, List<String>> shardLocalDataPaths;
    private final Map<Shard, List<SeaTunnelRow>> rowCache = new HashMap(16);

    public ClickhouseFileSinkWriter(FileReaderOption fileReaderOption, SinkWriter.Context context) {
        this.readerOption = fileReaderOption;
        this.proxy = new ClickhouseProxy(this.readerOption.getShardMetadata().getDefaultShard().getNode());
        this.shardRouter = new ShardRouter(this.proxy, this.readerOption.getShardMetadata());
        this.clickhouseTable = this.proxy.getClickhouseTable(this.readerOption.getShardMetadata().getDatabase(), this.readerOption.getShardMetadata().getTable());
        nodePasswordCheck();
        this.shardLocalDataPaths = (Map) this.shardRouter.getShards().values().stream().collect(Collectors.toMap(Function.identity(), shard -> {
            return this.proxy.getClickhouseTable(shard.getNode().getDatabase().get(), this.clickhouseTable.getLocalTableName()).getDataPaths();
        }));
    }

    public void write(SeaTunnelRow seaTunnelRow) throws IOException {
        this.rowCache.computeIfAbsent(this.shardRouter.getShard(seaTunnelRow), shard -> {
            return new ArrayList();
        }).add(seaTunnelRow);
    }

    private void nodePasswordCheck() {
        if (this.readerOption.isNodeFreePass()) {
            return;
        }
        this.shardRouter.getShards().values().forEach(shard -> {
            if (!this.readerOption.getNodePassword().containsKey(shard.getNode().getAddress().getHostName()) && !this.readerOption.getNodePassword().containsKey(shard.getNode().getHost())) {
                throw new RuntimeException("Cannot find password of shard " + shard.getNode().getAddress().getHostName());
            }
        });
    }

    public Optional<CKCommitInfo> prepareCommit() throws IOException {
        return Optional.empty();
    }

    public void abortPrepare() {
    }

    public void close() throws IOException {
        this.rowCache.forEach(this::flush);
    }

    private void flush(Shard shard, List<SeaTunnelRow> list) {
        try {
            List<String> generateClickhouseLocalFiles = generateClickhouseLocalFiles(list);
            attachClickhouseLocalFileToServer(shard, generateClickhouseLocalFiles);
            clearLocalFileDirectory(generateClickhouseLocalFiles);
        } catch (Exception e) {
            throw new RuntimeException("Flush data into clickhouse file error", e);
        }
    }

    private List<String> generateClickhouseLocalFiles(List<SeaTunnelRow> list) throws IOException, InterruptedException {
        if (list.isEmpty()) {
            return Collections.emptyList();
        }
        String replaceAll = UUID.randomUUID().toString().substring(0, 10).replaceAll("-", "_");
        String format = String.format("%s/%s", CLICKHOUSE_LOCAL_FILE_PREFIX, replaceAll);
        FileUtils.forceMkdir(new File(format));
        String str = format + "/local_data.log";
        FileChannel open = FileChannel.open(Paths.get(str, new String[0]), StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW);
        try {
            open.map(FileChannel.MapMode.READ_WRITE, open.size(), r0.getBytes(StandardCharsets.UTF_8).length).put(((String) list.stream().map(seaTunnelRow -> {
                return (String) this.readerOption.getFields().stream().map(str2 -> {
                    return seaTunnelRow.getField(this.readerOption.getSeaTunnelRowType().indexOf(str2)).toString();
                }).collect(Collectors.joining("\t"));
            }).collect(Collectors.joining(StringUtils.LF))).getBytes(StandardCharsets.UTF_8));
            if (open != null) {
                open.close();
            }
            List list2 = (List) Arrays.stream(this.readerOption.getClickhouseLocalPath().trim().split(StringUtils.SPACE)).collect(Collectors.toList());
            ArrayList arrayList = new ArrayList(list2);
            if (list2.size() == 1) {
                arrayList.add("local");
            }
            arrayList.add("--file");
            arrayList.add(str);
            arrayList.add("-S");
            arrayList.add("\"" + ((String) this.readerOption.getFields().stream().map(str2 -> {
                return str2 + StringUtils.SPACE + this.readerOption.getTableSchema().get(str2);
            }).collect(Collectors.joining(","))) + "\"");
            arrayList.add("-N");
            arrayList.add("\"temp_table" + replaceAll + "\"");
            arrayList.add("-q");
            arrayList.add(String.format("\"%s; INSERT INTO TABLE %s SELECT %s FROM temp_table%s;\"", this.clickhouseTable.getCreateTableDDL().replace(this.clickhouseTable.getDatabase() + ".", "").replaceAll("`", ""), this.clickhouseTable.getLocalTableName(), this.readerOption.getTableSchema().keySet().stream().map(str3 -> {
                return this.readerOption.getFields().contains(str3) ? str3 : ClickHouseValues.NULL_EXPR;
            }).collect(Collectors.joining(",")), replaceAll));
            arrayList.add("--path");
            arrayList.add("\"" + format + "\"");
            LOGGER.info("Generate clickhouse local file command: {}", String.join(StringUtils.SPACE, arrayList));
            Process start = new ProcessBuilder("bash", "-c", String.join(StringUtils.SPACE, arrayList)).start();
            InputStream inputStream = start.getInputStream();
            try {
                InputStreamReader inputStreamReader = new InputStreamReader(inputStream);
                try {
                    BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
                    while (true) {
                        try {
                            String readLine = bufferedReader.readLine();
                            if (readLine == null) {
                                break;
                            }
                            LOGGER.info(readLine);
                        } catch (Throwable th) {
                            try {
                                bufferedReader.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                            throw th;
                        }
                    }
                    bufferedReader.close();
                    inputStreamReader.close();
                    if (inputStream != null) {
                        inputStream.close();
                    }
                    start.waitFor();
                    File file = new File(format + "/data/_local/" + this.clickhouseTable.getLocalTableName());
                    if (!file.exists()) {
                        throw new RuntimeException("clickhouse local file not exists");
                    }
                    File[] listFiles = file.listFiles();
                    if (listFiles == null) {
                        throw new RuntimeException("clickhouse local file not exists");
                    }
                    return (List) Arrays.stream(listFiles).filter((v0) -> {
                        return v0.isDirectory();
                    }).filter(file2 -> {
                        return !"detached".equals(file2.getName());
                    }).map((v0) -> {
                        return v0.getAbsolutePath();
                    }).collect(Collectors.toList());
                } finally {
                }
            } catch (Throwable th3) {
                if (inputStream != null) {
                    try {
                        inputStream.close();
                    } catch (Throwable th4) {
                        th3.addSuppressed(th4);
                    }
                }
                throw th3;
            }
        } catch (Throwable th5) {
            if (open != null) {
                try {
                    open.close();
                } catch (Throwable th6) {
                    th5.addSuppressed(th6);
                }
            }
            throw th5;
        }
    }

    /* JADX WARN: Type inference failed for: r0v32, types: [com.clickhouse.client.ClickHouseRequest] */
    private void attachClickhouseLocalFileToServer(Shard shard, List<String> list) throws ClickHouseException {
        String hostName = shard.getNode().getAddress().getHostName();
        FileTransfer createFileTransfer = FileTransferFactory.createFileTransfer(this.readerOption.getCopyMethod(), hostName, this.readerOption.getNodeUser().getOrDefault(hostName, OsUtils.ROOT_USER), this.readerOption.getNodePassword().getOrDefault(hostName, null));
        createFileTransfer.init();
        createFileTransfer.transferAndChown(list, this.shardLocalDataPaths.get(shard).get(0) + "detached/");
        createFileTransfer.close();
        ClickHouseRequest<?> clickhouseConnection = this.proxy.getClickhouseConnection(shard);
        for (String str : list) {
            clickhouseConnection.query(String.format("ALTER TABLE %s ATTACH PART '%s'", this.clickhouseTable.getLocalTableName(), str.substring(str.lastIndexOf("/") + 1))).executeAndWait().close();
        }
    }

    private void clearLocalFileDirectory(List<String> list) {
        String substring = list.get(0).substring(0, CLICKHOUSE_LOCAL_FILE_PREFIX.length() + 10 + 1);
        try {
            if (new File(substring).exists()) {
                FileUtils.deleteDirectory(new File(substring));
            }
        } catch (IOException e) {
            throw new RuntimeException("Unable to delete directory " + substring, e);
        }
    }
}
