package org.apache.kylin.streaming.app;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.exception.ServerErrorCode;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.guava30.shaded.common.collect.Maps;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.utils.StreamingUtils;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.Segments;
import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
import org.apache.kylin.streaming.common.MergeJobEntry;
import org.apache.kylin.streaming.constants.StreamingConstants;
import org.apache.kylin.streaming.jobs.StreamingDFMergeJob;
import org.apache.kylin.streaming.jobs.SyncMerger;
import org.apache.kylin.streaming.merge.CatchupMergePolicy;
import org.apache.kylin.streaming.merge.MergePolicy;
import org.apache.kylin.streaming.merge.NormalMergePolicy;
import org.apache.kylin.streaming.merge.PeakMergePolicy;
import org.apache.kylin.streaming.request.StreamingSegmentRequest;
import org.apache.kylin.streaming.rest.RestSupport;
import org.apache.kylin.streaming.util.JobExecutionIdHolder;
import org.apache.kylin.streaming.util.JobKiller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/streaming/app/StreamingMergeEntry.class */
public class StreamingMergeEntry extends StreamingMergeApplication {
    private static final Logger logger = LoggerFactory.getLogger(StreamingMergeEntry.class);
    private static final AtomicLong globalMergeTime = new AtomicLong(0);
    private static AtomicBoolean gracefulStop = new AtomicBoolean(false);
    private static CountDownLatch latch = new CountDownLatch(1);
    private StreamingDFMergeJob merger = new StreamingDFMergeJob();
    private CatchupMergePolicy catchupMergePolicy = new CatchupMergePolicy();
    private NormalMergePolicy normalMergePolicy = new NormalMergePolicy();
    private PeakMergePolicy peakMergePolicy = new PeakMergePolicy();
    private AtomicLong hdfsFileScanStartTime = new AtomicLong(System.currentTimeMillis());

    public static void main(String[] strArr) {
        new StreamingMergeEntry().execute(strArr);
    }

    public static void stop() {
        gracefulStop.set(true);
    }

    @Override // org.apache.kylin.streaming.app.StreamingApplication
    public void doExecute() throws ExecuteException {
        setStopFlag(false);
        logger.info("StreamingMergeEntry:{},{},{},{},{}", new Object[]{this.project, this.dataflowId, Long.valueOf(this.thresholdOfSegSize), this.numberOfSeg, this.distMetaUrl});
        while (isRunning()) {
            try {
                try {
                    process(this.project, this.dataflowId);
                    if (isGracefulShutdown(this.project, this.jobId)) {
                        setStopFlag(true);
                        logger.info("begin to shutdown streaming merge job ({}:{})", this.project, this.dataflowId);
                    } else {
                        StreamingUtils.sleep(this.kylinConfig.getStreamingSegmentMergeInterval() * 1000);
                    }
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                    JobKiller.killApplication(this.jobId);
                    throw new ExecuteException("streaming merging segment error occured: ", e);
                }
            } catch (Throwable th) {
                close(false);
                throw th;
            }
        }
        closeSparkSession();
        close(false);
    }

    @Override // org.apache.kylin.streaming.jobs.GracefulStopInterface
    public boolean getStopFlag() {
        return gracefulStop.get();
    }

    @Override // org.apache.kylin.streaming.jobs.GracefulStopInterface
    public void setStopFlag(boolean z) {
        gracefulStop.set(z);
    }

    public void process(String str, String str2) {
        StreamingUtils.replayAuditlog();
        NDataflowManager nDataflowManager = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), str);
        Segments<NDataSegment> segments = nDataflowManager.getDataflow(str2).getSegments().getSegments(SegmentStatusEnum.READY, SegmentStatusEnum.WARNING);
        Collections.sort(segments);
        removeLastL0Segment(segments);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        MergePolicy selectPolicy = selectPolicy(segments, atomicInteger.get());
        while (selectPolicy != null && selectPolicy.matchMergeCondition(this.thresholdOfSegSize)) {
            List<NDataSegment> matchSegList = selectPolicy.getMatchSegList();
            NDataSegment mergeSegments = mergeSegments(str, str2, matchSegList, atomicInteger.get());
            selectPolicy.next(atomicInteger);
            NDataflow dataflow = nDataflowManager.getDataflow(str2);
            Segments<NDataSegment> segments2 = dataflow.getSegments(SegmentStatusEnum.READY, SegmentStatusEnum.WARNING, SegmentStatusEnum.NEW);
            removeLastL0Segment(segments2);
            NDataSegment segment = getSegment(segments2, mergeSegments, str, str2);
            if (segment.getStatus() == SegmentStatusEnum.NEW) {
                return;
            }
            if (segment.getStorageBytesSize() > this.thresholdOfSegSize) {
                logger.info("SegmentId={} size ({}) exceeds threshold", segment.getId(), Long.valueOf(segment.getStorageBytesSize()));
                return;
            }
            for (NDataSegment nDataSegment : matchSegList) {
                putHdfsFile(nDataSegment.getId(), new Pair<>(dataflow.getSegmentHdfsPath(nDataSegment.getId()), Long.valueOf(System.currentTimeMillis())));
            }
            selectPolicy = selectPolicy(segments2, atomicInteger.get());
            clearHdfsFiles(nDataflowManager.getDataflow(str2), this.hdfsFileScanStartTime);
        }
    }

    public NDataSegment mergeSegments(String str, String str2, List<NDataSegment> list, int i) {
        KylinConfig instanceFromEnv = KylinConfig.getInstanceFromEnv();
        int i2 = 0;
        while (true) {
            int i3 = i2;
            i2++;
            if (i3 >= 3) {
                throw new KylinException(ServerErrorCode.SEGMENT_MERGE_FAILURE, str + "/" + str2);
            }
            try {
                return allocateSegment(str, str2, list, i);
            } catch (KylinException e) {
                logger.error(e.getMessage(), e);
                StreamingUtils.sleep(instanceFromEnv.getStreamingSegmentMergeInterval() * 1000 * i2);
            }
        }
    }

    public NDataSegment getSegment(Segments<NDataSegment> segments, NDataSegment nDataSegment, String str, String str2) {
        NDataSegment segment = segments.getSegment(nDataSegment.getName(), SegmentStatusEnum.READY);
        if (segment == null) {
            segment = segments.getSegment(nDataSegment.getName(), SegmentStatusEnum.WARNING);
        }
        if (segment == null) {
            segment = segments.getSegment(nDataSegment.getName(), SegmentStatusEnum.NEW);
            KylinConfig instanceFromEnv = KylinConfig.getInstanceFromEnv();
            if (segment != null && !instanceFromEnv.isUTEnv()) {
                removeSegment(str, str2, segment);
            }
        }
        if (segment == null) {
            throw new KylinException(ServerErrorCode.SEGMENT_MERGE_FAILURE, "segment is null");
        }
        return segment;
    }

    public void removeSegment(String str, String str2, NDataSegment nDataSegment) {
        StreamingSegmentRequest streamingSegmentRequest = new StreamingSegmentRequest(str, str2);
        streamingSegmentRequest.setRemoveSegment(Arrays.asList(nDataSegment));
        streamingSegmentRequest.setJobType(this.jobType.name());
        streamingSegmentRequest.setJobExecutionId(JobExecutionIdHolder.getJobExecutionId(this.jobId).intValue());
        RestSupport createRestSupport = createRestSupport(KylinConfig.getInstanceFromEnv());
        Throwable th = null;
        try {
            try {
                createRestSupport.execute(createRestSupport.createHttpPost("/streaming_jobs/dataflow/segment/deletion"), streamingSegmentRequest);
                if (createRestSupport != null) {
                    if (0 != 0) {
                        try {
                            createRestSupport.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createRestSupport.close();
                    }
                }
                StreamingUtils.replayAuditlog();
            } finally {
            }
        } catch (Throwable th3) {
            if (createRestSupport != null) {
                if (th != null) {
                    try {
                        createRestSupport.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createRestSupport.close();
                }
            }
            throw th3;
        }
    }

    public NDataSegment allocateSegment(String str, String str2, List<NDataSegment> list, int i) {
        HashMap newHashMap = Maps.newHashMap();
        HashMap newHashMap2 = Maps.newHashMap();
        AtomicLong atomicLong = new AtomicLong(Long.MAX_VALUE);
        AtomicLong atomicLong2 = new AtomicLong(0L);
        list.forEach(nDataSegment -> {
            SegmentRange.KafkaOffsetPartitionedSegmentRange kSRange = nDataSegment.getKSRange();
            if (kSRange.getStart() != null && kSRange.getStart().longValue() < atomicLong.get()) {
                atomicLong.set(kSRange.getStart().longValue());
            }
            if (kSRange.getEnd() != null && kSRange.getEnd().longValue() > atomicLong2.get()) {
                atomicLong2.set(kSRange.getEnd().longValue());
            }
            kSRange.getSourcePartitionOffsetStart().forEach((num, l) -> {
                if (!newHashMap.containsKey(num) || ((Long) newHashMap.get(num)).longValue() > l.longValue()) {
                    newHashMap.put(num, l);
                }
            });
            kSRange.getSourcePartitionOffsetEnd().forEach((num2, l2) -> {
                if (!newHashMap2.containsKey(num2) || ((Long) newHashMap2.get(num2)).longValue() < l2.longValue()) {
                    newHashMap2.put(num2, l2);
                }
            });
        });
        SegmentRange.KafkaOffsetPartitionedSegmentRange kafkaOffsetPartitionedSegmentRange = new SegmentRange.KafkaOffsetPartitionedSegmentRange(Long.valueOf(atomicLong.get()), Long.valueOf(atomicLong2.get()), newHashMap, newHashMap2);
        KylinConfig instanceFromEnv = KylinConfig.getInstanceFromEnv();
        NDataSegment doMergeStreamingSegment = instanceFromEnv.isUTEnv() ? (NDataSegment) EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            NDataflowManager nDataflowManager = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), str);
            return nDataflowManager.mergeSegments(nDataflowManager.getDataflow(str2), kafkaOffsetPartitionedSegmentRange, true, Integer.valueOf(i + 1), null);
        }, str) : doMergeStreamingSegment(str, str2, kafkaOffsetPartitionedSegmentRange, i);
        logger.info("start sync thread for merge");
        NDataflow dataflow = NDataflowManager.getInstance(instanceFromEnv, str).getDataflow(str2);
        List list2 = (List) list.stream().map(nDataSegment2 -> {
            return dataflow.getSegment(nDataSegment2.getId());
        }).collect(Collectors.toList());
        long sum = list.stream().mapToLong((v0) -> {
            return v0.getSourceCount();
        }).sum();
        logger.info("afterMergeSegment[{}] layer={}  from {}", new Object[]{doMergeStreamingSegment, Integer.valueOf(i), list2});
        new SyncMerger(new MergeJobEntry(this.ss, str, str2, sum, globalMergeTime, list2, doMergeStreamingSegment)).run(this.merger);
        return doMergeStreamingSegment;
    }

    public NDataSegment doMergeStreamingSegment(String str, String str2, SegmentRange.KafkaOffsetPartitionedSegmentRange kafkaOffsetPartitionedSegmentRange, int i) {
        KylinConfig instanceFromEnv = KylinConfig.getInstanceFromEnv();
        StreamingSegmentRequest streamingSegmentRequest = new StreamingSegmentRequest(str, str2);
        streamingSegmentRequest.setSegmentRange(kafkaOffsetPartitionedSegmentRange);
        streamingSegmentRequest.setLayer(String.valueOf(i));
        streamingSegmentRequest.setNewSegId(RandomUtil.randomUUIDStr());
        streamingSegmentRequest.setJobType(this.jobType.name());
        streamingSegmentRequest.setJobExecutionId(JobExecutionIdHolder.getJobExecutionId(this.jobId).intValue());
        RestSupport createRestSupport = createRestSupport(instanceFromEnv);
        Throwable th = null;
        try {
            try {
                String data = createRestSupport.execute(createRestSupport.createHttpPost("/streaming_jobs/dataflow/segment"), streamingSegmentRequest).getData();
                StreamingUtils.replayAuditlog();
                NDataSegment segment = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), str).getDataflow(str2).getSegment(data);
                if (createRestSupport != null) {
                    if (0 != 0) {
                        try {
                            createRestSupport.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createRestSupport.close();
                    }
                }
                return segment;
            } finally {
            }
        } catch (Throwable th3) {
            if (createRestSupport != null) {
                if (th != null) {
                    try {
                        createRestSupport.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createRestSupport.close();
                }
            }
            throw th3;
        }
    }

    private MergePolicy selectPolicy(Segments<NDataSegment> segments, int i) {
        Collections.sort(segments);
        if (!this.peakMergePolicy.selectMatchedSegList(segments, i, this.thresholdOfSegSize, this.numberOfSeg.intValue()).isEmpty()) {
            return this.peakMergePolicy;
        }
        if (!this.catchupMergePolicy.selectMatchedSegList(segments, i, this.thresholdOfSegSize, this.numberOfSeg.intValue()).isEmpty()) {
            return this.catchupMergePolicy;
        }
        if (this.normalMergePolicy.selectMatchedSegList(segments, i, this.thresholdOfSegSize, this.numberOfSeg.intValue()).isEmpty()) {
            return null;
        }
        return this.normalMergePolicy;
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void removeLastL0Segment(Segments<NDataSegment> segments) {
        Map<String, String> additionalInfo;
        if (segments.isEmpty() || (additionalInfo = ((NDataSegment) segments.get(segments.size() - 1)).getAdditionalInfo()) == null || additionalInfo.containsKey(StreamingConstants.FILE_LAYER)) {
            return;
        }
        segments.remove(segments.size() - 1);
    }

    private void close(boolean z) {
        this.merger.shutdown();
        latch.countDown();
        closeAuditLogStore(this.ss);
        if (z) {
            return;
        }
        systemExit(0);
    }
}
