package org.apache.seatunnel.flink.clickhouse.sink;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.flink.types.Row;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
import org.apache.seatunnel.flink.clickhouse.ConfigKey;
import org.apache.seatunnel.flink.clickhouse.pojo.ClickhouseFileCopyMethod;
import org.apache.seatunnel.flink.clickhouse.pojo.Shard;
import org.apache.seatunnel.flink.clickhouse.pojo.ShardMetadata;
import org.apache.seatunnel.flink.clickhouse.sink.client.ClickhouseClient;
import org.apache.seatunnel.flink.clickhouse.sink.client.ShardRouter;
import org.apache.seatunnel.flink.clickhouse.sink.file.ClickhouseTable;
import org.apache.seatunnel.flink.clickhouse.sink.file.ScpFileTransfer;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.yandex.clickhouse.ClickHouseConnectionImpl;

/* loaded from: input_file:org/apache/seatunnel/flink/clickhouse/sink/ClickhouseFileOutputFormat.class */
public class ClickhouseFileOutputFormat {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) ClickhouseFileOutputFormat.class);
    private static final String CLICKHOUSE_LOCAL_FILE_PREFIX = "/tmp/clickhouse-local/flink-file";
    private static final int UUID_LENGTH = 10;
    private final Config config;
    private final String clickhouseLocalPath;
    private final List<String> fields;
    private final ShardMetadata shardMetadata;
    private final ClickhouseFileCopyMethod clickhouseFileCopyMethod;
    private final Map<String, String> nodePassword;
    private final ClickhouseClient clickhouseClient;
    private final ShardRouter shardRouter;
    private final ClickhouseTable clickhouseTable;
    private final Map<String, String> schemaMap;
    private final Map<Shard, List<String>> shardLocalDataPaths;
    private final Map<Shard, List<Row>> rowCache;

    public ClickhouseFileOutputFormat(Config config, ShardMetadata shardMetadata, List<String> list) throws IOException {
        this.config = config;
        this.clickhouseLocalPath = config.getString(ConfigKey.CLICKHOUSE_LOCAL_PATH);
        this.shardMetadata = shardMetadata;
        this.fields = list;
        this.clickhouseFileCopyMethod = ClickhouseFileCopyMethod.from(config.getString(ConfigKey.COPY_METHOD));
        if (((Boolean) TypesafeConfigUtils.getConfig(config, ConfigKey.NODE_FREE_PASSWORD, true)).booleanValue()) {
            this.nodePassword = Collections.emptyMap();
        } else {
            this.nodePassword = (Map) config.getObjectList(ConfigKey.NODE_PASS).stream().collect(Collectors.toMap(configObject -> {
                return configObject.toConfig().getString(ConfigKey.NODE_ADDRESS);
            }, configObject2 -> {
                return configObject2.toConfig().getString("password");
            }));
        }
        this.clickhouseClient = new ClickhouseClient(config);
        this.shardRouter = new ShardRouter(this.clickhouseClient, shardMetadata);
        this.clickhouseTable = this.clickhouseClient.getClickhouseTable(config.getString(ConfigKey.DATABASE), config.getString(ConfigKey.TABLE));
        this.schemaMap = this.clickhouseClient.getClickhouseTableSchema(config.getString(ConfigKey.TABLE));
        this.rowCache = new HashMap(this.shardRouter.getShards().keySet().size());
        if (!((Boolean) TypesafeConfigUtils.getConfig(config, ConfigKey.NODE_FREE_PASSWORD, true)).booleanValue()) {
            this.shardRouter.getShards().values().forEach(shard -> {
                if (!this.nodePassword.containsKey(shard.getHostAddress()) && !this.nodePassword.containsKey(shard.getHostname())) {
                    throw new RuntimeException("Cannot find password of shard " + shard.getHostAddress());
                }
            });
        }
        this.shardLocalDataPaths = (Map) this.shardRouter.getShards().values().stream().collect(Collectors.toMap(Function.identity(), shard2 -> {
            return this.clickhouseClient.getClickhouseTable(shard2.getDatabase(), this.clickhouseTable.getLocalTableName()).getDataPaths();
        }));
    }

    public void writeRecords(Iterable<Row> iterable) {
        for (Row row : iterable) {
            this.rowCache.computeIfAbsent(this.shardRouter.getShard(row), shard -> {
                return new ArrayList();
            }).add(row);
        }
        for (Map.Entry<Shard, List<Row>> entry : this.rowCache.entrySet()) {
            Shard key = entry.getKey();
            List<Row> value = entry.getValue();
            flush(key, value);
            value.clear();
        }
    }

    private void flush(Shard shard, List<Row> list) {
        try {
            List<String> generateClickhouseLocalFiles = generateClickhouseLocalFiles(shard, list);
            attachClickhouseLocalFileToServer(shard, generateClickhouseLocalFiles);
            clearLocalFileDirectory(generateClickhouseLocalFiles);
        } catch (Exception e) {
            throw new RuntimeException("Flush data into clickhouse file error", e);
        }
    }

    /* JADX WARN: Failed to calculate best type for var: r23v0 ??
    java.lang.NullPointerException
     */
    /* JADX WARN: Failed to calculate best type for var: r24v0 ??
    java.lang.NullPointerException
     */
    /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.RegisterArg.getSVar()" because the return value of "jadx.core.dex.nodes.InsnNode.getResult()" is null
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.collectRelatedVars(AbstractTypeConstraint.java:31)
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.<init>(AbstractTypeConstraint.java:19)
    	at jadx.core.dex.visitors.typeinference.TypeSearch$1.<init>(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeMoveConstraint(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeConstraint(TypeSearch.java:361)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.collectConstraints(TypeSearch.java:341)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.run(TypeSearch.java:60)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.runMultiVariableSearch(FixTypesVisitor.java:116)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Not initialized variable reg: 23, insn: 0x032f: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r23 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:82:0x032f */
    /* JADX WARN: Not initialized variable reg: 24, insn: 0x0334: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r24 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:84:0x0334 */
    /* JADX WARN: Type inference failed for: r23v0, types: [java.io.InputStreamReader] */
    /* JADX WARN: Type inference failed for: r24v0, types: [java.lang.Throwable] */
    private List<String> generateClickhouseLocalFiles(Shard shard, List<Row> list) throws IOException, InterruptedException {
        ?? r23;
        ?? r24;
        if (CollectionUtils.isEmpty(list)) {
            return Collections.emptyList();
        }
        String replaceAll = UUID.randomUUID().toString().substring(0, 10).replaceAll("-", "_");
        String format = String.format("%s/%s", CLICKHOUSE_LOCAL_FILE_PREFIX, replaceAll);
        FileUtils.forceMkdir(new File(format));
        String str = format + "/local_data.log";
        FileChannel open = FileChannel.open(Paths.get(str, new String[0]), StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW);
        open.map(FileChannel.MapMode.READ_WRITE, open.size(), r0.getBytes(StandardCharsets.UTF_8).length).put(((String) list.stream().map(row -> {
            return (String) this.fields.stream().map(str2 -> {
                return row.getField(str2).toString();
            }).collect(Collectors.joining("\t"));
        }).collect(Collectors.joining("\n"))).getBytes(StandardCharsets.UTF_8));
        ArrayList arrayList = new ArrayList();
        arrayList.add("cat");
        arrayList.add(str);
        arrayList.add("|");
        arrayList.addAll((Collection) Arrays.stream(this.clickhouseLocalPath.trim().split(" ")).collect(Collectors.toList()));
        arrayList.add("local");
        arrayList.add("-S");
        arrayList.add("\"" + ((String) this.fields.stream().map(str2 -> {
            return str2 + " " + this.schemaMap.get(str2);
        }).collect(Collectors.joining(","))) + "\"");
        arrayList.add("-N");
        arrayList.add("\"temp_table" + replaceAll + "\"");
        arrayList.add("-q");
        arrayList.add(String.format("\"%s; INSERT INTO TABLE %s SELECT %s FROM temp_table%s;\"", this.clickhouseTable.getCreateTableDDL().replace(this.clickhouseTable.getDatabase() + ".", "").replaceAll("`", ""), this.clickhouseTable.getLocalTableName(), this.schemaMap.entrySet().stream().map(entry -> {
            return this.fields.contains(entry.getKey()) ? (String) entry.getKey() : "NULL";
        }).collect(Collectors.joining(",")), replaceAll));
        arrayList.add("--path");
        arrayList.add("\"" + format + "\"");
        LOGGER.info("Generate clickhouse local file command: {}", String.join(" ", arrayList));
        Process start = new ProcessBuilder("bash", "-c", String.join(" ", arrayList)).start();
        InputStream inputStream = start.getInputStream();
        Throwable th = null;
        try {
            try {
                InputStreamReader inputStreamReader = new InputStreamReader(inputStream);
                Throwable th2 = null;
                BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
                Throwable th3 = null;
                while (true) {
                    try {
                        try {
                            String readLine = bufferedReader.readLine();
                            if (readLine == null) {
                                break;
                            }
                            LOGGER.info(readLine);
                        } finally {
                        }
                    } catch (Throwable th4) {
                        if (bufferedReader != null) {
                            if (th3 != null) {
                                try {
                                    bufferedReader.close();
                                } catch (Throwable th5) {
                                    th3.addSuppressed(th5);
                                }
                            } else {
                                bufferedReader.close();
                            }
                        }
                        throw th4;
                    }
                }
                if (bufferedReader != null) {
                    if (0 != 0) {
                        try {
                            bufferedReader.close();
                        } catch (Throwable th6) {
                            th3.addSuppressed(th6);
                        }
                    } else {
                        bufferedReader.close();
                    }
                }
                if (inputStreamReader != null) {
                    if (0 != 0) {
                        try {
                            inputStreamReader.close();
                        } catch (Throwable th7) {
                            th2.addSuppressed(th7);
                        }
                    } else {
                        inputStreamReader.close();
                    }
                }
                start.waitFor();
                File file = new File(format + "/data/_local/" + this.clickhouseTable.getLocalTableName());
                if (!file.exists()) {
                    throw new RuntimeException("clickhouse local file not exists");
                }
                File[] listFiles = file.listFiles();
                if (listFiles == null) {
                    throw new RuntimeException("clickhouse local file not exists");
                }
                return (List) Arrays.stream(listFiles).filter((v0) -> {
                    return v0.isDirectory();
                }).filter(file2 -> {
                    return !"detached".equals(file2.getName());
                }).map((v0) -> {
                    return v0.getAbsolutePath();
                }).collect(Collectors.toList());
            } catch (Throwable th8) {
                if (r23 != 0) {
                    if (r24 != 0) {
                        try {
                            r23.close();
                        } catch (Throwable th9) {
                            r24.addSuppressed(th9);
                        }
                    } else {
                        r23.close();
                    }
                }
                throw th8;
            }
        } finally {
            if (inputStream != null) {
                if (0 != 0) {
                    try {
                        inputStream.close();
                    } catch (Throwable th10) {
                        th.addSuppressed(th10);
                    }
                } else {
                    inputStream.close();
                }
            }
        }
    }

    private void attachClickhouseLocalFileToServer(Shard shard, List<String> list) {
        if (!ClickhouseFileCopyMethod.SCP.equals(this.clickhouseFileCopyMethod)) {
            throw new RuntimeException("unsupported clickhouse file copy method " + this.clickhouseFileCopyMethod);
        }
        String hostAddress = shard.getHostAddress();
        ScpFileTransfer scpFileTransfer = new ScpFileTransfer(hostAddress, this.nodePassword.getOrDefault(hostAddress, null));
        scpFileTransfer.init();
        scpFileTransfer.transferAndChown(list, this.shardLocalDataPaths.get(shard).get(0) + "detached/");
        scpFileTransfer.close();
        try {
            ClickHouseConnectionImpl clickhouseConnection = this.clickhouseClient.getClickhouseConnection(shard);
            Throwable th = null;
            try {
                try {
                    for (String str : list) {
                        clickhouseConnection.createStatement().execute(String.format("ALTER TABLE %s ATTACH PART '%s'", this.clickhouseTable.getLocalTableName(), str.substring(str.lastIndexOf("/") + 1)));
                    }
                    if (clickhouseConnection != null) {
                        if (0 != 0) {
                            try {
                                clickhouseConnection.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            clickhouseConnection.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        } catch (SQLException e) {
            throw new RuntimeException("Unable to close connection", e);
        }
    }

    private void clearLocalFileDirectory(List<String> list) {
        String substring = list.get(0).substring(0, CLICKHOUSE_LOCAL_FILE_PREFIX.length() + 10 + 1);
        try {
            if (new File(substring).exists()) {
                FileUtils.deleteDirectory(new File(substring));
            }
        } catch (IOException e) {
            throw new RuntimeException("Unable to delete directory " + substring, e);
        }
    }
}
