package org.apache.kylin.stream.core.storage.columnar;

import java.io.File;
import java.io.FileFilter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.io.FileUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.job.shaded.com.codahale.metrics.Gauge;
import org.apache.kylin.job.shaded.com.codahale.metrics.MetricRegistry;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.shaded.com.google.common.base.Charsets;
import org.apache.kylin.shaded.com.google.common.collect.Lists;
import org.apache.kylin.shaded.com.google.common.collect.Sets;
import org.apache.kylin.shaded.com.google.common.io.Files;
import org.apache.kylin.stream.core.exception.IllegalStorageException;
import org.apache.kylin.stream.core.metrics.StreamingMetrics;
import org.apache.kylin.stream.core.model.StreamingMessage;
import org.apache.kylin.stream.core.model.stats.SegmentStoreStats;
import org.apache.kylin.stream.core.query.ResultCollector;
import org.apache.kylin.stream.core.query.StreamingSearchContext;
import org.apache.kylin.stream.core.storage.IStreamingSegmentStore;
import org.apache.kylin.stream.core.storage.StreamingCubeSegment;
import org.apache.kylin.stream.core.util.NamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/stream/core/storage/columnar/ColumnarSegmentStore.class */
public class ColumnarSegmentStore implements IStreamingSegmentStore {
    private static final String STATE_FILE = "_STATE";
    private static Logger logger = LoggerFactory.getLogger((Class<?>) ColumnarSegmentStore.class);
    private static ExecutorService fragmentMergeExecutor;
    private volatile SegmentMemoryStore activeMemoryStore;
    private volatile SegmentMemoryStore persistingMemoryStore;
    private ReentrantReadWriteLock persistLock;
    private ReentrantReadWriteLock.ReadLock persistReadLock;
    private ReentrantReadWriteLock.WriteLock persistWriteLock;
    private ReentrantReadWriteLock mergeLock;
    private ReentrantReadWriteLock.ReadLock mergeReadLock;
    private ReentrantReadWriteLock.WriteLock mergeWriteLock;
    private volatile boolean persisting;
    private volatile boolean inMerging;
    private ColumnarMemoryStorePersister memoryStorePersister;
    private String baseStorePath;
    private File dataSegmentFolder;
    private int maxRowsInMemory;
    private ParsedStreamingCubeInfo parsedStreamingCubeInfo;
    private String cubeName;
    private String segmentName;
    private boolean autoMergeEnabled;
    private List<DataSegmentFragment> fragments;
    protected int latestCheckpointFragment;
    private Map<TblColRef, Dictionary<String>> dictionaryMap;

    public ColumnarSegmentStore(String str, CubeInstance cubeInstance, String str2) {
        fragmentMergeExecutor = new ThreadPoolExecutor(0, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue(), new NamedThreadFactory("fragments-merge"));
        this.persistLock = new ReentrantReadWriteLock();
        this.persistReadLock = this.persistLock.readLock();
        this.persistWriteLock = this.persistLock.writeLock();
        this.mergeLock = new ReentrantReadWriteLock();
        this.mergeReadLock = this.mergeLock.readLock();
        this.mergeWriteLock = this.mergeLock.writeLock();
        this.persisting = false;
        this.inMerging = false;
        this.fragments = Lists.newCopyOnWriteArrayList();
        this.latestCheckpointFragment = 0;
        this.maxRowsInMemory = cubeInstance.getConfig().getStreamingIndexMaxRows();
        this.baseStorePath = str;
        this.parsedStreamingCubeInfo = new ParsedStreamingCubeInfo(cubeInstance);
        this.cubeName = cubeInstance.getName();
        this.segmentName = str2;
        this.dataSegmentFolder = new File(str + File.separator + this.cubeName + File.separator + str2);
        if (!this.dataSegmentFolder.exists()) {
            this.dataSegmentFolder.mkdirs();
        }
        this.activeMemoryStore = new SegmentMemoryStore(this.parsedStreamingCubeInfo, str2);
        this.memoryStorePersister = new ColumnarMemoryStorePersister(this.parsedStreamingCubeInfo, str2);
        this.autoMergeEnabled = cubeInstance.getConfig().isStreamingFragmentsAutoMergeEnabled();
        try {
            StreamingMetrics.getInstance().getMetricRegistry().register(MetricRegistry.name("streaming.inMem.row.cnt", cubeInstance.getName(), str2), new Gauge<Integer>() { // from class: org.apache.kylin.stream.core.storage.columnar.ColumnarSegmentStore.1
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // org.apache.kylin.job.shaded.com.codahale.metrics.Gauge
                public Integer getValue() {
                    return Integer.valueOf(ColumnarSegmentStore.this.activeMemoryStore.getRowCount());
                }
            });
        } catch (Exception e) {
            logger.warn("metrics register failed", (Throwable) e);
        }
    }

    @Override // org.apache.kylin.stream.core.storage.IStreamingSegmentStore
    public void init() {
        this.fragments.addAll(getFragmentsFromFileSystem());
    }

    @Override // org.apache.kylin.stream.core.storage.IStreamingSegmentStore
    public int addEvent(StreamingMessage streamingMessage) {
        if (this.activeMemoryStore == null) {
            throw new IllegalStateException("the segment has not opened:" + this.segmentName);
        }
        int index = this.activeMemoryStore.index(streamingMessage);
        if (index >= this.maxRowsInMemory) {
            persist();
        }
        return index;
    }

    @Override // org.apache.kylin.stream.core.storage.IStreamingSegmentStore
    public void addExternalDict(Map<TblColRef, Dictionary<String>> map) {
        this.dictionaryMap = map;
        this.activeMemoryStore.setDictionaryMap(map);
    }

    @Override // org.apache.kylin.stream.core.storage.IStreamingSegmentStore
    public File getStorePath() {
        return this.dataSegmentFolder;
    }

    @Override // org.apache.kylin.stream.core.storage.IStreamingSegmentStore
    public Object checkpoint() {
        persist();
        this.latestCheckpointFragment = getLargestFragmentID();
        return String.valueOf(this.latestCheckpointFragment);
    }

    @Override // org.apache.kylin.stream.core.storage.IStreamingSegmentStore
    public void persist() {
        if (this.activeMemoryStore.getRowCount() <= 0) {
            logger.info("no data in the memory store, skip persist.");
            return;
        }
        this.persistWriteLock.lock();
        try {
            this.persisting = true;
            DataSegmentFragment createNewFragment = createNewFragment();
            this.persistingMemoryStore = this.activeMemoryStore;
            this.activeMemoryStore = new SegmentMemoryStore(this.parsedStreamingCubeInfo, this.segmentName);
            this.activeMemoryStore.setDictionaryMap(this.dictionaryMap);
            this.persistWriteLock.unlock();
            this.memoryStorePersister.persist(this.persistingMemoryStore, createNewFragment);
            this.persistWriteLock.lock();
            try {
                this.persistingMemoryStore = null;
                this.fragments.add(createNewFragment);
                this.persisting = false;
                checkRequireMerge();
            } finally {
            }
        } finally {
        }
    }

    private void checkRequireMerge() {
        if (!this.autoMergeEnabled || this.inMerging) {
            return;
        }
        KylinConfig config = this.parsedStreamingCubeInfo.cubeDesc.getConfig();
        if (this.fragments.size() <= config.getStreamingMaxFragmentsInSegment()) {
            return;
        }
        final List<DataSegmentFragment> chooseFragmentsToMerge = chooseFragmentsToMerge(config, Lists.newArrayList(this.fragments));
        if (chooseFragmentsToMerge.size() <= 1) {
            return;
        }
        logger.info("found some fragments need to merge:{}", chooseFragmentsToMerge);
        this.inMerging = true;
        fragmentMergeExecutor.submit(new Runnable() { // from class: org.apache.kylin.stream.core.storage.columnar.ColumnarSegmentStore.2
            @Override // java.lang.Runnable
            public void run() {
                try {
                    ColumnarSegmentStore.this.doMergeFragments(chooseFragmentsToMerge);
                } catch (Exception e) {
                    ColumnarSegmentStore.logger.error("error happens when merge fragments:" + chooseFragmentsToMerge, (Throwable) e);
                }
            }
        });
    }

    protected void doMergeFragments(List<DataSegmentFragment> list) throws IOException {
        logger.info("start to merge fragments:{}", list);
        FragmentFilesMerger fragmentFilesMerger = new FragmentFilesMerger(this.parsedStreamingCubeInfo, this.dataSegmentFolder);
        FragmentsMergeResult merge = fragmentFilesMerger.merge(list);
        logger.info("finish to merge fragments, try to commit the merge result");
        commitFragmentsMerge(merge);
        fragmentFilesMerger.cleanMergeDirectory();
        this.inMerging = false;
    }

    protected List<DataSegmentFragment> chooseFragmentsToMerge(KylinConfig kylinConfig, List<DataSegmentFragment> list) {
        Collections.sort(list);
        return doChooseFragments(kylinConfig, list, true);
    }

    protected List<DataSegmentFragment> doChooseFragments(KylinConfig kylinConfig, List<DataSegmentFragment> list, boolean z) {
        ArrayList newArrayList = Lists.newArrayList();
        int size = list.size();
        int streamingMinFragmentsInSegment = kylinConfig.getStreamingMinFragmentsInSegment();
        long streamingMaxFragmentSizeInMb = kylinConfig.getStreamingMaxFragmentSizeInMb() * 1024 * 1024;
        long j = 0;
        for (int i = 0; i < size; i++) {
            DataSegmentFragment dataSegmentFragment = list.get(i);
            if (size - newArrayList.size() > streamingMinFragmentsInSegment - 1 && dataSegmentFragment.getFragmentId().getEndId() <= this.latestCheckpointFragment) {
                if (z && dataSegmentFragment.isMergedFragment()) {
                    if (newArrayList.size() > 1) {
                        return newArrayList;
                    }
                    if (newArrayList.size() == 1) {
                        j = 0;
                        newArrayList.clear();
                    }
                } else {
                    long dataFileSize = dataSegmentFragment.getDataFileSize();
                    if (j + dataFileSize <= streamingMaxFragmentSizeInMb) {
                        j += dataFileSize;
                        newArrayList.add(dataSegmentFragment);
                    } else {
                        if (newArrayList.size() > 1) {
                            return newArrayList;
                        }
                        if (newArrayList.size() == 1) {
                            j = 0;
                            newArrayList.clear();
                        }
                    }
                }
            }
            return newArrayList;
        }
        return newArrayList;
    }

    private void commitFragmentsMerge(FragmentsMergeResult fragmentsMergeResult) throws IOException {
        this.mergeWriteLock.lock();
        try {
            fragmentsMergeResult.getOrigFragments();
            removeFragments(fragmentsMergeResult.getOrigFragments());
            DataSegmentFragment dataSegmentFragment = new DataSegmentFragment(this.baseStorePath, this.cubeName, this.segmentName, fragmentsMergeResult.getMergedFragmentId());
            FileUtils.moveFileToDirectory(fragmentsMergeResult.getMergedFragmentDataFile(), dataSegmentFragment.getFragmentFolder(), true);
            FileUtils.moveFileToDirectory(fragmentsMergeResult.getMergedFragmentMetaFile(), dataSegmentFragment.getFragmentFolder(), true);
            this.fragments.add(dataSegmentFragment);
        } finally {
            this.mergeWriteLock.unlock();
        }
    }

    @Override // org.apache.kylin.stream.core.storage.IStreamingSegmentStore
    public void purge() {
        try {
            FileUtils.deleteDirectory(this.dataSegmentFolder);
            logger.info("removed segment data, cube-{} segment-{}", this.cubeName, this.segmentName);
            ColumnarStoreCache.getInstance().removeFragmentsCache(this.fragments);
            this.fragments = Lists.newCopyOnWriteArrayList();
            logger.info("removed segment cache, cube-{} segment-{}", this.cubeName, this.segmentName);
        } catch (IOException e) {
            logger.error("error happens when purge segment", (Throwable) e);
        }
    }

    @Override // org.apache.kylin.stream.core.storage.IStreamingSegmentStore
    public void restoreFromCheckpoint(Object obj) {
        FragmentId parse = FragmentId.parse((String) obj);
        for (DataSegmentFragment dataSegmentFragment : getFragmentsFromFileSystem()) {
            if (dataSegmentFragment.getFragmentId().compareTo(parse) > 0) {
                dataSegmentFragment.purge();
            }
        }
    }

    @Override // org.apache.kylin.stream.core.storage.IStreamingSegmentStore
    public String getSegmentName() {
        return this.segmentName;
    }

    @Override // org.apache.kylin.stream.core.storage.IStreamingSegmentStore
    public StreamingCubeSegment.State getSegmentState() {
        File file = new File(this.dataSegmentFolder, STATE_FILE);
        return file.exists() ? parseStateFile(file) : StreamingCubeSegment.State.ACTIVE;
    }

    @Override // org.apache.kylin.stream.core.storage.IStreamingSegmentStore
    public void setSegmentState(StreamingCubeSegment.State state) {
        File file = new File(this.dataSegmentFolder, STATE_FILE);
        FileOutputStream fileOutputStream = null;
        try {
            try {
                if (!file.exists()) {
                    file.createNewFile();
                }
                fileOutputStream = new FileOutputStream(file);
                fileOutputStream.write(Bytes.toBytes(state.name()));
                fileOutputStream.flush();
                if (fileOutputStream != null) {
                    try {
                        fileOutputStream.close();
                    } catch (IOException e) {
                        logger.error("error when close", (Throwable) e);
                    }
                }
            } catch (IOException e2) {
                throw new IllegalStorageException(e2);
            }
        } catch (Throwable th) {
            if (fileOutputStream != null) {
                try {
                    fileOutputStream.close();
                } catch (IOException e3) {
                    logger.error("error when close", (Throwable) e3);
                }
            }
            throw th;
        }
    }

    public DataSegmentFragment createNewFragment() {
        return new DataSegmentFragment(this.baseStorePath, this.cubeName, this.segmentName, new FragmentId(getLargestFragmentID() + 1));
    }

    public List<DataSegmentFragment> getAllFragments() {
        return this.fragments;
    }

    private void removeFragments(List<DataSegmentFragment> list) {
        this.fragments.removeAll(Sets.newHashSet(list));
        for (DataSegmentFragment dataSegmentFragment : list) {
            ColumnarStoreCache.getInstance().removeFragmentCache(dataSegmentFragment);
            dataSegmentFragment.purge();
        }
    }

    private int getLargestFragmentID() {
        int i = 0;
        Iterator<DataSegmentFragment> it = getFragmentsFromFileSystem().iterator();
        while (it.hasNext()) {
            int endId = it.next().getFragmentId().getEndId();
            if (endId > i) {
                i = endId;
            }
        }
        return i;
    }

    private List<DataSegmentFragment> getFragmentsFromFileSystem() {
        ArrayList newArrayList = Lists.newArrayList();
        File[] listFiles = getStorePath().listFiles(new FileFilter() { // from class: org.apache.kylin.stream.core.storage.columnar.ColumnarSegmentStore.3
            @Override // java.io.FileFilter
            public boolean accept(File file) {
                if (!file.isDirectory() || file.getName().equalsIgnoreCase("_SUCCESS")) {
                    return false;
                }
                try {
                    FragmentId.parse(file.getName());
                    return true;
                } catch (Exception e) {
                    return false;
                }
            }
        });
        if (listFiles != null) {
            for (File file : listFiles) {
                newArrayList.add(new DataSegmentFragment(this.baseStorePath, this.cubeName, this.segmentName, FragmentId.parse(file.getName())));
            }
        }
        return newArrayList;
    }

    @Override // org.apache.kylin.stream.core.storage.IStreamingSegmentStore
    public SegmentStoreStats getStoreStats() {
        SegmentStoreStats segmentStoreStats = new SegmentStoreStats();
        segmentStoreStats.setNumRowsInMem(this.activeMemoryStore.getRowCount());
        segmentStoreStats.setNumFragments(getFragmentsFromFileSystem().size());
        return segmentStoreStats;
    }

    public SegmentMemoryStore getActiveMemoryStore() {
        return this.activeMemoryStore;
    }

    @Override // org.apache.kylin.stream.core.query.IStreamingGTSearcher
    public void search(StreamingSearchContext streamingSearchContext, ResultCollector resultCollector) throws IOException {
        this.mergeReadLock.lock();
        resultCollector.addCloseListener(new ResultCollector.CloseListener() { // from class: org.apache.kylin.stream.core.storage.columnar.ColumnarSegmentStore.4
            @Override // org.apache.kylin.stream.core.query.ResultCollector.CloseListener
            public void onClose() {
                ColumnarSegmentStore.this.mergeReadLock.unlock();
            }
        });
        this.persistReadLock.lock();
        try {
            List<DataSegmentFragment> allFragments = getAllFragments();
            SegmentMemoryStore segmentMemoryStore = this.persisting ? this.persistingMemoryStore : this.activeMemoryStore;
            new ColumnarSegmentStoreFilesSearcher(this.segmentName, allFragments).search(streamingSearchContext, resultCollector);
            segmentMemoryStore.search(streamingSearchContext, resultCollector);
        } finally {
            this.persistReadLock.unlock();
        }
    }

    @Override // org.apache.kylin.stream.core.storage.IStreamingSegmentStore
    public void close() throws IOException {
        logger.warn("closing the streaming cube segment, cube {}, segment {}.", this.cubeName, this.segmentName);
        StreamingMetrics.getInstance().getMetricRegistry().remove(MetricRegistry.name("streaming.inMem.row.cnt", this.cubeName, this.segmentName));
    }

    private StreamingCubeSegment.State parseStateFile(File file) {
        StreamingCubeSegment.State state = StreamingCubeSegment.State.ACTIVE;
        try {
            state = StreamingCubeSegment.State.valueOf(Files.toString(file, Charsets.UTF_8).trim());
        } catch (IOException e) {
            logger.error("error when parse state file", (Throwable) e);
        }
        return state;
    }
}
