package org.apache.druid.segment.incremental;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.js.JavaScriptConfig;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.JavaScriptAggregatorFactory;
import org.apache.druid.query.aggregation.LongMaxAggregator;
import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.segment.CloserRule;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Parameterized ingestion tests for {@link IncrementalIndex}: the class is instantiated once per
 * index type returned by {@link IncrementalIndexCreator#getAppendableIndexTypes()}.
 *
 * <p>Two tests add rows from several threads at once and then inspect the stored metric values;
 * the third checks that closing an on-heap index closes the aggregators it holds.
 */
@RunWith(Parameterized.class)
public class IncrementalIndexIngestionTest extends InitializedNullHandlingTest {
    /** Row limit configured on every index built by {@link #indexCreator}. */
    private static final int MAX_ROWS = 100000;

    /** Number of threads that add rows concurrently in the multithreaded tests. */
    private static final int ADD_THREAD_COUNT = 2;

    /** Rows added by each writer thread, chosen so that all writers together add MAX_ROWS rows. */
    private static final int ROWS_PER_THREAD = MAX_ROWS / ADD_THREAD_COUNT;

    /** Upper bound on how long a test waits for its checker thread to stop after interrupting it. */
    private static final long CHECKER_JOIN_TIMEOUT_MILLIS = 60_000L;

    public final IncrementalIndexCreator indexCreator;

    @Rule
    public final CloserRule closer = new CloserRule(false);

    /**
     * Builds the index creator for one parameterized run and registers it with {@link #closer}.
     *
     * @param indexType name of the index implementation under test, as supplied by
     *                  {@link #constructorFeeder()}
     * @throws JsonProcessingException declared by the {@link IncrementalIndexCreator} constructor
     */
    public IncrementalIndexIngestionTest(String indexType) throws JsonProcessingException {
        IncrementalIndexCreator creator = new IncrementalIndexCreator(
            indexType,
            (builder, args) -> builder
                .setIndexSchema((IncrementalIndexSchema) args[0])
                .setMaxRowCount(MAX_ROWS)
                .build());
        this.closer.closeLater(creator);
        this.indexCreator = creator;
    }

    /**
     * Supplies the constructor arguments for the parameterized runner.
     *
     * @return the index types to run every test against
     */
    @Parameterized.Parameters(name = "{index}: {0}")
    public static Collection<?> constructorFeeder() {
        return IncrementalIndexCreator.getAppendableIndexTypes();
    }

    /**
     * Adds rows whose "max" value is always 1 from several threads while a checker thread keeps
     * scanning the facts, and asserts that the checker never read a long-max metric other than 1.
     */
    @Test
    public void testMultithreadAddFacts() throws Exception {
        final IncrementalIndex<?> index = this.indexCreator.createIndex(
            new IncrementalIndexSchema.Builder()
                .withQueryGranularity(Granularities.MINUTE)
                .withMetrics(new LongMaxAggregatorFactory("max", "max"))
                .build());

        final AtomicReference<Throwable> writerFailure = new AtomicReference<>();
        Thread[] writers = startWriterThreads(
            random -> index.add(new MapBasedInputRow(
                0L,
                Collections.singletonList("billy"),
                ImmutableMap.of("billy", random.nextLong(), "max", 1))),
            writerFailure);

        final AtomicInteger mismatchCount = new AtomicInteger(0);
        Thread checker = new Thread(() -> {
            while (!Thread.interrupted()) {
                for (IncrementalIndexRow row : index.getFacts().keySet()) {
                    // Position 0 is the only metric in the schema; every input row carries max = 1.
                    if (index.getMetricLongValue(row.getRowIndex(), 0) != 1) {
                        mismatchCount.incrementAndGet();
                    }
                }
            }
        });
        checker.start();

        try {
            awaitWriterThreads(writers, writerFailure);
        } finally {
            // Stop the checker and wait for it, so that its last scan is counted before the
            // assertion below and it cannot keep reading the index after the test has returned.
            checker.interrupt();
            checker.join(CHECKER_JOIN_TIMEOUT_MILLIS);
        }
        Assert.assertFalse("Checker thread did not stop after being interrupted", checker.isAlive());
        Assert.assertEquals(0, mismatchCount.get());
    }

    /**
     * Feeds identical rows concurrently into two rollup indexes, one counting odd values with an
     * expression-based long sum and one with a JavaScript aggregator, and asserts that the totals
     * of the two metrics match.
     */
    @Test
    public void testMultithreadAddFactsUsingExpressionAndJavaScript() throws Exception {
        final IncrementalIndex<?> expressionIndex = this.indexCreator.createIndex(
            new IncrementalIndexSchema.Builder()
                .withQueryGranularity(Granularities.MINUTE)
                .withMetrics(new LongSumAggregatorFactory("oddnum", null, "if(value%2==1,1,0)", TestExprMacroTable.INSTANCE))
                .withRollup(true)
                .build());
        final IncrementalIndex<?> javaScriptIndex = this.indexCreator.createIndex(
            new IncrementalIndexSchema.Builder()
                .withQueryGranularity(Granularities.MINUTE)
                .withMetrics(new JavaScriptAggregatorFactory(
                    "oddnum",
                    ImmutableList.of("value"),
                    "function(current, value) { if (value%2==1) current = current + 1; return current;}",
                    "function() {return 0;}",
                    "function(a, b) { return a + b;}",
                    JavaScriptConfig.getEnabledInstance()))
                .withRollup(true)
                .build());

        final AtomicReference<Throwable> writerFailure = new AtomicReference<>();
        Thread[] writers = startWriterThreads(
            random -> {
                int value = random.nextInt(100000);
                // Each index receives its own row instance built from the same random value.
                MapBasedInputRow expressionRow = new MapBasedInputRow(
                    0L,
                    Collections.singletonList("billy"),
                    ImmutableMap.of("billy", value % 3, "value", value));
                MapBasedInputRow javaScriptRow = new MapBasedInputRow(
                    0L,
                    Collections.singletonList("billy"),
                    ImmutableMap.of("billy", value % 3, "value", value));
                expressionIndex.add(expressionRow);
                javaScriptIndex.add(javaScriptRow);
            },
            writerFailure);
        awaitWriterThreads(writers, writerFailure);

        Assert.assertEquals(sumFirstMetric(expressionIndex), sumFirstMetric(javaScriptIndex));
    }

    /**
     * Replaces the first aggregator of the first row of an on-heap index with a mock and verifies
     * that closing the index calls {@code close()} on that aggregator exactly once. Runs for other
     * index types are reported as skipped.
     */
    @Test
    public void testOnHeapIncrementalIndexClose() throws Exception {
        IncrementalIndex<?> index = this.indexCreator.createIndex(
            new IncrementalIndexSchema.Builder()
                .withQueryGranularity(Granularities.MINUTE)
                .withMetrics(new LongMaxAggregatorFactory("max", "max"))
                .build());
        // Report a skip instead of a pass when there is nothing to verify for this index type.
        Assume.assumeTrue("Test only applies to OnheapIncrementalIndex", index instanceof OnheapIncrementalIndex);
        OnheapIncrementalIndex onheapIndex = (OnheapIncrementalIndex) index;

        // Record the expectation: close() must be called exactly once on the mock.
        Aggregator mockedAggregator = EasyMock.createMock(LongMaxAggregator.class);
        mockedAggregator.close();
        EasyMock.expectLastCall().times(1);

        onheapIndex.add(new MapBasedInputRow(
            0L,
            Collections.singletonList("billy"),
            ImmutableMap.of("billy", 1, "max", 1)));
        onheapIndex.concurrentGet(0)[0] = mockedAggregator;

        EasyMock.replay(mockedAggregator);
        onheapIndex.close();
        EasyMock.verify(mockedAggregator);
    }

    /** Action that adds the row(s) for one loop iteration of a writer thread. */
    @FunctionalInterface
    private interface RowWriter {
        void write(ThreadLocalRandom random) throws Exception;
    }

    /**
     * Starts ADD_THREAD_COUNT threads that each invoke {@code writer} ROWS_PER_THREAD times.
     *
     * @param writer       action run once per row, with the calling thread's random generator
     * @param firstFailure receives the first throwable raised by any writer; a writer stops at
     *                     its first failure
     * @return the started threads
     */
    private static Thread[] startWriterThreads(RowWriter writer, AtomicReference<Throwable> firstFailure) {
        Thread[] writers = new Thread[ADD_THREAD_COUNT];
        for (int t = 0; t < writers.length; t++) {
            writers[t] = new Thread(() -> {
                ThreadLocalRandom random = ThreadLocalRandom.current();
                try {
                    for (int row = 0; row < ROWS_PER_THREAD; row++) {
                        writer.write(random);
                    }
                } catch (Throwable e) {
                    // An exception thrown here would never reach JUnit, so hand it to the test thread.
                    firstFailure.compareAndSet(null, e);
                }
            });
            writers[t].start();
        }
        return writers;
    }

    /**
     * Waits for all writer threads to finish and fails the test if any of them recorded a failure.
     *
     * @param writers      threads returned by {@link #startWriterThreads}
     * @param firstFailure holder the writers report their first failure to
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    private static void awaitWriterThreads(Thread[] writers, AtomicReference<Throwable> firstFailure)
        throws InterruptedException {
        for (Thread writer : writers) {
            writer.join();
        }
        Throwable failure = firstFailure.get();
        if (failure != null) {
            throw new AssertionError("Writer thread failed while adding rows", failure);
        }
    }

    /**
     * Sums the long value of the metric at position 0 over every row currently in the index.
     *
     * @param index index to scan
     * @return total of the metric across all facts
     */
    private static long sumFirstMetric(IncrementalIndex<?> index) {
        long total = 0;
        for (IncrementalIndexRow row : index.getFacts().keySet()) {
            total += index.getMetricLongValue(row.getRowIndex(), 0);
        }
        return total;
    }
}
