package org.apache.druid.indexing.firehose;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Binder;
import com.google.inject.Module;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.JSONParseSpec;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.guice.GuiceAnnotationIntrospector;
import org.apache.druid.guice.GuiceInjectableValues;
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.indexing.common.ReingestionTimelineUtils;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.indexing.overlord.TaskLockbox;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.extraction.ExtractionFn;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.loading.LocalDataSegmentPuller;
import org.apache.druid.segment.loading.LocalLoadSpec;
import org.apache.druid.segment.realtime.firehose.CombiningFirehoseFactory;
import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.partition.NumberedPartitionChunk;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.PartitionHolder;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.class */
public class IngestSegmentFirehoseFactoryTest {
    // All static finals below are assigned in the static initializer at the bottom of this class.

    // JSON mapper produced by setupInjectablesInObjectMapper(...); handed to SegmentLoaderFactory in constructorFeeder().
    private static final ObjectMapper MAPPER;
    // Merger used by constructorFeeder() to persist the in-memory index into PERSIST_DIR.
    private static final IndexMergerV9 INDEX_MERGER_V9;
    // IndexIO instance passed to every IngestSegmentFirehoseFactory built by constructorFeeder().
    private static final IndexIO INDEX_IO;
    // Heap-backed task storage; only used to construct TASK_LOCKBOX.
    private static final TaskStorage TASK_STORAGE;
    // Anonymous metadata-coordinator stub whose used-segment lookup returns a copy of SEGMENT_SET.
    private static final IndexerSQLMetadataStorageCoordinator MDC;
    // Lockbox built from TASK_STORAGE and MDC; TASK is added to it in the static initializer.
    private static final TaskLockbox TASK_LOCKBOX;
    // No-op task whose data source name is used when constructing and checking the factories.
    private static final Task TASK;

    // JUnit rule supplying the per-test folder created in setup().
    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();
    private static final Logger log;
    private static final String DATA_SOURCE_NAME = "testDataSource";
    private static final String DATA_SOURCE_VERSION = "version";
    // Binary version stamped on every DataSegment created by buildSegment().
    private static final Integer BINARY_VERSION;
    private static final String DIM_NAME = "testDimName";
    private static final String DIM_VALUE = "testDimValue";
    // Input columns that the two sum aggregators in constructorFeeder() read from.
    private static final String DIM_LONG_NAME = "testDimLongName";
    private static final String DIM_FLOAT_NAME = "testDimFloatName";
    // Output metric names of the long-sum and double-sum aggregators.
    private static final String METRIC_LONG_NAME = "testLongMetric";
    private static final String METRIC_FLOAT_NAME = "testFloatMetric";
    // Values written into every generated row (see buildRow()) and asserted on when reading back.
    private static final Long METRIC_LONG_VALUE;
    private static final Float METRIC_FLOAT_VALUE;
    private static final String TIME_COLUMN = "ts";
    // Number of DataSegment descriptors placed in SEGMENT_SET by setUpStatic().
    private static final Integer MAX_SHARD_NUMBER;
    // Number of rows added to the index persisted by constructorFeeder().
    private static final Integer MAX_ROWS;
    // Class-wide temp directory; passed to connect(...) in the tests and cleaned in tearDownStatic().
    private static final File TMP_DIR;
    // "indexTestMerger" directory under TMP_DIR; every segment's load spec points at it.
    private static final File PERSIST_DIR;
    // Segment descriptors returned by the stubbed coordinator client and metadata coordinator.
    private static final List<DataSegment> SEGMENT_SET;
    // Per-parameterization firehose factory under test (plain or wrapped in a CombiningFirehoseFactory).
    private final FirehoseFactory<InputRowParser> factory;
    // Per-parameterization parser, already decorated with TransformSpec.NONE by the constructor.
    private final InputRowParser rowParser;
    // Folder obtained from temporaryFolder in setup().
    private File tempDir;
    // Parser used to build the persisted index; also one of the two parsers fed to the tests.
    private static final InputRowParser<Map<String, Object>> ROW_PARSER;

    /**
     * Persists the shared test index and produces the parameter sets for the parameterized runner.
     *
     * <p>An on-heap incremental index is filled with {@code MAX_ROWS} rows (timestamps
     * {@code 0 .. MAX_ROWS - 1}) and persisted into {@code PERSIST_DIR}. Parameter sets are then
     * generated for every combination of: two row parsers, dimension list {@code null} or
     * {@code [DIM_NAME]}, metric list {@code null} or both metrics, and whether the
     * {@link IngestSegmentFirehoseFactory} is wrapped in a {@link CombiningFirehoseFactory}
     * (2 x 2 x 2 x 2 = 16 sets).
     *
     * @return one {@code Object[]{testName, firehoseFactory, rowParser}} per combination
     * @throws IOException if {@code PERSIST_DIR} cannot be created or the index cannot be persisted
     */
    @Parameterized.Parameters(name = "{0}")
    public static Collection<Object[]> constructorFeeder() throws IOException {
        IndexSpec indexSpec = new IndexSpec();
        IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
                // literal equals Long.MIN_VALUE / 2
                .withMinTimestamp(-4611686018427387904L)
                .withDimensionsSpec(ROW_PARSER)
                .withMetrics(
                        new LongSumAggregatorFactory(METRIC_LONG_NAME, DIM_LONG_NAME),
                        new DoubleSumAggregatorFactory(METRIC_FLOAT_NAME, DIM_FLOAT_NAME))
                .build();
        IncrementalIndex index = new OnheapIncrementalIndex.Builder()
                .setIndexSchema(schema)
                .setMaxRowCount(MAX_ROWS.intValue() * MAX_SHARD_NUMBER.intValue())
                .build();

        // One row per timestamp; the timestamp doubles as the row number.
        for (long timestamp = 0; timestamp < MAX_ROWS.intValue(); timestamp++) {
            index.add(ROW_PARSER.parseBatch(buildRow(timestamp)).get(0));
        }

        // mkdirs() returns false when the directory already exists, hence the second check.
        if (!PERSIST_DIR.mkdirs() && !PERSIST_DIR.exists()) {
            throw new IOE("Could not create directory at [%s]", PERSIST_DIR.getAbsolutePath());
        }
        INDEX_MERGER_V9.persist(index, PERSIST_DIR, indexSpec, (SegmentWriteOutMediumFactory) null);

        // Stub client: always answers with the statically registered segments.
        CoordinatorClient coordinatorClient = new CoordinatorClient(null, null) {
            @Override
            public Collection<DataSegment> fetchUsedSegmentsInDataSourceForIntervals(String dataSource, List<Interval> intervals) {
                return ImmutableSet.copyOf(SEGMENT_SET);
            }
        };

        // NOTE(review): this mock is replayed but not referenced afterwards in this method.
        SegmentHandoffNotifierFactory notifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class);
        EasyMock.replay(notifierFactory);

        SegmentLoaderFactory segmentLoaderFactory = new SegmentLoaderFactory((IndexIO) null, MAPPER);
        RetryPolicyFactory retryPolicyFactory = new RetryPolicyFactory(new RetryPolicyConfig());

        // Second parser declares no dimensions of its own (empty default schema list).
        InputRowParser<Map<String, Object>> parserWithoutDims = new MapInputRowParser(
                new JSONParseSpec(
                        new TimestampSpec(TIME_COLUMN, "auto", (DateTime) null),
                        new DimensionsSpec(
                                DimensionsSpec.getDefaultSchemas(ImmutableList.of()),
                                ImmutableList.of(DIM_FLOAT_NAME, DIM_LONG_NAME),
                                ImmutableList.of()),
                        (JSONPathSpec) null,
                        (Map) null,
                        (Boolean) null));

        List<Object[]> parameters = new ArrayList<>();
        for (InputRowParser<Map<String, Object>> parser : Arrays.asList(ROW_PARSER, parserWithoutDims)) {
            for (List<String> dimNames : Arrays.<List<String>>asList(null, ImmutableList.of(DIM_NAME))) {
                for (List<String> metricNames : Arrays.<List<String>>asList(null, ImmutableList.of(METRIC_LONG_NAME, METRIC_FLOAT_NAME))) {
                    for (Boolean wrapInCombining : Arrays.asList(false, true)) {
                        // Declared with its real type: it is NOT a CombiningFirehoseFactory.
                        IngestSegmentFirehoseFactory ingestFactory = new IngestSegmentFirehoseFactory(
                                TASK.getDataSource(),
                                Intervals.ETERNITY,
                                (List) null,
                                new SelectorDimFilter(DIM_NAME, DIM_VALUE, (ExtractionFn) null),
                                dimNames,
                                metricNames,
                                (Long) null,
                                INDEX_IO,
                                coordinatorClient,
                                segmentLoaderFactory,
                                retryPolicyFactory);
                        FirehoseFactory factory = wrapInCombining.booleanValue()
                                ? new CombiningFirehoseFactory(ImmutableList.of(ingestFactory))
                                : ingestFactory;
                        String testName = StringUtils.format(
                                "DimNames[%s]MetricNames[%s]ParserDimNames[%s]WrapInCombining[%s]",
                                dimNames == null ? "null" : "dims",
                                metricNames == null ? "null" : "metrics",
                                parser == ROW_PARSER ? "dims" : "null",
                                wrapInCombining);
                        parameters.add(new Object[]{testName, factory, parser});
                    }
                }
            }
        }
        return parameters;
    }

    /**
     * Prepares a mapper for use by this test: registers {@link LocalLoadSpec} as a subtype,
     * installs a Guice-aware annotation introspector ahead of the mapper's existing ones, and
     * wires injectable values from a startup injector that binds {@link LocalDataSegmentPuller}
     * and the test {@link ExprMacroTable}.
     *
     * @param objectMapper the mapper to configure; it is modified in place
     * @return the same {@code objectMapper} instance
     */
    public static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMapper) {
        SimpleModule subtypeModule = new SimpleModule("testModule");
        subtypeModule.registerSubtypes(LocalLoadSpec.class);
        objectMapper.registerModule(subtypeModule);

        // The Guice introspector is the primary of each pair; the mapper's current one is the fallback.
        GuiceAnnotationIntrospector guiceIntrospector = new GuiceAnnotationIntrospector();
        AnnotationIntrospectorPair forSerialization = new AnnotationIntrospectorPair(
                guiceIntrospector,
                objectMapper.getSerializationConfig().getAnnotationIntrospector());
        AnnotationIntrospectorPair forDeserialization = new AnnotationIntrospectorPair(
                guiceIntrospector,
                objectMapper.getDeserializationConfig().getAnnotationIntrospector());
        objectMapper.setAnnotationIntrospectors(forSerialization, forDeserialization);

        Module testBindings = new Module() {
            @Override
            public void configure(Binder binder) {
                binder.bind(LocalDataSegmentPuller.class);
                binder.bind(ExprMacroTable.class).toInstance(TestExprMacroTable.INSTANCE);
            }
        };
        objectMapper.setInjectableValues(
                new GuiceInjectableValues(
                        GuiceInjectors.makeStartupInjectorWithModules(ImmutableList.of(testBindings))));
        return objectMapper;
    }

    /**
     * Receives one parameter set produced by {@link #constructorFeeder()}.
     *
     * @param str             display name of the parameter set; not stored
     * @param firehoseFactory factory under test
     * @param inputRowParser  parser to read rows with; stored after decoration with {@code TransformSpec.NONE}
     */
    public IngestSegmentFirehoseFactoryTest(String str, FirehoseFactory firehoseFactory, InputRowParser inputRowParser) {
        rowParser = TransformSpec.NONE.decorate(inputRowParser);
        factory = firehoseFactory;
    }

    /**
     * Creates the raw event for one input row. Only the timestamp varies between rows; the
     * dimension value and the two numeric columns are the class-level constants.
     *
     * @param l value stored under the time column
     * @return immutable map with the time, string-dimension, float and long columns, in that order
     */
    private static Map<String, Object> buildRow(Long l) {
        return ImmutableMap.<String, Object>builder()
                .put(TIME_COLUMN, l)
                .put(DIM_NAME, DIM_VALUE)
                .put(DIM_FLOAT_NAME, METRIC_FLOAT_VALUE)
                .put(DIM_LONG_NAME, METRIC_LONG_VALUE)
                .build();
    }

    /**
     * Creates the segment descriptor for one shard. Every descriptor covers {@code Intervals.ETERNITY}
     * and carries a "local" load spec pointing at {@code PERSIST_DIR}.
     *
     * @param num partition number; must satisfy {@code 0 <= num < MAX_SHARD_NUMBER}
     * @return descriptor with a {@link NumberedShardSpec} built from {@code num} and {@code MAX_SHARD_NUMBER}
     * @throws IllegalArgumentException if {@code num} is outside the allowed range
     */
    private static DataSegment buildSegment(Integer num) {
        final int partitionNum = num.intValue();
        final int partitionCount = MAX_SHARD_NUMBER.intValue();
        Preconditions.checkArgument(partitionNum < partitionCount);
        Preconditions.checkArgument(partitionNum >= 0);

        final Map<String, Object> loadSpec = ImmutableMap.of("type", "local", "path", PERSIST_DIR.getAbsolutePath());
        final List<String> dimensions = ImmutableList.of(DIM_NAME);
        final List<String> metrics = ImmutableList.of(METRIC_LONG_NAME, METRIC_FLOAT_NAME);
        return new DataSegment(
                DATA_SOURCE_NAME,
                Intervals.ETERNITY,
                DATA_SOURCE_VERSION,
                loadSpec,
                dimensions,
                metrics,
                new NumberedShardSpec(partitionNum, partitionCount),
                BINARY_VERSION,
                0L);
    }

    /**
     * Fills {@code SEGMENT_SET} with one descriptor per shard number, in ascending order,
     * before any test in the class runs.
     */
    @BeforeClass
    public static void setUpStatic() {
        IntStream.range(0, MAX_SHARD_NUMBER.intValue())
                .mapToObj(shard -> buildSegment(Integer.valueOf(shard)))
                .forEach(segment -> SEGMENT_SET.add(segment));
    }

    /**
     * Cleans up the class-wide temp directory once all tests have finished.
     */
    @AfterClass
    public static void tearDownStatic() {
        final File root = TMP_DIR;
        recursivelyDelete(root);
    }

    /**
     * Deletes {@code file}; if it is a directory, its contents are deleted first and then the
     * directory itself. Failures are logged as warnings and never thrown, so cleanup is
     * best-effort.
     *
     * <p>Previously only regular files were removed and every directory (including the root)
     * was left behind empty.
     *
     * @param file file or directory to remove; {@code null} is ignored
     */
    private static void recursivelyDelete(File file) {
        if (file == null) {
            return;
        }
        if (file.isDirectory()) {
            // listFiles() returns null on I/O error; in that case just attempt the delete below.
            File[] children = file.listFiles();
            if (children != null) {
                for (File child : children) {
                    recursivelyDelete(child);
                }
            }
        }
        // File.delete() only succeeds on a directory once it is empty, so this runs after the children.
        if (!file.delete()) {
            log.warn("Could not delete file at [%s]", file.getAbsolutePath());
        }
    }

    /**
     * Obtains a fresh folder from the {@code temporaryFolder} rule for the upcoming test.
     *
     * @throws IOException if the folder cannot be created
     */
    @Before
    public void setup() throws IOException {
        final File freshFolder = temporaryFolder.newFolder();
        tempDir = freshFolder;
    }

    /**
     * Attempts to remove the per-test folder; the outcome of the delete is not checked.
     */
    @After
    public void teardown() {
        final File perTestFolder = tempDir;
        perTestFolder.delete();
    }

    /**
     * Checks that an {@link IngestSegmentFirehoseFactory} reports back the data source, interval,
     * dimensions and metrics it was constructed with. Dimensions and metrics are only checked
     * when non-null, because some parameter sets pass {@code null} for them.
     */
    @Test
    public void sanityTest() {
        if (this.factory instanceof CombiningFirehoseFactory) {
            // The getters checked below exist only on IngestSegmentFirehoseFactory.
            return;
        }
        // Explicit downcast: the field is declared as the FirehoseFactory interface.
        IngestSegmentFirehoseFactory ingestFactory = (IngestSegmentFirehoseFactory) this.factory;
        Assert.assertEquals(TASK.getDataSource(), ingestFactory.getDataSource());
        if (ingestFactory.getDimensions() != null) {
            Assert.assertArrayEquals(new String[]{DIM_NAME}, ingestFactory.getDimensions().toArray());
        }
        Assert.assertEquals(Intervals.ETERNITY, ingestFactory.getInterval());
        if (ingestFactory.getMetrics() != null) {
            Assert.assertEquals(
                    ImmutableSet.of(METRIC_LONG_NAME, METRIC_FLOAT_NAME),
                    ImmutableSet.copyOf(ingestFactory.getMetrics()));
        }
    }

    /**
     * Reads every row through the firehose and verifies the dimension and metric values of each,
     * then checks that {@code MAX_SHARD_NUMBER * MAX_ROWS} rows were returned in total.
     *
     * @throws IOException if connecting to, reading from, or closing the firehose fails
     */
    @Test
    public void simpleFirehoseReadingTest() throws IOException {
        Assert.assertEquals(MAX_SHARD_NUMBER.longValue(), SEGMENT_SET.size());
        int rowCount = 0;
        // try-with-resources guarantees close() on every exit path, including assertion failures.
        try (Firehose firehose = this.factory.connect(this.rowParser, TMP_DIR)) {
            while (firehose.hasMore()) {
                InputRow row = firehose.nextRow();
                Assert.assertArrayEquals(new String[]{DIM_NAME}, row.getDimensions().toArray());
                Assert.assertArrayEquals(new String[]{DIM_VALUE}, row.getDimension(DIM_NAME).toArray());
                Assert.assertEquals(Long.valueOf(METRIC_LONG_VALUE.longValue()), row.getMetric(METRIC_LONG_NAME));
                // Float metric is compared with a relative tolerance of 1e-4.
                Assert.assertEquals(
                        METRIC_FLOAT_VALUE.floatValue(),
                        row.getMetric(METRIC_FLOAT_NAME).floatValue(),
                        METRIC_FLOAT_VALUE.floatValue() * 1.0E-4d);
                rowCount++;
            }
        }
        Assert.assertEquals(MAX_SHARD_NUMBER.intValue() * MAX_ROWS.intValue(), rowCount);
    }

    /**
     * Reads through the firehose with a {@link TransformSpec} that keeps only rows whose
     * {@code __time} equals "1" and replaces the float metric with ten times its value.
     * Rows returned as {@code null} are counted as skipped; all others are verified.
     *
     * @throws IOException if connecting to, reading from, or closing the firehose fails
     */
    @Test
    public void testTransformSpec() throws IOException {
        Assert.assertEquals(MAX_SHARD_NUMBER.longValue(), SEGMENT_SET.size());
        int rowCount = 0;
        int skipped = 0;
        TransformSpec transformSpec = new TransformSpec(
                new SelectorDimFilter("__time", "1", (ExtractionFn) null),
                ImmutableList.of(new ExpressionTransform(METRIC_FLOAT_NAME, "testFloatMetric * 10", ExprMacroTable.nil())));
        // try-with-resources guarantees close() on every exit path, including assertion failures.
        try (Firehose firehose = this.factory.connect(transformSpec.decorate(this.rowParser), TMP_DIR)) {
            while (firehose.hasMore()) {
                InputRow row = firehose.nextRow();
                if (row == null) {
                    // A null row is counted as one skipped by the transform's filter.
                    skipped++;
                    continue;
                }
                Assert.assertArrayEquals(new String[]{DIM_NAME}, row.getDimensions().toArray());
                Assert.assertArrayEquals(new String[]{DIM_VALUE}, row.getDimension(DIM_NAME).toArray());
                Assert.assertEquals(METRIC_LONG_VALUE.longValue(), row.getMetric(METRIC_LONG_NAME).longValue());
                Assert.assertEquals(
                        METRIC_FLOAT_VALUE.floatValue() * 10.0f,
                        row.getMetric(METRIC_FLOAT_NAME).floatValue(),
                        METRIC_FLOAT_VALUE.floatValue() * 1.0E-4d);
                rowCount++;
            }
        }
        Assert.assertEquals(90L, skipped);
        Assert.assertEquals(MAX_ROWS.intValue(), rowCount);
    }

    /**
     * Builds 10 timeline holders of 5 partition chunks each, where every segment in holder
     * {@code i} declares dimensions {@code dim<i> .. dim<i+4>} and metrics {@code met<i> .. met<i+4>},
     * and checks the order in which {@link ReingestionTimelineUtils} reports the unique names.
     */
    @Test
    public void testGetUniqueDimensionsAndMetrics() {
        final Interval interval = Intervals.of("2017-01-01/2017-01-02");
        final ArrayList timelineSegments = new ArrayList();
        for (int holderIdx = 0; holderIdx < 10; holderIdx++) {
            final int first = holderIdx;
            final ArrayList chunks = new ArrayList();
            for (int chunkIdx = 0; chunkIdx < 5; chunkIdx++) {
                List<String> dims = IntStream.range(first, first + 5)
                        .mapToObj(suffix -> "dim" + suffix)
                        .collect(Collectors.toList());
                List<String> metrics = IntStream.range(first, first + 5)
                        .mapToObj(suffix -> "met" + suffix)
                        .collect(Collectors.toList());
                DataSegment segment = new DataSegment(
                        "ds",
                        interval,
                        "1",
                        ImmutableMap.of(),
                        dims,
                        metrics,
                        new NumberedShardSpec(10, first),
                        1,
                        1L);
                chunks.add(new NumberedPartitionChunk(first, 10, segment));
            }
            timelineSegments.add(new TimelineObjectHolder(interval, "1", new PartitionHolder(chunks)));
        }

        List<String> expectedDims = Arrays.asList("dim9", "dim10", "dim11", "dim12", "dim13", "dim8", "dim7", "dim6", "dim5", "dim4", "dim3", "dim2", "dim1", "dim0");
        List<String> expectedMetrics = Arrays.asList("met9", "met10", "met11", "met12", "met13", "met8", "met7", "met6", "met5", "met4", "met3", "met2", "met1", "met0");
        Assert.assertEquals(expectedDims, ReingestionTimelineUtils.getUniqueDimensions(timelineSegments, (Set) null));
        Assert.assertEquals(expectedMetrics, ReingestionTimelineUtils.getUniqueMetrics(timelineSegments));
    }

    /**
     * Creates an emitter for test wiring.
     *
     * @return a new {@link NoopServiceEmitter}
     */
    private static ServiceEmitter newMockEmitter() {
        final ServiceEmitter emitter = new NoopServiceEmitter();
        return emitter;
    }

    // Assigns every static final field of the test class. The statements are order-sensitive:
    // TASK_LOCKBOX needs TASK_STORAGE and MDC, and TASK must exist before it is added to the lockbox.
    static {
        TestUtils testUtils = new TestUtils();
        MAPPER = setupInjectablesInObjectMapper(TestHelper.makeJsonMapper());
        INDEX_MERGER_V9 = testUtils.getTestIndexMergerV9();
        INDEX_IO = testUtils.getTestIndexIO();
        TASK_STORAGE = new HeapMemoryTaskStorage(new TaskStorageConfig(null) { // from class: org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactoryTest.1
        });
        // In-memory stand-in for the metadata coordinator; constructed with null collaborators.
        MDC = new IndexerSQLMetadataStorageCoordinator(null, null, null) { // from class: org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactoryTest.2
            // Segments accepted so far by announceHistoricalSegments().
            private final Set<DataSegment> published = new HashSet();

            // Ignores all arguments and returns a snapshot of SEGMENT_SET taken at call time
            // (SEGMENT_SET itself is assigned further down in this initializer).
            public List<DataSegment> retrieveUsedSegmentsForIntervals(String str, List<Interval> list, Segments segments) {
                return ImmutableList.copyOf(IngestSegmentFirehoseFactoryTest.SEGMENT_SET);
            }

            // There are never any unused segments in this stub.
            public List<DataSegment> retrieveUnusedSegmentsForInterval(String str, Interval interval) {
                return ImmutableList.of();
            }

            // Records the given segments and returns only those that were not already recorded.
            public Set<DataSegment> announceHistoricalSegments(Set<DataSegment> set) {
                HashSet hashSet = new HashSet();
                for (DataSegment dataSegment : set) {
                    if (this.published.add(dataSegment)) {
                        hashSet.add(dataSegment);
                    }
                }
                return ImmutableSet.copyOf(hashSet);
            }

            // Intentionally a no-op.
            public void deleteSegments(Set<DataSegment> set) {
            }

            // NOTE(review): looks like a decompiler rendering of the compiler's bridge method for the
            // List-returning override above; it simply delegates to it — confirm before relying on it.
            /* renamed from: retrieveUsedSegmentsForIntervals, reason: collision with other method in class */
            public /* bridge */ /* synthetic */ Collection m69retrieveUsedSegmentsForIntervals(String str, List list, Segments segments) {
                return retrieveUsedSegmentsForIntervals(str, (List<Interval>) list, segments);
            }
        };
        TASK_LOCKBOX = new TaskLockbox(TASK_STORAGE, MDC);
        TASK = NoopTask.create();
        TASK_LOCKBOX.add(TASK);
        log = new Logger(IngestSegmentFirehoseFactoryTest.class);
        BINARY_VERSION = -1;
        METRIC_LONG_VALUE = 1L;
        METRIC_FLOAT_VALUE = Float.valueOf(1.0f);
        MAX_SHARD_NUMBER = 10;
        MAX_ROWS = 10;
        // Class-wide scratch directory; PERSIST_DIR is the "indexTestMerger" child beneath it.
        TMP_DIR = FileUtils.createTempDir();
        PERSIST_DIR = Paths.get(TMP_DIR.getAbsolutePath(), "indexTestMerger").toFile();
        // Starts empty (capacity hint only); populated by setUpStatic().
        SEGMENT_SET = new ArrayList(MAX_SHARD_NUMBER.intValue());
        // Declares DIM_NAME as a dimension and lists the float/long input columns as exclusions
        // (second DimensionsSpec argument — presumably the exclusion list; verify against DimensionsSpec).
        ROW_PARSER = new MapInputRowParser(new TimeAndDimsParseSpec(new TimestampSpec(TIME_COLUMN, "auto", (DateTime) null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of(DIM_NAME)), ImmutableList.of(DIM_FLOAT_NAME, DIM_LONG_NAME), ImmutableList.of())));
    }
}
