package org.apache.kylin.engine.spark.job;

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ExecutorServiceUtil;
import org.apache.kylin.metadata.cube.model.NDataLayout;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/engine/spark/job/BuildLayoutWithUpdate.class */
public class BuildLayoutWithUpdate {
    protected static final Logger logger = LoggerFactory.getLogger(BuildLayoutWithUpdate.class);
    private static final int CHECK_POINT_DELAY_SEC = 5;
    private ExecutorService pool = Executors.newCachedThreadPool();
    private CompletionService<JobResult> completionService = new ExecutorCompletionService(this.pool);
    private int currentLayoutsNum = 0;
    private final LinkedBlockingQueue<LayoutPoint> layoutPoints = new LinkedBlockingQueue<>();
    private final ScheduledExecutorService checkPointer = Executors.newSingleThreadScheduledExecutor();

    /* loaded from: input_file:org/apache/kylin/engine/spark/job/BuildLayoutWithUpdate$JobEntity.class */
    public static abstract class JobEntity {
        public abstract long getIndexId();

        public abstract String getName();

        public abstract List<NDataLayout> build() throws IOException;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kylin/engine/spark/job/BuildLayoutWithUpdate$JobResult.class */
    public static class JobResult {
        private long indexId;
        private List<NDataLayout> layouts;
        private Throwable throwable;

        JobResult(long j, List<NDataLayout> list, Throwable th) {
            this.indexId = j;
            this.layouts = list;
            this.throwable = th;
        }

        boolean isFailed() {
            return this.throwable != null;
        }

        Throwable getThrowable() {
            return this.throwable;
        }

        long getIndexId() {
            return this.indexId;
        }

        List<NDataLayout> getLayouts() {
            return this.layouts;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kylin/engine/spark/job/BuildLayoutWithUpdate$LayoutPoint.class */
    public static class LayoutPoint {
        private final String project;
        private final KylinConfig config;
        private final String dataFlowId;
        private final NDataLayout layout;

        public LayoutPoint(String str, KylinConfig kylinConfig, String str2, NDataLayout nDataLayout) {
            this.project = str;
            this.config = kylinConfig;
            this.dataFlowId = str2;
            this.layout = nDataLayout;
        }

        public String getProject() {
            return this.project;
        }

        public KylinConfig getConfig() {
            return this.config;
        }

        public String getDataFlowId() {
            return this.dataFlowId;
        }

        public NDataLayout getLayout() {
            return this.layout;
        }
    }

    public BuildLayoutWithUpdate() {
        startCheckPoint();
    }

    public void submit(final JobEntity jobEntity, final KylinConfig kylinConfig) {
        this.completionService.submit(new Callable<JobResult>() { // from class: org.apache.kylin.engine.spark.job.BuildLayoutWithUpdate.1
            /* JADX WARN: Can't rename method to resolve collision */
            /* JADX WARN: Multi-variable type inference failed */
            @Override // java.util.concurrent.Callable
            public JobResult call() throws Exception {
                KylinConfig.setAndUnsetThreadLocalConfig(kylinConfig);
                Thread.currentThread().setName("thread-" + jobEntity.getName());
                List linkedList = new LinkedList();
                Throwable th = null;
                try {
                    linkedList = jobEntity.build();
                } catch (Throwable th2) {
                    BuildLayoutWithUpdate.logger.error("Error occurred when run " + jobEntity.getName(), th2);
                    th = th2;
                }
                return new JobResult(jobEntity.getIndexId(), linkedList, th);
            }
        });
        this.currentLayoutsNum++;
    }

    public void updateLayout(NDataSegment nDataSegment, KylinConfig kylinConfig, String str) {
        for (int i = 0; i < this.currentLayoutsNum; i++) {
            updateSingleLayout(nDataSegment, kylinConfig, str);
        }
        doCheckPoint();
        this.currentLayoutsNum = 0;
    }

    public long updateSingleLayout(NDataSegment nDataSegment, KylinConfig kylinConfig, String str) {
        try {
            logger.info("Wait to take job result.");
            JobResult jobResult = this.completionService.take().get();
            logger.info("Take job result successful.");
            if (jobResult.isFailed()) {
                shutDown();
                throw new RuntimeException(jobResult.getThrowable());
            }
            long indexId = jobResult.getIndexId();
            for (NDataLayout nDataLayout : jobResult.getLayouts()) {
                logger.info("Update layout {} in dataflow {}, segment {}", new Object[]{Long.valueOf(nDataLayout.getLayoutId()), nDataSegment.getDataflow().getUuid(), nDataSegment.getId()});
                if (!this.layoutPoints.offer(new LayoutPoint(str, kylinConfig, nDataSegment.getDataflow().getId(), nDataLayout))) {
                    throw new IllegalStateException("[UNLIKELY_THINGS_HAPPENED] Make sure that layoutPoints can offer.");
                }
            }
            doCheckPoint();
            return indexId;
        } catch (InterruptedException | ExecutionException e) {
            shutDown();
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    private void startCheckPoint() {
        this.checkPointer.scheduleWithFixedDelay(this::doCheckPoint, 5L, 5L, TimeUnit.SECONDS);
    }

    private synchronized void doCheckPoint() {
        LayoutPoint peek;
        LayoutPoint poll = this.layoutPoints.poll();
        if (Objects.isNull(poll)) {
            return;
        }
        if (Objects.isNull(poll.getLayout())) {
            logger.warn("[LESS_LIKELY_THINGS_HAPPENED] layout shouldn't be empty.");
            return;
        }
        String project = poll.getProject();
        KylinConfig config = poll.getConfig();
        String dataFlowId = poll.getDataFlowId();
        ArrayList arrayList = new ArrayList();
        arrayList.add(poll.getLayout());
        StringBuilder append = new StringBuilder("Checkpoint layouts: ").append(poll.getLayout().getLayoutId());
        do {
            peek = this.layoutPoints.peek();
            if (!Objects.nonNull(peek) || !Objects.equals(project, peek.getProject()) || !Objects.equals(dataFlowId, peek.getDataFlowId()) || !Objects.nonNull(peek.getLayout())) {
                logger.info(append.toString());
                KylinConfig.setAndUnsetThreadLocalConfig(config);
                updateLayouts(config, project, dataFlowId, arrayList);
                if (Objects.nonNull(this.layoutPoints.peek())) {
                    doCheckPoint();
                    return;
                }
                return;
            }
            arrayList.add(peek.getLayout());
            append.append(',').append(peek.getLayout().getLayoutId());
        } while (Objects.equals(peek, this.layoutPoints.remove()));
        throw new IllegalStateException("[UNLIKELY_THINGS_HAPPENED] Make sure that only one thread can execute layoutPoints poll.");
    }

    protected void updateLayouts(KylinConfig kylinConfig, String str, String str2, List<NDataLayout> list) {
        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            NDataflowUpdate nDataflowUpdate = new NDataflowUpdate(str2);
            nDataflowUpdate.setToAddOrUpdateLayouts((NDataLayout[]) list.toArray(new NDataLayout[0]));
            NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), str).updateDataflow(nDataflowUpdate);
            return 0;
        }, str);
    }

    public void shutDown() {
        this.pool.shutdown();
        try {
            this.pool.awaitTermination(10L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            logger.warn("Error occurred when shutdown thread pool.", e);
            ExecutorServiceUtil.forceShutdown(this.pool);
            Thread.currentThread().interrupt();
        }
        this.checkPointer.shutdown();
        try {
            this.checkPointer.awaitTermination(10L, TimeUnit.SECONDS);
        } catch (InterruptedException e2) {
            logger.warn("Error occurred when shutdown checkPointer.", e2);
            ExecutorServiceUtil.forceShutdown(this.checkPointer);
            Thread.currentThread().interrupt();
        }
    }
}
