package org.apache.kylin.engine.spark.job;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.engine.spark.builder.NBuildSourceInfo;
import org.apache.kylin.engine.spark.metadata.SegmentInfo;
import org.apache.kylin.engine.spark.metadata.cube.model.LayoutEntity;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.storage.StorageLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/kylin-spark-engine-4.0.3.jar:org/apache/kylin/engine/spark/job/BuildLayoutWithUpdate.class */
/**
 * Submits layout build jobs to a thread pool and applies their results to a segment.
 *
 * <p>When the configured parent dataset storage level is not {@code NONE}, datasets registered
 * through {@link #cacheAndRegister(long, Dataset)} are persisted, and each one is unpersisted once
 * as many jobs referencing it have finished as there were entries in its to-build cuboid
 * collection when the first such job was submitted. A semaphore sized by
 * {@code KylinConfig.getMaxParentDatasetPersistCount()} limits how many datasets may be held
 * persisted at the same time.
 */
public class BuildLayoutWithUpdate {
    protected static final Logger logger = LoggerFactory.getLogger(BuildLayoutWithUpdate.class);

    /** Limits the number of concurrently persisted parent datasets; {@code null} when persisting is disabled. */
    private final Semaphore semaphore;
    private final StorageLevel storageLevel;
    /** {@code true} when the configured storage level is anything other than {@code NONE}. */
    private final boolean persistParentDataset;
    private final ExecutorService pool = Executors.newCachedThreadPool();
    private final CompletionService<JobResult> completionService = new ExecutorCompletionService<>(this.pool);
    /** Number of jobs submitted since the last successful {@link #updateLayout} call. */
    private int currentLayoutsNum = 0;
    /** Parent layout id -> number of jobs that still have to finish before the parent dataset is released. */
    private final Map<Long, AtomicLong> toBuildCuboidSize = new ConcurrentHashMap<>();
    /** Parent layout id -> dataset registered through {@link #cacheAndRegister(long, Dataset)}. */
    private final Map<Long, Dataset<Row>> layout2DataSet = new ConcurrentHashMap<>();

    /** A unit of work that builds one layout. */
    public static abstract class JobEntity {
        /** Returns the job name; it is used in the worker thread name and in error logs. */
        public abstract String getName();

        /**
         * Builds the layout.
         *
         * @return the built layout, later handed to {@code SegmentInfo.updateLayout}
         * @throws IOException if the build fails
         */
        public abstract LayoutEntity build() throws IOException;

        /**
         * Returns the build source this job reads from, or {@code null}; a {@code null} value makes
         * the job skip all parent dataset bookkeeping.
         */
        public abstract NBuildSourceInfo getBuildSourceInfo();
    }

    /** Outcome of a single job: the built layout, or the throwable that made the build fail. */
    public static class JobResult {
        private final LayoutEntity layout;
        private final Throwable throwable;

        JobResult(LayoutEntity layoutEntity, Throwable th) {
            this.layout = layoutEntity;
            this.throwable = th;
        }

        /** Returns {@code true} when a throwable was recorded for the job. */
        boolean isFailed() {
            return this.throwable != null;
        }

        Throwable getThrowable() {
            return this.throwable;
        }

        LayoutEntity getLayout() {
            return this.layout;
        }
    }

    /**
     * Reads the parent dataset storage level from the configuration and, when persisting is
     * enabled, creates the semaphore that bounds the number of persisted datasets.
     *
     * @param kylinConfig configuration supplying the storage level and the max persist count
     * @throws IllegalArgumentException if persisting is enabled and the max persist count is below 1
     */
    public BuildLayoutWithUpdate(KylinConfig kylinConfig) {
        this.storageLevel = StorageLevel.fromString(kylinConfig.getParentDatasetStorageLevel());
        this.persistParentDataset = !this.storageLevel.equals(StorageLevel.NONE());
        if (this.persistParentDataset) {
            int maxPersistCount = kylinConfig.getMaxParentDatasetPersistCount();
            if (maxPersistCount < 1) {
                throw new IllegalArgumentException(
                        "max parent dataset persist count should be at least 1, but was " + maxPersistCount);
            }
            this.semaphore = new Semaphore(maxPersistCount);
        } else {
            this.semaphore = null;
        }
    }

    /**
     * Persists the given dataset and registers it under the layout id. Does nothing when
     * persisting is disabled. Blocks until a semaphore permit is available.
     *
     * @param layoutId id of the layout the dataset belongs to
     * @param dataset  dataset to persist with the configured storage level
     * @throws InterruptedException if interrupted while waiting for a permit
     */
    public void cacheAndRegister(long layoutId, Dataset<Row> dataset) throws InterruptedException {
        if (this.persistParentDataset) {
            logger.info("persist dataset of layout: {}", layoutId);
            this.semaphore.acquire();
            this.layout2DataSet.put(layoutId, dataset);
            dataset.persist(this.storageLevel);
        }
    }

    /**
     * Submits a job to the thread pool. The job's result (or failure) is collected later by
     * {@link #updateLayout}.
     *
     * @param jobEntity   job to run
     * @param kylinConfig configuration installed as thread-local config on the worker thread
     * @throws RuntimeException if persisting is enabled and the job's parent dataset has not been
     *                          registered through {@link #cacheAndRegister(long, Dataset)}
     */
    public void submit(final JobEntity jobEntity, final KylinConfig kylinConfig) {
        if (this.persistParentDataset) {
            NBuildSourceInfo sourceInfo = jobEntity.getBuildSourceInfo();
            if (sourceInfo != null) {
                long parentLayoutId = sourceInfo.getLayoutId();
                if (!this.layout2DataSet.containsKey(parentLayoutId)) {
                    logger.error("persist parent dataset is enabled, but parent dataset not registered");
                    throw new RuntimeException("parent dataset not registered");
                }
                // The first job seen for a parent initialises the count of pending builds.
                if (!this.toBuildCuboidSize.containsKey(parentLayoutId)) {
                    this.toBuildCuboidSize.put(parentLayoutId,
                            new AtomicLong(sourceInfo.getToBuildCuboids().size()));
                }
            }
        }
        this.completionService.submit(new Callable<JobResult>() {
            @Override
            public JobResult call() throws Exception {
                KylinConfig.setAndUnsetThreadLocalConfig(kylinConfig);
                Thread.currentThread().setName("thread-" + jobEntity.getName());
                LayoutEntity layoutEntity = null;
                Throwable failure = null;
                try {
                    layoutEntity = jobEntity.build();
                } catch (Throwable t) {
                    // Build failures are reported through the JobResult rather than thrown.
                    logger.error("Error occurred when run " + jobEntity.getName(), t);
                    failure = t;
                } finally {
                    // Runs exactly once per job, whether the build succeeded or failed.
                    releaseParentDatasetIfDone(jobEntity);
                }
                return new JobResult(layoutEntity, failure);
            }
        });
        this.currentLayoutsNum++;
    }

    /**
     * Decrements the pending-build counter of the job's parent layout and, when it reaches zero,
     * unpersists the parent dataset and returns its semaphore permit. Does nothing when persisting
     * is disabled or the job has no build source.
     */
    private void releaseParentDatasetIfDone(JobEntity jobEntity) {
        if (!this.persistParentDataset) {
            return;
        }
        NBuildSourceInfo sourceInfo = jobEntity.getBuildSourceInfo();
        if (sourceInfo == null) {
            return;
        }
        long parentLayoutId = sourceInfo.getLayoutId();
        if (this.toBuildCuboidSize.get(parentLayoutId).decrementAndGet() == 0) {
            this.toBuildCuboidSize.remove(parentLayoutId);
            this.layout2DataSet.get(parentLayoutId).unpersist();
            logger.info("dataset of layout: {} released", parentLayoutId);
            this.semaphore.release();
        }
    }

    /**
     * Waits for every job submitted since the last call and applies each built layout to the
     * segment. On the first failed job, or if waiting fails, the thread pool is shut down and a
     * {@link RuntimeException} wrapping the cause is thrown.
     *
     * @param segmentInfo segment that receives the built layouts
     * @param kylinConfig not used by this method
     * @throws RuntimeException if a job failed, or waiting for a result was interrupted or failed
     */
    public void updateLayout(SegmentInfo segmentInfo, KylinConfig kylinConfig) {
        for (int i = 0; i < this.currentLayoutsNum; i++) {
            try {
                logger.info("Wait to take job result.");
                JobResult jobResult = this.completionService.take().get();
                logger.info("Take job result successful.");
                if (jobResult.isFailed()) {
                    shutDownPool();
                    throw new RuntimeException(jobResult.getThrowable());
                }
                segmentInfo.updateLayout(jobResult.getLayout());
            } catch (InterruptedException e) {
                shutDownPool();
                // Restore the flag only after the shutdown wait, so that wait is not cut short.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (ExecutionException e) {
                shutDownPool();
                throw new RuntimeException(e);
            }
        }
        this.currentLayoutsNum = 0;
    }

    /**
     * Stops accepting new jobs and waits up to 10 seconds for running ones to finish. If the wait
     * is interrupted, running jobs are cancelled and the interrupt flag is restored.
     */
    private void shutDownPool() {
        this.pool.shutdown();
        try {
            this.pool.awaitTermination(10L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            logger.warn("Error occurred when shutdown thread pool.", e);
            this.pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
