package io.kyligence.kap.clickhouse.job;

import io.kyligence.kap.secondstorage.NameUtil;
import io.kyligence.kap.secondstorage.SecondStorage;
import io.kyligence.kap.secondstorage.SecondStorageConcurrentTestUtil;
import io.kyligence.kap.secondstorage.SecondStorageLockUtils;
import io.kyligence.kap.secondstorage.SecondStorageNodeHelper;
import io.kyligence.kap.secondstorage.SecondStorageUtil;
import io.kyligence.kap.secondstorage.enums.LockTypeEnum;
import io.kyligence.kap.secondstorage.metadata.NodeGroup;
import io.kyligence.kap.secondstorage.metadata.PartitionType;
import io.kyligence.kap.secondstorage.metadata.TableEntity;
import io.kyligence.kap.secondstorage.metadata.TableFlow;
import io.kyligence.kap.secondstorage.metadata.TablePlan;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import lombok.Generated;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kylin.common.KapConfig;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.SegmentOnlineMode;
import org.apache.kylin.common.util.NamedThreadFactory;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.guava30.shaded.common.base.Preconditions;
import org.apache.kylin.guava30.shaded.common.collect.Lists;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.exception.JobStoppedException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.ExecuteResult;
import org.apache.kylin.job.execution.JobSchedulerModeEnum;
import org.apache.kylin.metadata.cube.model.IndexPlan;
import org.apache.kylin.metadata.cube.model.LayoutEntity;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.spark.sql.execution.datasources.jdbc.ShardOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/kyligence/kap/clickhouse/job/ClickHouseLoad.class */
public class ClickHouseLoad extends AbstractExecutable {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(ClickHouseLoad.class);
    public static final String SOURCE_URL = "source_url";
    public static final String ROOT_PATH = "root_path";
    private List<List<LoadInfo>> loadInfos;
    private LoadContext loadContext;

    /* loaded from: input_file:io/kyligence/kap/clickhouse/job/ClickHouseLoad$MethodContext.class */
    public static class MethodContext {
        private final String project;
        private final String dataflowId;
        private final NDataflow df;
        private final String database;
        private final Set<Long> layoutIds;
        private final Set<String> segmentIds;
        private final boolean isIncremental;
        private final KylinConfig config = KylinConfig.getInstanceFromEnv();
        private final Function<LayoutEntity, String> prefixTableName = layoutEntity -> {
            return NameUtil.getTable(this.df, layoutEntity.getId());
        };

        MethodContext(ClickHouseLoad clickHouseLoad) {
            this.project = clickHouseLoad.getProject();
            this.dataflowId = clickHouseLoad.getParam("dataflowId");
            this.df = NDataflowManager.getInstance(this.config, clickHouseLoad.getProject()).getDataflow(this.dataflowId);
            this.database = NameUtil.getDatabase(this.df);
            this.layoutIds = clickHouseLoad.getLayoutIds();
            this.segmentIds = clickHouseLoad.getSegmentIds();
            this.isIncremental = this.df.getSegments().stream().filter(nDataSegment -> {
                return this.segmentIds.contains(nDataSegment.getId());
            }).noneMatch(nDataSegment2 -> {
                return nDataSegment2.getSegRange().isInfinite();
            });
        }

        IndexPlan indexPlan() {
            return this.df.getIndexPlan();
        }

        TablePlan tablePlan() {
            return (TablePlan) SecondStorage.tablePlanManager(this.config, this.project).get(this.dataflowId).orElseThrow(() -> {
                return new IllegalStateException(" no table plan found");
            });
        }

        TableFlow tableFlow() {
            return (TableFlow) SecondStorage.tableFlowManager(this.config, this.project).get(this.dataflowId).orElseThrow(() -> {
                return new IllegalStateException(" no table flow found");
            });
        }

        public String getProject() {
            return this.project;
        }

        public String getDataflowId() {
            return this.dataflowId;
        }

        public NDataflow getDf() {
            return this.df;
        }

        public String getDatabase() {
            return this.database;
        }

        public Function<LayoutEntity, String> getPrefixTableName() {
            return this.prefixTableName;
        }
    }

    private static String indexPath(String str, String str2, String str3, long j) {
        return String.format(Locale.ROOT, "%s%s/%s/%d", str, str2, str3, Long.valueOf(j));
    }

    public List<List<LoadInfo>> getLoadInfos() {
        return this.loadInfos;
    }

    public ClickHouseLoad() {
        this.loadInfos = null;
        setName("STEP_EXPORT_TO_SECOND_STORAGE");
    }

    public ClickHouseLoad(Object obj) {
        super(obj);
        this.loadInfos = null;
    }

    public ClickHouseLoad(String str) {
        this.loadInfos = null;
        setName(str);
    }

    private String getUtParam(String str) {
        return System.getProperty(str, null);
    }

    private Engine createTableEngine() {
        return new Engine(getTableSourceType(), getUtParam(ROOT_PATH), getUtParam(SOURCE_URL));
    }

    public boolean isAzurePlatform() {
        String hdfsWorkingDirectory = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory();
        return hdfsWorkingDirectory.startsWith("wasb") || hdfsWorkingDirectory.startsWith("abfs");
    }

    private boolean isAwsPlatform() {
        return KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory().startsWith("s3");
    }

    private boolean isUt() {
        return KylinConfig.getInstanceFromEnv().isUTEnv();
    }

    private AbstractTableSource getTableSourceType() {
        return isAwsPlatform() ? new S3TableSource() : isAzurePlatform() ? new BlobTableSource() : isUt() ? new UtTableSource() : new HdfsTableSource();
    }

    protected String[] selectInstances(String[] strArr, int i) {
        if (strArr.length == i) {
            return strArr;
        }
        throw new UnsupportedOperationException();
    }

    protected FileProvider getFileProvider(NDataflow nDataflow, String str, long j) {
        String path;
        if (JobSchedulerModeEnum.CHAIN.toString().equalsIgnoreCase(nDataflow.getConfig().getJobSchedulerMode())) {
            path = indexPath(KapConfig.wrap(nDataflow.getConfig()).getReadParquetStoragePath(nDataflow.getProject()), nDataflow.getId(), str, j);
            logger.info("Tiered storage load segment {} layout {} from index path {}", new Object[]{str, Long.valueOf(j), path});
        } else {
            path = getConfig().getFlatTableDir(getProject(), nDataflow.getId(), str).toString();
            logger.info("Tiered storage load segment {} layout {} from flat table path {}", new Object[]{str, Long.valueOf(j), path});
        }
        return new SegmentFileProvider(path);
    }

    private List<LoadInfo> distributeLoad(MethodContext methodContext, String[] strArr) {
        Stream stream = methodContext.layoutIds.stream();
        IndexPlan indexPlan = methodContext.indexPlan();
        indexPlan.getClass();
        return (List) stream.map(indexPlan::getLayoutEntity).filter(SecondStorageUtil::isBaseTableIndex).flatMap(layoutEntity -> {
            return methodContext.segmentIds.stream().filter(str -> {
                return filterLayoutBySegmentId(methodContext, str, layoutEntity.getId());
            }).map(str2 -> {
                return getLoadInfo(str2, layoutEntity, methodContext, strArr);
            });
        }).collect(Collectors.toList());
    }

    private boolean filterLayoutBySegmentId(MethodContext methodContext, String str, long j) {
        return isDAGJobScheduler() || methodContext.df.getSegment(str).getLayoutsMap().containsKey(Long.valueOf(j));
    }

    private LoadInfo getLoadInfo(String str, LayoutEntity layoutEntity, MethodContext methodContext, String[] strArr) {
        LoadInfo genLoadInfoBySegmentId = genLoadInfoBySegmentId(str, strArr, layoutEntity, methodContext.tablePlan(), methodContext.df, methodContext.tableFlow());
        genLoadInfoBySegmentId.setTargetDatabase(methodContext.database);
        genLoadInfoBySegmentId.setTargetTable((String) methodContext.prefixTableName.apply(genLoadInfoBySegmentId.getLayout()));
        return genLoadInfoBySegmentId;
    }

    private LoadInfo genLoadInfoBySegmentId(String str, String[] strArr, LayoutEntity layoutEntity, TablePlan tablePlan, NDataflow nDataflow, TableFlow tableFlow) {
        TableEntity tableEntity = (TableEntity) tablePlan.getEntity(layoutEntity).orElse(null);
        Preconditions.checkArgument(tableEntity != null);
        return LoadInfo.distribute(selectInstances(strArr, Math.min(strArr.length, tableEntity.getShardNumbers())), nDataflow.getModel(), nDataflow.getSegment(str), getFileProvider(nDataflow, str, layoutEntity.getId()), layoutEntity, tableFlow, tableEntity);
    }

    protected List<LoadInfo> preprocessLoadInfo(List<LoadInfo> list) {
        return list;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void init() {
        this.loadContext = new LoadContext(this);
        loadState();
    }

    protected void preCheck() {
        SecondStorageUtil.isModelEnable(getProject(), getTargetModelId());
    }

    public ExecuteResult doWork(ExecutableContext executableContext) throws ExecuteException {
        if (!SecondStorageUtil.isModelEnable(getProject(), getParam("dataflowId"))) {
            return ExecuteResult.createSkip();
        }
        SegmentRange.TimePartitionedSegmentRange timePartitionedSegmentRange = new SegmentRange.TimePartitionedSegmentRange(Long.valueOf(getDataRangeStart()), Long.valueOf(getDataRangeEnd()));
        SecondStorageLockUtils.acquireLock(getTargetModelId(), timePartitionedSegmentRange).lock();
        try {
            init();
            preCheck();
            MethodContext methodContext = new MethodContext(this);
            ExecuteResult executeResult = (ExecuteResult) wrapWithExecuteException(() -> {
                List listAll = SecondStorage.nodeGroupManager(methodContext.config, methodContext.project).listAll();
                Iterator it = listAll.iterator();
                while (it.hasNext()) {
                    if (LockTypeEnum.locked(Arrays.asList(LockTypeEnum.LOAD.name()), ((NodeGroup) it.next()).getLockTypes())) {
                        logger.info("project={} has been locked, skip the step", methodContext.getProject());
                        return ExecuteResult.createSkip();
                    }
                }
                waitFlatTableStepEnd();
                ?? r0 = new String[listAll.size()];
                logger.info("project={} replica, nodeGroup {}", this.project, (Object) r0);
                ListIterator listIterator = listAll.listIterator();
                while (listIterator.hasNext()) {
                    r0[listIterator.nextIndex()] = (String[]) ((NodeGroup) listIterator.next()).getNodeNames().toArray(new String[0]);
                }
                String[][] replicaShards = new ShardOptions(ShardOptions.buildReplicaSharding((String[][]) r0)).replicaShards();
                int length = replicaShards.length;
                ArrayList arrayList = new ArrayList();
                List list = (List) SecondStorage.tableFlowManager(methodContext.config, methodContext.project).listAll().stream().flatMap(tableFlow -> {
                    return tableFlow.getTableDataList().stream();
                }).flatMap(tableData -> {
                    return tableData.getPartitions().stream();
                }).collect(Collectors.toList());
                HashMap hashMap = new HashMap();
                list.forEach(tablePartition -> {
                    tablePartition.getNodeFileMap().forEach((str, list2) -> {
                        hashMap.put(str, Long.valueOf(((Long) hashMap.computeIfAbsent(str, str -> {
                            return 0L;
                        })).longValue() + ((Long) list2.stream().map((v0) -> {
                            return v0.getLen();
                        }).reduce((v0, v1) -> {
                            return Long.sum(v0, v1);
                        }).orElse(0L)).longValue()));
                    });
                });
                int[] indexInGroup = getIndexInGroup(replicaShards[0], hashMap);
                logger.info("project={} indexInGroup={}", this.project, indexInGroup);
                for (String[] strArr : replicaShards) {
                    arrayList.add(preprocessLoadInfo(distributeLoad(methodContext, orderGroupByIndex(strArr, indexInGroup))));
                }
                this.loadInfos = (List) IntStream.range(0, ((List) arrayList.get(0)).size()).mapToObj(i -> {
                    ArrayList arrayList2 = new ArrayList(length);
                    Iterator it2 = arrayList.iterator();
                    while (it2.hasNext()) {
                        arrayList2.add(((List) it2.next()).get(i));
                    }
                    return arrayList2;
                }).sorted(Comparator.comparing(list2 -> {
                    return ((LoadInfo) list2.get(0)).getSegmentId();
                })).collect(Collectors.toList());
                loadData(methodContext);
                updateMeta();
                return ExecuteResult.createSucceed();
            });
            SecondStorageLockUtils.unlock(getTargetModelId(), timePartitionedSegmentRange);
            return executeResult;
        } catch (Throwable th) {
            SecondStorageLockUtils.unlock(getTargetModelId(), timePartitionedSegmentRange);
            throw th;
        }
    }

    private void loadData(MethodContext methodContext) throws InterruptedException, ExecutionException, SQLException, JobStoppedException {
        List<DataLoader> list = (List) this.loadInfos.stream().map(list2 -> {
            return new DataLoader(getId(), methodContext.getDatabase(), createTableEngine(), methodContext.isIncremental, list2, this.loadContext);
        }).filter(dataLoader -> {
            return !this.loadContext.getHistorySegments(dataLoader.getSegmentKey()).contains(dataLoader.getSegmentId());
        }).collect(Collectors.toList());
        List<ShardLoader> list3 = (List) list.stream().flatMap(dataLoader2 -> {
            return dataLoader2.getShardLoaders().stream();
        }).collect(Collectors.toList());
        try {
            checkResumeFileCorrect(list, this.loadContext);
            Iterator<ShardLoader> it = list3.iterator();
            while (it.hasNext()) {
                it.next().setup(this.loadContext.isNewJob());
            }
            loadToTemp(list, methodContext.dataflowId);
            waitDFSEnd(getParentId());
            SecondStorageConcurrentTestUtil.wait("WAIT_BEFORE_COMMIT");
            if (!isStop()) {
                beforeDataCommit();
                commitLoad(list, list3, methodContext.dataflowId);
            }
        } finally {
            boolean isPauseOrError = isPauseOrError();
            list3.forEach(shardLoader -> {
                shardLoader.cleanUpQuietly(isPauseOrError);
            });
            jobStopHandle(false);
        }
    }

    public void checkResumeFileCorrect(List<DataLoader> list, LoadContext loadContext) {
        if (loadContext.isNewJob()) {
            return;
        }
        HashSet hashSet = new HashSet();
        list.forEach(dataLoader -> {
            dataLoader.getSingleFileLoaderPerNode().values().forEach(list2 -> {
                list2.forEach(clickhouseLoadFileLoad -> {
                    hashSet.add(clickhouseLoadFileLoad.getParquetFile());
                });
            });
        });
        if (hashSet.isEmpty()) {
            return;
        }
        Iterator<List<String>> it = loadContext.getHistory().values().iterator();
        while (it.hasNext()) {
            if (!hashSet.containsAll(it.next())) {
                throw new IllegalStateException("The file status changed after pausing, please try to resume");
            }
        }
    }

    private void executeTasks(ExecutorService executorService, List<Runnable> list, AtomicBoolean atomicBoolean, CountDownLatch countDownLatch, AtomicInteger atomicInteger, BooleanSupplier booleanSupplier) throws InterruptedException, ExecutionException {
        ArrayList newArrayList = Lists.newArrayList();
        Iterator<Runnable> it = list.iterator();
        while (it.hasNext()) {
            newArrayList.add(executorService.submit(it.next()));
        }
        do {
            if (booleanSupplier.getAsBoolean()) {
                atomicBoolean.set(true);
            }
            logger.info("Tiered storage process is {}", Integer.valueOf(atomicInteger.get()));
        } while (!countDownLatch.await(5L, TimeUnit.SECONDS));
        Iterator it2 = newArrayList.iterator();
        while (it2.hasNext()) {
            ((Future) it2.next()).get();
        }
    }

    public void loadToTemp(List<DataLoader> list, String str) throws ExecutionException, InterruptedException {
        List listProjectNodes = SecondStorageUtil.listProjectNodes(getProject());
        int secondStorageLoadThreadsPerJob = NDataflowManager.getInstance(getConfig(), getProject()).getDataflow(str).getConfig().getSecondStorageLoadThreadsPerJob();
        logger.info("Tiered storage load file start. Load file concurrency is {}", Integer.valueOf(secondStorageLoadThreadsPerJob));
        Map<String, ConcurrentLinkedQueue<ClickhouseLoadActionUnit>> fileLoaders = getFileLoaders(list);
        int size = listProjectNodes.size() * secondStorageLoadThreadsPerJob;
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS, (BlockingQueue<Runnable>) new LinkedBlockingQueue(), (ThreadFactory) new NamedThreadFactory("ClickhouseLoadFileWorker"));
        try {
            doLoadDataAction(threadPoolExecutor, fileLoaders, secondStorageLoadThreadsPerJob, this::isStop);
            if (threadPoolExecutor.isShutdown()) {
                return;
            }
            threadPoolExecutor.shutdown();
        } catch (Throwable th) {
            if (!threadPoolExecutor.isShutdown()) {
                threadPoolExecutor.shutdown();
            }
            throw th;
        }
    }

    public void commitLoad(List<DataLoader> list, List<ShardLoader> list2, String str) throws SQLException, ExecutionException, InterruptedException {
        Iterator<ShardLoader> it = list2.iterator();
        while (it.hasNext()) {
            it.next().createDestTableIgnoreExist();
        }
        List listProjectNodes = SecondStorageUtil.listProjectNodes(getProject());
        int secondStorageCommitThreadsPerJob = NDataflowManager.getInstance(getConfig(), getProject()).getDataflow(str).getConfig().getSecondStorageCommitThreadsPerJob();
        logger.info("Tiered storage commit file start. Commit file concurrency is {}", Integer.valueOf(secondStorageCommitThreadsPerJob));
        int size = listProjectNodes.size() * secondStorageCommitThreadsPerJob;
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS, (BlockingQueue<Runnable>) new LinkedBlockingQueue(), (ThreadFactory) new NamedThreadFactory("ClickhouseCommitLoadWorker"));
        try {
            try {
                try {
                    Map<String, ConcurrentLinkedQueue<ClickhouseLoadActionUnit>> beforeCommitDropPartitions = getBeforeCommitDropPartitions(list);
                    if (!beforeCommitDropPartitions.isEmpty()) {
                        doLoadDataAction(threadPoolExecutor, beforeCommitDropPartitions, secondStorageCommitThreadsPerJob, () -> {
                            return false;
                        });
                    }
                    doLoadDataAction(threadPoolExecutor, getCommitMovePartitions(list), secondStorageCommitThreadsPerJob, () -> {
                        return false;
                    });
                    list.forEach(dataLoader -> {
                        this.loadContext.finishSegment(dataLoader.getSegmentId(), dataLoader.getSegmentKey());
                    });
                    if (!threadPoolExecutor.isShutdown()) {
                        threadPoolExecutor.shutdown();
                    }
                    SecondStorageConcurrentTestUtil.wait("WAIT_AFTER_COMMIT");
                } catch (Exception e) {
                    doLoadDataAction(threadPoolExecutor, getExceptionCommitDropPartitions(list), secondStorageCommitThreadsPerJob, () -> {
                        return false;
                    });
                    ExceptionUtils.rethrow(e);
                    if (!threadPoolExecutor.isShutdown()) {
                        threadPoolExecutor.shutdown();
                    }
                    SecondStorageConcurrentTestUtil.wait("WAIT_AFTER_COMMIT");
                }
            } catch (InterruptedException e2) {
                doLoadDataAction(threadPoolExecutor, getExceptionCommitDropPartitions(list), secondStorageCommitThreadsPerJob, () -> {
                    return false;
                });
                throw e2;
            }
        } catch (Throwable th) {
            if (!threadPoolExecutor.isShutdown()) {
                threadPoolExecutor.shutdown();
            }
            SecondStorageConcurrentTestUtil.wait("WAIT_AFTER_COMMIT");
            throw th;
        }
    }

    private void doLoadDataAction(ExecutorService executorService, Map<String, ConcurrentLinkedQueue<ClickhouseLoadActionUnit>> map, int i, BooleanSupplier booleanSupplier) throws ExecutionException, InterruptedException {
        if (map.isEmpty()) {
            return;
        }
        CountDownLatch countDownLatch = new CountDownLatch(map.size() * i);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        AtomicBoolean atomicBoolean2 = new AtomicBoolean(false);
        ArrayList newArrayList = Lists.newArrayList();
        Iterator<Map.Entry<String, ConcurrentLinkedQueue<ClickhouseLoadActionUnit>>> it = map.entrySet().iterator();
        while (it.hasNext()) {
            String key = it.next().getKey();
            IntStream.range(0, i).forEach(i2 -> {
                newArrayList.add(() -> {
                    try {
                        try {
                            ClickHouse clickHouse = new ClickHouse(SecondStorageNodeHelper.resolve(key));
                            Throwable th = null;
                            try {
                                try {
                                    ConcurrentLinkedQueue concurrentLinkedQueue = (ConcurrentLinkedQueue) map.get(key);
                                    while (!concurrentLinkedQueue.isEmpty() && !atomicBoolean2.get() && !atomicBoolean.get()) {
                                        ClickhouseLoadActionUnit clickhouseLoadActionUnit = (ClickhouseLoadActionUnit) concurrentLinkedQueue.poll();
                                        if (clickhouseLoadActionUnit != null) {
                                            atomicInteger.incrementAndGet();
                                            clickhouseLoadActionUnit.doAction(clickHouse);
                                            atomicInteger.decrementAndGet();
                                        }
                                    }
                                    if (clickHouse != null) {
                                        if (0 != 0) {
                                            try {
                                                clickHouse.close();
                                            } catch (Throwable th2) {
                                                th.addSuppressed(th2);
                                            }
                                        } else {
                                            clickHouse.close();
                                        }
                                    }
                                    countDownLatch.countDown();
                                } catch (Throwable th3) {
                                    th = th3;
                                    throw th3;
                                }
                            } catch (Throwable th4) {
                                if (clickHouse != null) {
                                    if (th != null) {
                                        try {
                                            clickHouse.close();
                                        } catch (Throwable th5) {
                                            th.addSuppressed(th5);
                                        }
                                    } else {
                                        clickHouse.close();
                                    }
                                }
                                throw th4;
                            }
                        } catch (Exception e) {
                            atomicBoolean2.set(true);
                            atomicInteger.decrementAndGet();
                            ExceptionUtils.rethrow(e);
                            countDownLatch.countDown();
                        }
                    } catch (Throwable th6) {
                        countDownLatch.countDown();
                        throw th6;
                    }
                });
            });
        }
        executeTasks(executorService, newArrayList, atomicBoolean, countDownLatch, atomicInteger, booleanSupplier);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void updateMeta() {
        Preconditions.checkArgument(getLoadInfos() != null, "no load info found");
        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            MethodContext methodContext = new MethodContext(this);
            boolean z = methodContext.isIncremental;
            methodContext.tableFlow().update(tableFlow -> {
                getLoadInfos().stream().flatMap((v0) -> {
                    return v0.stream();
                }).forEach(loadInfo -> {
                    loadInfo.upsertTableData(tableFlow, methodContext.database, (String) methodContext.prefixTableName.apply(loadInfo.getLayout()), z ? PartitionType.INCREMENTAL : PartitionType.FULL);
                });
            });
            updateDFSSegmentIfNeeded(methodContext);
            return null;
        }, this.project, 1, getEpochId());
    }

    protected void updateDFSSegmentIfNeeded(MethodContext methodContext) {
        if (SegmentOnlineMode.ANY.toString().equalsIgnoreCase(getProjectConfig().getKylinEngineSegmentOnlineMode()) && isDAGJobScheduler()) {
            NDataflowManager nDataflowManager = NDataflowManager.getInstance(getConfig(), getProject());
            NDataflow copy = nDataflowManager.getDataflow(methodContext.getDataflowId()).copy();
            NDataflowUpdate nDataflowUpdate = new NDataflowUpdate(methodContext.getDf().getId());
            Stream stream = methodContext.segmentIds.stream();
            copy.getClass();
            List list = (List) stream.map(copy::getSegment).filter((v0) -> {
                return Objects.nonNull(v0);
            }).filter(nDataSegment -> {
                return nDataSegment.getStatus() == SegmentStatusEnum.NEW;
            }).peek(nDataSegment2 -> {
                nDataSegment2.setStatus(SegmentStatusEnum.READY);
            }).collect(Collectors.toList());
            if (!list.isEmpty()) {
                nDataflowUpdate.setToUpdateSegs((NDataSegment[]) list.toArray(new NDataSegment[0]));
                nDataflowManager.updateDataflowWithoutIndex(nDataflowUpdate);
            }
            markDFStatus(methodContext.dataflowId);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void markDFStatus(String str) {
        NDataflowManager nDataflowManager = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), getProject());
        NDataflow dataflow = nDataflowManager.getDataflow(str);
        boolean isOfflineModel = nDataflowManager.isOfflineModel(dataflow);
        if (RealizationStatusEnum.OFFLINE != dataflow.getStatus() || isOfflineModel) {
            return;
        }
        nDataflowManager.updateDataflowStatus(dataflow.getId(), RealizationStatusEnum.ONLINE);
    }

    public void saveState(boolean z) {
        Preconditions.checkNotNull(this.loadContext, "load context can't be null");
        HashMap hashMap = new HashMap();
        hashMap.put(LoadContext.CLICKHOUSE_LOAD_CONTEXT, z ? LoadContext.emptyState() : this.loadContext.serializeToString());
        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            getManager().updateJobOutput(getParentId(), (ExecutableState) null, hashMap, (Set) null, (String) null);
            return null;
        }, this.project, 3, getEpochId(), getTempLockName());
    }

    public void loadState() {
        Preconditions.checkNotNull(this.loadContext, "load context can't be null");
        String str = (String) getManager().getOutputFromHDFSByJobId(getParentId()).getExtra().get(LoadContext.CLICKHOUSE_LOAD_CONTEXT);
        if (!StringUtils.isEmpty(str)) {
            this.loadContext.deserializeToString(str);
        }
        saveState(true);
    }

    private int[] getIndexInGroup(String[] strArr, Map<String, Long> map) {
        return IntStream.range(0, strArr.length).mapToObj(i -> {
            return new Pair(strArr[i], Integer.valueOf(i));
        }).sorted(Comparator.comparing(pair -> {
            return (Long) map.getOrDefault(pair.getFirst(), 0L);
        })).map((v0) -> {
            return v0.getSecond();
        }).mapToInt(num -> {
            return num.intValue();
        }).toArray();
    }

    private String[] orderGroupByIndex(String[] strArr, int[] iArr) {
        return (String[]) ((List) IntStream.range(0, strArr.length).mapToObj(i -> {
            return strArr[iArr[i]];
        }).collect(Collectors.toList())).toArray(new String[0]);
    }

    private void waitFlatTableStepEnd() throws JobStoppedException, InterruptedException {
        if (isDAGJobScheduler()) {
            long secondStorageWaitIndexBuildSecond = getConfig().getSecondStorageWaitIndexBuildSecond();
            AbstractExecutable job = getManager().getJob(getParentId());
            while (!isStop()) {
                boolean isFlatTableSuccess = isFlatTableSuccess(job);
                if (!isFlatTableSuccess) {
                    logger.info("Tiered storage is waiting flat table end");
                    TimeUnit.SECONDS.sleep(secondStorageWaitIndexBuildSecond);
                }
                if (isFlatTableSuccess) {
                    break;
                }
            }
            jobStopHandle(true);
            logger.info("Tiered storage load beginning");
        }
    }

    protected boolean isFlatTableSuccess(AbstractExecutable abstractExecutable) {
        return SecondStorageUtil.checkBuildFlatTableIsSuccess(abstractExecutable);
    }

    private void waitDFSEnd(String str) throws InterruptedException {
        if (needWaitDFSEnd()) {
            long secondStorageWaitIndexBuildSecond = getConfig().getSecondStorageWaitIndexBuildSecond();
            AbstractExecutable job = getManager().getJob(str);
            while (!isStop()) {
                boolean isDfsSuccess = isDfsSuccess(job);
                if (!isDfsSuccess) {
                    logger.info("Tiered storage is waiting dfs end");
                    TimeUnit.SECONDS.sleep(secondStorageWaitIndexBuildSecond);
                }
                if (isDfsSuccess) {
                    break;
                }
            }
            logger.info("Tiered storage continue after DFS end");
        }
    }

    protected boolean isDfsSuccess(AbstractExecutable abstractExecutable) {
        return SecondStorageUtil.checkBuildDfsIsSuccess(abstractExecutable);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isDAGJobScheduler() {
        return JobSchedulerModeEnum.DAG == getParent().getJobSchedulerMode();
    }

    protected boolean needWaitDFSEnd() {
        return isDAGJobScheduler() && !SegmentOnlineMode.ANY.toString().equalsIgnoreCase(getProjectConfig().getKylinEngineSegmentOnlineMode());
    }

    protected void beforeDataCommit() {
    }

    protected void jobStopHandle(boolean z) throws JobStoppedException {
        if (isStop()) {
            if (isPauseOrError()) {
                saveState(z);
            }
            if (isPaused() || isDiscarded()) {
                throw new JobStoppedException("job stop manually.");
            }
            if (isStop()) {
                throw new JobStoppedException("job stop by main task.");
            }
        }
    }

    public boolean isDiscarded() {
        return ExecutableState.DISCARDED == SecondStorageUtil.getJobStatus(this.project, getParentId());
    }

    public boolean isPaused() {
        return ExecutableState.PAUSED == SecondStorageUtil.getJobStatus(this.project, getParentId());
    }

    public boolean isError() {
        return ExecutableState.ERROR == SecondStorageUtil.getJobStatus(this.project, getParentId());
    }

    public boolean isSuicidal() {
        return ExecutableState.SUICIDAL == SecondStorageUtil.getJobStatus(this.project, getParentId());
    }

    public boolean isStop() {
        return isDiscarded() || isPaused() || isError() || isSuicidal();
    }

    public boolean isPauseOrError() {
        return isPaused() || isError();
    }

    public KylinConfig getProjectConfig() {
        return NProjectManager.getInstance(getConfig()).getProject(getProject()).getConfig();
    }

    private Map<String, ConcurrentLinkedQueue<ClickhouseLoadActionUnit>> getFileLoaders(List<DataLoader> list) {
        HashMap hashMap = new HashMap(list.size());
        Iterator<DataLoader> it = list.iterator();
        while (it.hasNext()) {
            it.next().getSingleFileLoaderPerNode().forEach((str, list2) -> {
                ((ConcurrentLinkedQueue) hashMap.computeIfAbsent(str, str -> {
                    return new ConcurrentLinkedQueue();
                })).addAll(list2);
            });
        }
        return hashMap;
    }

    private Map<String, ConcurrentLinkedQueue<ClickhouseLoadActionUnit>> getBeforeCommitDropPartitions(List<DataLoader> list) {
        HashMap hashMap = new HashMap(list.size());
        Iterator<DataLoader> it = list.iterator();
        while (it.hasNext()) {
            it.next().getLoadCommitDropPartitions().forEach((str, list2) -> {
                ((ConcurrentLinkedQueue) hashMap.computeIfAbsent(str, str -> {
                    return new ConcurrentLinkedQueue();
                })).addAll(list2);
            });
        }
        return hashMap;
    }

    private Map<String, ConcurrentLinkedQueue<ClickhouseLoadActionUnit>> getCommitMovePartitions(List<DataLoader> list) throws SQLException {
        HashMap hashMap = new HashMap(list.size());
        Iterator<DataLoader> it = list.iterator();
        while (it.hasNext()) {
            it.next().getLoadCommitMovePartitions().forEach((str, list2) -> {
                ((ConcurrentLinkedQueue) hashMap.computeIfAbsent(str, str -> {
                    return new ConcurrentLinkedQueue();
                })).addAll(list2);
            });
        }
        return hashMap;
    }

    private Map<String, ConcurrentLinkedQueue<ClickhouseLoadActionUnit>> getExceptionCommitDropPartitions(List<DataLoader> list) throws SQLException {
        HashMap hashMap = new HashMap(list.size());
        Iterator<DataLoader> it = list.iterator();
        while (it.hasNext()) {
            it.next().getLoadCommitExceptionPartitions().forEach((str, list2) -> {
                ((ConcurrentLinkedQueue) hashMap.computeIfAbsent(str, str -> {
                    return new ConcurrentLinkedQueue();
                })).addAll(list2);
            });
        }
        return hashMap;
    }
}
