package org.apache.kylin.stream.core.storage.columnar;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.metadata.filter.CompareTupleFilter;
import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.stream.core.model.StreamingMessage;
import org.apache.kylin.stream.core.query.SingleThreadResultCollector;
import org.apache.kylin.stream.core.query.StreamingQueryProfile;
import org.apache.kylin.stream.core.query.StreamingSearchContext;
import org.apache.kylin.stream.core.storage.Record;
import org.apache.kylin.stream.core.storage.TestHelper;
import org.apache.kylin.stream.core.storage.columnar.protocol.FragmentMetaInfo;
import org.apache.log4j.PropertyConfigurator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kylin/stream/core/storage/columnar/FragmentFilesMergerTest.class */
public class FragmentFilesMergerTest extends LocalFileMetadataTestCase {
    private static final String cubeName = "test_streaming_v2_cube";
    private String baseStorePath;
    private CubeInstance cubeInstance;
    private String segmentName;
    private ParsedStreamingCubeInfo parsedStreamingCubeInfo;
    private FragmentFilesMerger fragmentFilesMerger;
    private TestHelper testHelper;

    @Before
    public void setUp() throws Exception {
        createTestMetadata(new String[0]);
        this.baseStorePath = KylinConfig.getInstanceFromEnv().getStreamingIndexPath();
        this.cubeInstance = getCubeManager().reloadCubeQuietly(cubeName);
        this.segmentName = "20171218100000_20171218110000";
        this.parsedStreamingCubeInfo = new ParsedStreamingCubeInfo(this.cubeInstance);
        this.fragmentFilesMerger = new FragmentFilesMerger(this.parsedStreamingCubeInfo, new File(new File(this.baseStorePath, cubeName), this.segmentName));
        this.testHelper = new TestHelper(this.cubeInstance);
        cleanupData();
        PropertyConfigurator.configure("../build/conf/kylin-tools-log4j.properties");
    }

    @After
    public void after() throws Exception {
        cleanupTestMetadata();
        cleanupData();
    }

    @Test
    public void testMerge() throws Exception {
        FragmentsMergeResult merge = this.fragmentFilesMerger.merge(createFragmentFiles(0, 5, new StreamingDataSimulator()));
        FragmentMetaInfo fragmentMetaInfo = (FragmentMetaInfo) JsonUtil.readValue(merge.getMergedFragmentMetaFile(), FragmentMetaInfo.class);
        Assert.assertEquals(250000L, fragmentMetaInfo.getNumberOfRows());
        Assert.assertEquals(250000L, fragmentMetaInfo.getOriginNumOfRows());
        FragmentData fragmentData = new FragmentData(fragmentMetaInfo, merge.getMergedFragmentDataFile());
        Set<TblColRef> simulateDimensions = this.testHelper.simulateDimensions("STREAMING_V2_TABLE.SITE");
        CompareTupleFilter buildEQFilter = this.testHelper.buildEQFilter("SITE", "SITE0");
        HashSet newHashSet = Sets.newHashSet();
        StreamingQueryProfile.set(new StreamingQueryProfile("test-query-id", System.currentTimeMillis()));
        FragmentFileSearcher fragmentFileSearcher = new FragmentFileSearcher(new DataSegmentFragment(this.baseStorePath, cubeName, this.segmentName, new FragmentId(0)), fragmentData);
        StreamingSearchContext streamingSearchContext = new StreamingSearchContext(this.parsedStreamingCubeInfo.cubeDesc, simulateDimensions, Sets.newHashSet(), newHashSet, buildEQFilter, (TupleFilter) null);
        SingleThreadResultCollector singleThreadResultCollector = new SingleThreadResultCollector();
        fragmentFileSearcher.search(streamingSearchContext, singleThreadResultCollector);
        int i = 0;
        Iterator it = singleThreadResultCollector.iterator();
        while (it.hasNext()) {
            i++;
        }
        Assert.assertEquals(25000L, i);
        StreamingSearchContext streamingSearchContext2 = new StreamingSearchContext(this.parsedStreamingCubeInfo.cubeDesc, this.testHelper.simulateDimensions("STREAMING_V2_TABLE.SITE", "STREAMING_V2_TABLE.MINUTE_START"), this.testHelper.simulateDimensions("STREAMING_V2_TABLE.MINUTE_START"), Sets.newHashSet(new FunctionDesc[]{this.testHelper.simulateCountMetric()}), (TupleFilter) null, (TupleFilter) null);
        SingleThreadResultCollector singleThreadResultCollector2 = new SingleThreadResultCollector();
        fragmentFileSearcher.search(streamingSearchContext2, singleThreadResultCollector2);
        int i2 = 0;
        Iterator it2 = singleThreadResultCollector2.iterator();
        while (it2.hasNext()) {
            Record record = (Record) it2.next();
            System.out.println(record);
            i2 = (int) (i2 + ((Long) record.getMetrics()[0]).longValue());
        }
        Assert.assertEquals(250000L, i2);
    }

    @Test
    public void testMerge2() throws Exception {
        int i = 4 * 50000;
        FragmentsMergeResult merge = this.fragmentFilesMerger.merge(createFragmentFiles(5, 4, new StreamingDataSimulator(getTestCardinalityMap(), 100000)));
        File mergedFragmentMetaFile = merge.getMergedFragmentMetaFile();
        File mergedFragmentDataFile = merge.getMergedFragmentDataFile();
        FragmentMetaInfo fragmentMetaInfo = (FragmentMetaInfo) JsonUtil.readValue(mergedFragmentMetaFile, FragmentMetaInfo.class);
        Assert.assertEquals(160000L, fragmentMetaInfo.getNumberOfRows());
        Assert.assertEquals(i, fragmentMetaInfo.getOriginNumOfRows());
        FragmentData fragmentData = new FragmentData(fragmentMetaInfo, mergedFragmentDataFile);
        Set<TblColRef> simulateDimensions = this.testHelper.simulateDimensions("STREAMING_V2_TABLE.MINUTE_START");
        Set<TblColRef> simulateDimensions2 = this.testHelper.simulateDimensions("STREAMING_V2_TABLE.MINUTE_START");
        HashSet newHashSet = Sets.newHashSet(new FunctionDesc[]{this.testHelper.simulateCountMetric()});
        StreamingQueryProfile.set(new StreamingQueryProfile("test-query-id", System.currentTimeMillis()));
        FragmentFileSearcher fragmentFileSearcher = new FragmentFileSearcher(new DataSegmentFragment(this.baseStorePath, cubeName, this.segmentName, new FragmentId(0)), fragmentData);
        StreamingSearchContext streamingSearchContext = new StreamingSearchContext(this.parsedStreamingCubeInfo.cubeDesc, simulateDimensions, simulateDimensions2, newHashSet, (TupleFilter) null, (TupleFilter) null);
        SingleThreadResultCollector singleThreadResultCollector = new SingleThreadResultCollector();
        fragmentFileSearcher.search(streamingSearchContext, singleThreadResultCollector);
        int i2 = 0;
        int i3 = 0;
        Iterator it = singleThreadResultCollector.iterator();
        while (it.hasNext()) {
            Record record = (Record) it.next();
            i2++;
            long longValue = ((Long) record.getMetrics()[0]).longValue();
            Assert.assertEquals(100000, (int) longValue);
            System.out.println(record);
            i3 = (int) (i3 + longValue);
        }
        Assert.assertEquals(2L, i2);
        Assert.assertEquals(i, i3);
    }

    @Test
    public void testMergeWithAdditionalCuboids() throws Exception {
        setBuildAdditionalCuboids();
        FragmentsMergeResult merge = this.fragmentFilesMerger.merge(createFragmentFiles(9, 3, new StreamingDataSimulator(getTestCardinalityMap(), 200000)));
        File mergedFragmentMetaFile = merge.getMergedFragmentMetaFile();
        merge.getMergedFragmentDataFile();
        Assert.assertEquals(160010L, ((FragmentMetaInfo) JsonUtil.readValue(mergedFragmentMetaFile, FragmentMetaInfo.class)).getNumberOfRows());
    }

    private void setBuildAdditionalCuboids() {
        this.cubeInstance.getDescriptor().getConfig().getExtendedOverrides().put("kylin.stream.build.additional.cuboids", "true");
    }

    private Map<String, Integer> getTestCardinalityMap() {
        HashMap hashMap = new HashMap();
        hashMap.put("SITE", 10);
        hashMap.put("ITM", 80000);
        return hashMap;
    }

    private List<DataSegmentFragment> createFragmentFiles(int i, int i2, StreamingDataSimulator streamingDataSimulator) {
        ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(i2);
        ColumnarMemoryStorePersister columnarMemoryStorePersister = new ColumnarMemoryStorePersister(this.parsedStreamingCubeInfo, this.segmentName);
        Iterator<StreamingMessage> simulate = streamingDataSimulator.simulate(50000 * i2, DateFormat.stringToMillis("2018-07-30 20:00:00"));
        for (int i3 = 0; i3 < i2; i3++) {
            DataSegmentFragment dataSegmentFragment = new DataSegmentFragment(this.baseStorePath, cubeName, this.segmentName, new FragmentId(i + i3));
            SegmentMemoryStore segmentMemoryStore = new SegmentMemoryStore(new ParsedStreamingCubeInfo(this.cubeInstance), this.segmentName);
            for (int i4 = 0; simulate.hasNext() && i4 < 50000; i4++) {
                segmentMemoryStore.index(simulate.next());
            }
            columnarMemoryStorePersister.persist(segmentMemoryStore, dataSegmentFragment);
            newArrayListWithCapacity.add(dataSegmentFragment);
        }
        return newArrayListWithCapacity;
    }

    public CubeManager getCubeManager() {
        return CubeManager.getInstance(getTestConfig());
    }

    private void cleanupData() throws IOException {
        this.fragmentFilesMerger.cleanMergeDirectory();
        FileUtils.deleteQuietly(new File(this.baseStorePath));
    }
}
