package org.apache.druid.indexing.common.task.batch.parallel;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.report.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.task.IngestionTestBase;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.TuningConfigBuilder;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.DataSegmentsWithSchemas;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.partition.NumberedOverwriteShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.class */
public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSupervisorTaskTest {

    // JUnit rule used by the tests that expect an exception; the instance is assigned in the constructor.
    @Rule
    public ExpectedException expectedException;
    // December 2017: the month that contains the 2017-12-xx rows written by setup().
    private static final Interval INTERVAL_TO_INDEX = Intervals.of("2017-12/P1M");
    // Glob matching the "test_<i>" input files created by setup() (the "filtered_<i>" files do not match).
    private static final String VALID_INPUT_SOURCE_FILTER = "test_*";
    // Lock granularity under test, supplied by the parameterized runner.
    private final LockGranularity lockGranularity;
    // Second test parameter, supplied by the parameterized runner; selects the ingestion spec flavor in newTask.
    private final boolean useInputFormatApi;
    // Directory holding the generated input files; created in setup().
    private File inputDir;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest$SettableSplittableLocalInputSource.class */
    /**
     * A {@link LocalInputSource} whose splittability is fixed at construction time, so that tests
     * can choose whether the input source reports itself as splittable.
     */
    public static class SettableSplittableLocalInputSource extends LocalInputSource {
        private final boolean splittableInputSource;

        /**
         * @param baseDir               directory passed to the parent input source
         * @param filter                file filter passed to the parent input source
         * @param splittableInputSource value returned by {@link #isSplittable()}
         */
        @JsonCreator
        private SettableSplittableLocalInputSource(
                @JsonProperty("baseDir") File baseDir,
                @JsonProperty("filter") String filter,
                @JsonProperty("splittableInputSource") boolean splittableInputSource) {
            super(baseDir, filter);
            this.splittableInputSource = splittableInputSource;
        }

        /** Exposes the configured flag as a JSON property. */
        @JsonProperty
        public boolean isSplittableInputSource() {
            return splittableInputSource;
        }

        /** Reports the flag given at construction time. */
        public boolean isSplittable() {
            return splittableInputSource;
        }
    }

    /**
     * Supplies the (lockGranularity, useInputFormatApi) combinations the test class is run with.
     */
    @Parameterized.Parameters(name = "{0}, useInputFormatApi={1}")
    public static Iterable<Object[]> constructorFeeder() {
        final Object[] timeChunkWithoutInputFormatApi = {LockGranularity.TIME_CHUNK, false};
        final Object[] timeChunkWithInputFormatApi = {LockGranularity.TIME_CHUNK, true};
        final Object[] segmentWithInputFormatApi = {LockGranularity.SEGMENT, true};
        return ImmutableList.of(
                timeChunkWithoutInputFormatApi,
                timeChunkWithInputFormatApi,
                segmentWithInputFormatApi);
    }

    /**
     * Creates a test instance for one parameter combination.
     *
     * @param lockGranularity   lock granularity the tasks are run with
     * @param useInputFormatApi second test parameter, stored for later use by the tests
     */
    public SinglePhaseParallelIndexingTest(LockGranularity lockGranularity, boolean useInputFormatApi) {
        super(0.2d, 0.2d);
        this.lockGranularity = lockGranularity;
        this.useInputFormatApi = useInputFormatApi;
        // The rule field has no initializer, so it must be assigned here.
        this.expectedException = ExpectedException.none();
    }

    /**
     * Creates the input directory and populates it with the files used by the tests:
     * five {@code test_<i>} files (the first one additionally contains a 2012 row, a row with a
     * non-numeric last column and a line with an unparseable timestamp) and five
     * {@code filtered_<i>} files that do not match {@code VALID_INPUT_SOURCE_FILTER}.
     * Finally registers {@link SettableSplittableLocalInputSource} as a Jackson subtype.
     *
     * @throws IOException if the directory or any file cannot be written
     */
    @Before
    public void setup() throws IOException {
        inputDir = temporaryFolder.newFolder("data");

        for (int i = 0; i < 5; i++) {
            final File testFile = new File(inputDir, "test_" + i);
            // try-with-resources guarantees the writer is closed even when a write fails.
            try (BufferedWriter writer = Files.newBufferedWriter(testFile.toPath(), StandardCharsets.UTF_8)) {
                writer.write(StringUtils.format("2017-12-%d,%d th test file\n", 24 + i, i));
                writer.write(StringUtils.format("2017-12-%d,%d th test file\n", 25 + i, i));
                if (i == 0) {
                    // Extra rows only in the first file: a 2012 timestamp, a bad last column, and a broken timestamp.
                    writer.write(StringUtils.format("2012-12-%d,%d th test file\n", 25 + i, i));
                    writer.write(StringUtils.format("2017-12-%d,%d th test file,badval\n", 25 + i, i));
                    writer.write(StringUtils.format("2017unparseable\n"));
                }
            }
        }

        for (int i = 0; i < 5; i++) {
            final File filteredFile = new File(inputDir, "filtered_" + i);
            try (BufferedWriter writer = Files.newBufferedWriter(filteredFile.toPath(), StandardCharsets.UTF_8)) {
                writer.write(StringUtils.format("2017-12-%d,%d th test file\n", 25 + i, i));
            }
        }

        getObjectMapper().registerSubtypes(SettableSplittableLocalInputSource.class);
    }

    /** Deletes the temporary folder after each test. */
    @After
    public void teardown() {
        temporaryFolder.delete();
    }

    /**
     * Checks that the supervisor task is ready, and that every sub task built from the specs of its
     * single-phase runner is ready too and reports a READ action on the "local" EXTERNAL resource.
     */
    @Test
    public void testIsReady() throws Exception {
        final ParallelIndexSupervisorTask supervisorTask = newTask(INTERVAL_TO_INDEX, true);
        final IngestionTestBase.TestLocalTaskActionClient supervisorClient = createActionClient(supervisorTask);
        final TaskToolbox toolbox = createTaskToolbox(supervisorTask, supervisorClient);
        prepareTaskForLocking(supervisorTask);
        Assert.assertTrue(supervisorTask.isReady(supervisorClient));

        final Iterator specs = supervisorTask.createSinglePhaseTaskRunner(toolbox).subTaskSpecIterator();
        final Set<ResourceAction> expectedResources =
                Collections.singleton(new ResourceAction(new Resource("local", "EXTERNAL"), Action.READ));

        while (specs.hasNext()) {
            final SinglePhaseSubTaskSpec spec = (SinglePhaseSubTaskSpec) specs.next();
            final SinglePhaseSubTask subTask = new SinglePhaseSubTask(
                    (String) null,
                    spec.getGroupId(),
                    (TaskResource) null,
                    spec.getSupervisorTaskId(),
                    spec.getId(),
                    0,
                    spec.getIngestionSpec(),
                    spec.getContext());
            final IngestionTestBase.TestLocalTaskActionClient subTaskClient = createActionClient(subTask);
            prepareTaskForLocking(subTask);
            Assert.assertTrue(subTask.isReady(subTaskClient));
            Assert.assertEquals(expectedResources, subTask.getInputSourceResources());
        }
    }

    /**
     * Runs a splittable ingestion task to completion, asserts that it succeeded and verifies the
     * shard specs of the segments it produced.
     *
     * @param interval                 interval to index, or null for no explicit interval
     * @param segmentGranularity       segment granularity of the task
     * @param appendToExisting         whether the task appends to existing segments
     * @param originalSegmentsIfAppend segments that existed before the run; used by the shard spec check
     * @return the task instance held by the indexing service client's task container
     */
    private ParallelIndexSupervisorTask runTestTask(
            @Nullable Interval interval,
            Granularity segmentGranularity,
            boolean appendToExisting,
            Collection<DataSegment> originalSegmentsIfAppend) {
        final ParallelIndexSupervisorTask task = newTask(interval, segmentGranularity, appendToExisting, true);
        final boolean forceTimeChunkLock = this.lockGranularity == LockGranularity.TIME_CHUNK;
        task.addToContext("forceTimeChunkLock", forceTimeChunkLock);

        final TaskState finalState = getIndexingServiceClient().runAndWait(task).getStatusCode();
        Assert.assertEquals(TaskState.SUCCESS, finalState);

        // Without an explicit interval the check always assumes a time chunk lock.
        final LockGranularity actualLockGranularity;
        if (interval == null) {
            actualLockGranularity = LockGranularity.TIME_CHUNK;
        } else {
            actualLockGranularity = this.lockGranularity;
        }
        assertShardSpec(task, actualLockGranularity, appendToExisting, originalSegmentsIfAppend);
        return getIndexingServiceClient().getTaskContainer(task.getId()).getTask();
    }

    /**
     * Runs a non-appending, splittable ingestion task to completion, asserts that it succeeded and
     * verifies the shard specs expected after an overwrite.
     *
     * @param interval              interval to index, or null for no explicit interval
     * @param segmentGranularity    segment granularity of the task
     * @param actualLockGranularity lock granularity the shard spec check should assume
     * @return the task instance held by the indexing service client's task container
     */
    private ParallelIndexSupervisorTask runOverwriteTask(
            @Nullable Interval interval,
            Granularity segmentGranularity,
            LockGranularity actualLockGranularity) {
        final ParallelIndexSupervisorTask task = newTask(interval, segmentGranularity, false, true);
        // The context flag is driven by the test parameter, not by the granularity passed in.
        final boolean forceTimeChunkLock = this.lockGranularity == LockGranularity.TIME_CHUNK;
        task.addToContext("forceTimeChunkLock", forceTimeChunkLock);

        final TaskState finalState = getIndexingServiceClient().runAndWait(task).getStatusCode();
        Assert.assertEquals(TaskState.SUCCESS, finalState);

        assertShardSpecAfterOverwrite(task, actualLockGranularity);
        return getIndexingServiceClient().getTaskContainer(task.getId()).getTask();
    }

    /**
     * Ingests the data once with DAY granularity, ingests it again with the given granularity, and
     * verifies that the visible segments after the second run are exactly the non-overshadowed ones
     * in a timeline built from the segments of both runs.
     *
     * @param interval                 interval to index, or null for no explicit interval
     * @param secondSegmentGranularity segment granularity of the second (overwriting) run
     */
    private void testRunAndOverwrite(@Nullable Interval interval, Granularity secondSegmentGranularity) {
        // First run: ingest everything.
        runTestTask(interval, Granularities.DAY, false, Collections.emptyList());
        final Set<DataSegment> allSegments = new HashSet<>(
                interval == null
                        ? getStorageCoordinator().retrieveAllUsedSegments("dataSource", Segments.ONLY_VISIBLE)
                        : getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval, Segments.ONLY_VISIBLE));

        // Second run: the configured lock granularity is only expected when an interval is given
        // and the granularity is still DAY; otherwise a time chunk lock is expected.
        final LockGranularity actualLockGranularity =
                interval != null && secondSegmentGranularity.equals(Granularities.DAY)
                        ? this.lockGranularity
                        : LockGranularity.TIME_CHUNK;
        runOverwriteTask(interval, secondSegmentGranularity, actualLockGranularity);

        final Set<DataSegment> newSegments;
        if (interval == null) {
            newSegments = getStorageCoordinator().retrieveAllUsedSegments("dataSource", Segments.ONLY_VISIBLE);
        } else {
            newSegments = getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval, Segments.ONLY_VISIBLE);
        }
        Assert.assertFalse(newSegments.isEmpty());
        allSegments.addAll(newSegments);

        final Interval timelineInterval = interval == null ? Intervals.ETERNITY : interval;
        Assert.assertEquals(
                new HashSet<>(newSegments),
                SegmentTimeline.forSegments(allSegments)
                        .findNonOvershadowedObjectsInInterval(timelineInterval, Partitions.ONLY_COMPLETE));
    }

    /**
     * Verifies the schema and the shard specs of the segments published by the given task.
     *
     * <p>For a non-appending run under a time chunk lock, every segment must carry a
     * {@link NumberedShardSpec} whose core partition count equals the number of segments in its
     * interval. Otherwise (append, or any other lock granularity) every segment must carry a
     * {@link NumberedShardSpec} whose core partition count equals that of the first original segment
     * in the same interval, or 0 when that interval had no original segments.
     *
     * @param task                     task whose published segments are checked
     * @param actualLockGranularity    lock granularity the task is assumed to have used
     * @param appendToExisting         whether the task appended to existing segments
     * @param originalSegmentsIfAppend segments that existed before the task ran
     */
    private void assertShardSpec(
            ParallelIndexSupervisorTask task,
            LockGranularity actualLockGranularity,
            boolean appendToExisting,
            Collection<DataSegment> originalSegmentsIfAppend) {
        final DataSegmentsWithSchemas segmentsWithSchemas = getIndexingServiceClient().getSegmentAndSchemas(task);
        verifySchema(segmentsWithSchemas);
        final Set<DataSegment> segments = segmentsWithSchemas.getSegments();

        if (appendToExisting || actualLockGranularity != LockGranularity.TIME_CHUNK) {
            final Map<Interval, List<DataSegment>> intervalToOriginalSegments =
                    SegmentUtils.groupSegmentsByInterval(originalSegmentsIfAppend);
            for (DataSegment segment : segments) {
                Assert.assertSame(NumberedShardSpec.class, segment.getShardSpec().getClass());
                // Safe after the class assertion above.
                final NumberedShardSpec shardSpec = (NumberedShardSpec) segment.getShardSpec();
                final List<DataSegment> originalsInInterval = intervalToOriginalSegments.get(segment.getInterval());
                final int expectedNumCorePartitions =
                        (originalsInInterval == null || originalsInInterval.isEmpty())
                                ? 0
                                : originalsInInterval.get(0).getShardSpec().getNumCorePartitions();
                Assert.assertEquals(expectedNumCorePartitions, shardSpec.getNumCorePartitions());
            }
            return;
        }

        final Map<Interval, List<DataSegment>> intervalToSegments = SegmentUtils.groupSegmentsByInterval(segments);
        for (List<DataSegment> segmentsPerInterval : intervalToSegments.values()) {
            for (DataSegment segment : segmentsPerInterval) {
                Assert.assertSame(NumberedShardSpec.class, segment.getShardSpec().getClass());
                final NumberedShardSpec shardSpec = (NumberedShardSpec) segment.getShardSpec();
                Assert.assertEquals(segmentsPerInterval.size(), shardSpec.getNumCorePartitions());
            }
        }
    }

    /**
     * Verifies the schema and the shard specs of the segments published by an overwriting task.
     *
     * <p>With a segment lock, every segment must carry a {@link NumberedOverwriteShardSpec} whose
     * atomic update group size equals the number of segments in its interval. With any other lock
     * granularity, every segment must carry a {@link NumberedShardSpec} whose core partition count
     * equals the number of segments in its interval.
     *
     * @param task                  task whose published segments are checked
     * @param actualLockGranularity lock granularity the task is assumed to have used
     */
    private void assertShardSpecAfterOverwrite(ParallelIndexSupervisorTask task, LockGranularity actualLockGranularity) {
        final DataSegmentsWithSchemas segmentsWithSchemas = getIndexingServiceClient().getSegmentAndSchemas(task);
        verifySchema(segmentsWithSchemas);
        final Map<Interval, List<DataSegment>> intervalToSegments =
                SegmentUtils.groupSegmentsByInterval(segmentsWithSchemas.getSegments());

        final boolean segmentLock = actualLockGranularity == LockGranularity.SEGMENT;
        for (List<DataSegment> segmentsPerInterval : intervalToSegments.values()) {
            for (DataSegment segment : segmentsPerInterval) {
                if (segmentLock) {
                    Assert.assertSame(NumberedOverwriteShardSpec.class, segment.getShardSpec().getClass());
                    // Safe after the class assertion above.
                    final NumberedOverwriteShardSpec shardSpec = (NumberedOverwriteShardSpec) segment.getShardSpec();
                    Assert.assertEquals(segmentsPerInterval.size(), shardSpec.getAtomicUpdateGroupSize());
                } else {
                    Assert.assertSame(NumberedShardSpec.class, segment.getShardSpec().getClass());
                    final NumberedShardSpec shardSpec = (NumberedShardSpec) segment.getShardSpec();
                    Assert.assertEquals(segmentsPerInterval.size(), shardSpec.getNumCorePartitions());
                }
            }
        }
    }

    /** Runs the ingest-then-overwrite scenario with no explicit interval and DAY granularity. */
    @Test
    public void testWithoutInterval() {
        final Interval noInterval = null;
        testRunAndOverwrite(noInterval, Granularities.DAY);
    }

    /** Runs the ingest-then-overwrite scenario over December 2017 with DAY granularity. */
    @Test
    public void testRunInParallel() {
        final Interval december2017 = Intervals.of("2017-12/P1M");
        testRunAndOverwrite(december2017, Granularities.DAY);
    }

    /**
     * Starts a task while the client keeps tasks running, checks that its live report already
     * exposes "totals" row stats, then lets the task finish and asserts success.
     */
    @Test
    public void testGetRunningTaskReports() throws Exception {
        final ParallelIndexSupervisorTask task = newTask(Intervals.of("2017-12/P1M"), Granularities.DAY, false, true);
        task.addToContext("forceTimeChunkLock", this.lockGranularity == LockGranularity.TIME_CHUNK);
        task.addToContext("disableInject", true);

        getIndexingServiceClient().keepTasksRunning();
        getIndexingServiceClient().runTask(task.getId(), task);
        // Fixed wait before querying the live reports of the still-running task.
        Thread.sleep(2000L);

        final TaskReport.ReportMap liveReports = task.doGetLiveReports(true);
        final IngestionStatsAndErrorsTaskReport statsReport =
                (IngestionStatsAndErrorsTaskReport) liveReports.get("ingestionStatsAndErrors");
        Assert.assertTrue(statsReport.getPayload().getRowStats().containsKey("totals"));

        getIndexingServiceClient().allowTasksToFinish();
        final TaskState finalState = getIndexingServiceClient().waitToFinish(task, 2L, TimeUnit.MINUTES).getStatusCode();
        Assert.assertEquals(TaskState.SUCCESS, finalState);
    }

    /**
     * Ingests with the explicit dimensions "ts", "unknownDim" and "dim" and checks that every
     * published segment lists those dimensions in the declared order. Only runs when
     * {@code useInputFormatApi} is set.
     */
    @Test
    public void testRunInParallelIngestNullColumn() {
        if (!this.useInputFormatApi) {
            return;
        }

        final List<DimensionSchema> dimensionSchemas =
                DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "unknownDim", "dim"));
        final DataSchema dataSchema = DataSchema.builder()
                .withDataSource("dataSource")
                .withTimestamp(DEFAULT_TIMESTAMP_SPEC)
                .withDimensions(DEFAULT_DIMENSIONS_SPEC.withDimensions(dimensionSchemas))
                .withAggregators(new AggregatorFactory[]{new LongSumAggregatorFactory("val", "val")})
                .withGranularity(new UniformGranularitySpec(
                        Granularities.DAY,
                        Granularities.MINUTE,
                        Collections.singletonList(Intervals.of("2017-12/P1M"))))
                .build();
        final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
                new SettableSplittableLocalInputSource(this.inputDir, VALID_INPUT_SOURCE_FILTER, true),
                DEFAULT_INPUT_FORMAT,
                false,
                (Boolean) null);
        final ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
                (String) null,
                (String) null,
                (TaskResource) null,
                new ParallelIndexIngestionSpec(dataSchema, ioConfig, DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING),
                (Map) null);
        task.addToContext("forceTimeChunkLock", this.lockGranularity == LockGranularity.TIME_CHUNK);

        Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());

        final DataSegmentsWithSchemas segmentsWithSchemas = getIndexingServiceClient().getSegmentAndSchemas(task);
        verifySchema(segmentsWithSchemas);
        for (DataSegment segment : segmentsWithSchemas.getSegments()) {
            int position = 0;
            for (DimensionSchema schema : dimensionSchemas) {
                Assert.assertEquals(schema.getName(), segment.getDimensions().get(position));
                position++;
            }
        }
    }

    /**
     * Same ingestion as the null-column test but with "storeEmptyColumns" set to false in the task
     * context; asserts that no published segment lists "unknownDim". Only runs when
     * {@code useInputFormatApi} is set.
     */
    @Test
    public void testRunInParallelIngestNullColumn_storeEmptyColumnsOff_shouldNotStoreEmptyColumns() {
        if (!this.useInputFormatApi) {
            return;
        }

        final DataSchema dataSchema = DataSchema.builder()
                .withDataSource("dataSource")
                .withTimestamp(DEFAULT_TIMESTAMP_SPEC)
                .withDimensions(DEFAULT_DIMENSIONS_SPEC.withDimensions(
                        DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "unknownDim", "dim"))))
                .withAggregators(new AggregatorFactory[]{new LongSumAggregatorFactory("val", "val")})
                .withGranularity(new UniformGranularitySpec(
                        Granularities.DAY,
                        Granularities.MINUTE,
                        Collections.singletonList(Intervals.of("2017-12/P1M"))))
                .build();
        final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
                new SettableSplittableLocalInputSource(this.inputDir, VALID_INPUT_SOURCE_FILTER, true),
                DEFAULT_INPUT_FORMAT,
                false,
                (Boolean) null);
        final ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
                (String) null,
                (String) null,
                (TaskResource) null,
                new ParallelIndexIngestionSpec(dataSchema, ioConfig, DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING),
                (Map) null);
        task.addToContext("storeEmptyColumns", false);
        task.addToContext("forceTimeChunkLock", this.lockGranularity == LockGranularity.TIME_CHUNK);

        Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());

        final DataSegmentsWithSchemas segmentsWithSchemas = getIndexingServiceClient().getSegmentAndSchemas(task);
        verifySchema(segmentsWithSchemas);
        for (DataSegment segment : segmentsWithSchemas.getSegments()) {
            Assert.assertFalse(segment.getDimensions().contains("unknownDim"));
        }
    }

    /**
     * Runs a parallel ingestion over December 2017 and compares the task's live reports with the
     * expected parse exception reports and row totals.
     */
    @Test
    public void testRunInParallelTaskReports() {
        final ParallelIndexSupervisorTask task =
                runTestTask(Intervals.of("2017-12/P1M"), Granularities.DAY, false, Collections.emptyList());

        final ImmutableList<ParseExceptionReport> expectedParseErrors = ImmutableList.of(
                new ParseExceptionReport(
                        "{ts=2017unparseable}",
                        "unparseable",
                        ImmutableList.of(getErrorMessageForUnparseableTimestamp()),
                        1L),
                new ParseExceptionReport(
                        "{ts=2017-12-25, dim=0 th test file, val=badval}",
                        "processedWithError",
                        ImmutableList.of("Unable to parse value[badval] for field[val]"),
                        1L));
        final RowIngestionMetersTotals expectedTotals = new RowIngestionMetersTotals(10L, 335L, 1L, 1L, 1L);

        compareTaskReports(
                buildExpectedTaskReportParallel(task.getId(), expectedParseErrors, expectedTotals),
                task.doGetLiveReports(true));
    }

    /** Runs the ingest-then-overwrite scenario with no explicit interval, overwriting with MONTH granularity. */
    @Test
    public void testWithoutIntervalWithDifferentSegmentGranularity() {
        final Interval noInterval = null;
        testRunAndOverwrite(noInterval, Granularities.MONTH);
    }

    /** Runs the ingest-then-overwrite scenario over December 2017, overwriting with MONTH granularity. */
    @Test
    public void testRunInParallelWithDifferentSegmentGranularity() {
        final Interval december2017 = Intervals.of("2017-12/P1M");
        testRunAndOverwrite(december2017, Granularities.MONTH);
    }

    /**
     * Runs a task built with a non-splittable input source, asserts success and shard specs, and
     * compares its live reports with the expected ones. The expected report is built with the
     * sequential builder when {@code useInputFormatApi} is set and with the parallel builder otherwise.
     */
    @Test
    public void testRunInSequential() {
        final ParallelIndexSupervisorTask task = newTask(Intervals.of("2017-12/P1M"), false);
        task.addToContext("forceTimeChunkLock", this.lockGranularity == LockGranularity.TIME_CHUNK);
        Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
        assertShardSpec(task, this.lockGranularity, false, Collections.emptyList());

        final ParallelIndexSupervisorTask executedTask =
                getIndexingServiceClient().getTaskContainer(task.getId()).getTask();
        final TaskReport.ReportMap actualReports = executedTask.doGetLiveReports(true);

        final RowIngestionMetersTotals expectedTotals = new RowIngestionMetersTotals(10L, 335L, 1L, 1L, 1L);
        final ImmutableList<ParseExceptionReport> expectedParseErrors = ImmutableList.of(
                new ParseExceptionReport(
                        "{ts=2017unparseable}",
                        "unparseable",
                        ImmutableList.of(getErrorMessageForUnparseableTimestamp()),
                        1L),
                new ParseExceptionReport(
                        "{ts=2017-12-25, dim=0 th test file, val=badval}",
                        "processedWithError",
                        ImmutableList.of("Unable to parse value[badval] for field[val]"),
                        1L));

        if (this.useInputFormatApi) {
            compareTaskReports(
                    buildExpectedTaskReportSequential(
                            task.getId(),
                            expectedParseErrors,
                            new RowIngestionMetersTotals(0L, 0L, 0L, 0L, 0L),
                            expectedTotals),
                    actualReports);
        } else {
            compareTaskReports(
                    buildExpectedTaskReportParallel(task.getId(), expectedParseErrors, expectedTotals),
                    actualReports);
        }
    }

    /**
     * Runs a task over December 2020, an interval for which setup() writes no rows, and asserts
     * that the task still succeeds.
     */
    @Test
    public void testPublishEmptySegments() {
        final Interval intervalWithoutData = Intervals.of("2020-12/P1M");
        final ParallelIndexSupervisorTask task = newTask(intervalWithoutData, true);
        task.addToContext("forceTimeChunkLock", this.lockGranularity == LockGranularity.TIME_CHUNK);

        final TaskState finalState = getIndexingServiceClient().runAndWait(task).getStatusCode();
        Assert.assertEquals(TaskState.SUCCESS, finalState);
    }

    /**
     * Runs a task whose tuning config allows a single concurrent sub task, and asserts that it
     * succeeds, has no current runner afterwards, and produced the expected shard specs.
     */
    @Test
    public void testWith1MaxNumConcurrentSubTasks() {
        final ParallelIndexTuningConfig tuningConfig =
                TuningConfigBuilder.forParallelIndexTask().withMaxNumConcurrentSubTasks(1).build();
        final ParallelIndexSupervisorTask task = newTask(
                Intervals.of("2017-12/P1M"),
                Granularities.DAY,
                false,
                true,
                tuningConfig,
                VALID_INPUT_SOURCE_FILTER);
        task.addToContext("forceTimeChunkLock", this.lockGranularity == LockGranularity.TIME_CHUNK);

        Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
        Assert.assertNull("Runner must be null if the task was in the sequential mode", task.getCurrentRunner());
        assertShardSpec(task, this.lockGranularity, false, Collections.emptyList());
    }

    /**
     * Runs two appending tasks over the same interval and asserts that the segments of the first
     * run are still used after the second, and that every used segment is non-overshadowed.
     */
    @Test
    public void testAppendToExisting() {
        final Interval interval = Intervals.of("2017-12/P1M");

        runTestTask(interval, Granularities.DAY, true, Collections.emptyList());
        final Set<DataSegment> segmentsAfterFirstRun =
                getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval, Segments.ONLY_VISIBLE);

        runTestTask(interval, Granularities.DAY, true, segmentsAfterFirstRun);
        final Set<DataSegment> segmentsAfterSecondRun =
                getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval, Segments.ONLY_VISIBLE);

        Assert.assertTrue(segmentsAfterSecondRun.containsAll(segmentsAfterFirstRun));
        final SegmentTimeline timeline = SegmentTimeline.forSegments(segmentsAfterSecondRun);
        Assert.assertEquals(
                new HashSet<>(segmentsAfterSecondRun),
                timeline.findNonOvershadowedObjectsInInterval(interval, Partitions.ONLY_COMPLETE));
    }

    /**
     * Submits two appending tasks without an explicit interval, both forced to time chunk locks
     * with "useSharedLock" enabled, and asserts that both finish successfully.
     */
    @Test
    public void testMultipleAppends() {
        final ParallelIndexSupervisorTask firstTask = newTask(null, Granularities.DAY, true, true);
        final ParallelIndexSupervisorTask secondTask = newTask(null, Granularities.DAY, true, true);
        final List<ParallelIndexSupervisorTask> tasks = Arrays.asList(firstTask, secondTask);

        for (ParallelIndexSupervisorTask task : tasks) {
            task.addToContext("forceTimeChunkLock", true);
            task.addToContext("useSharedLock", true);
        }
        // Submit both before waiting on either.
        for (ParallelIndexSupervisorTask task : tasks) {
            getIndexingServiceClient().runTask(task.getId(), task);
        }
        for (ParallelIndexSupervisorTask task : tasks) {
            Assert.assertEquals(
                    TaskState.SUCCESS,
                    getIndexingServiceClient().waitToFinish(task, 1L, TimeUnit.DAYS).getStatusCode());
        }
    }

    /**
     * Runs an appending task whose input filter is "non_existing_file_filter", which matches none
     * of the files written by setup(), and asserts that the task still succeeds.
     */
    @Test
    public void testRunParallelWithNoInputSplitToProcess() {
        final String filterMatchingNothing = "non_existing_file_filter";
        final ParallelIndexSupervisorTask task = newTask(
                Intervals.of("2017-12/P1M"),
                Granularities.DAY,
                true,
                true,
                AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING,
                filterMatchingNothing);
        task.addToContext("forceTimeChunkLock", this.lockGranularity == LockGranularity.TIME_CHUNK);

        final TaskState finalState = getIndexingServiceClient().runAndWait(task).getStatusCode();
        Assert.assertEquals(TaskState.SUCCESS, finalState);
    }

    /**
     * Runs the ingest-then-overwrite scenario, then appends once more, and asserts that the
     * segments present before the append are still used and that every used segment is
     * non-overshadowed.
     */
    @Test
    public void testOverwriteAndAppend() {
        final Interval interval = Intervals.of("2017-12/P1M");
        testRunAndOverwrite(interval, Granularities.DAY);

        final Set<DataSegment> segmentsBeforeAppend =
                getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval, Segments.ONLY_VISIBLE);
        runTestTask(interval, Granularities.DAY, true, segmentsBeforeAppend);
        final Set<DataSegment> segmentsAfterAppend =
                getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval, Segments.ONLY_VISIBLE);

        Assert.assertTrue(segmentsAfterAppend.containsAll(segmentsBeforeAppend));
        final SegmentTimeline timeline = SegmentTimeline.forSegments(segmentsAfterAppend);
        Assert.assertEquals(
                new HashSet<>(segmentsAfterAppend),
                timeline.findNonOvershadowedObjectsInInterval(interval, Partitions.ONLY_COMPLETE));
    }

    /**
     * Runs a task with one concurrent sub task and a max allowed lock count of 0. With time chunk
     * locks a RuntimeException mentioning the exceeded lock count is expected; otherwise the task
     * must succeed, have no current runner and produce the expected shard specs.
     */
    @Test
    public void testMaxLocksWith1MaxNumConcurrentSubTasks() {
        final ParallelIndexTuningConfig tuningConfig = TuningConfigBuilder.forParallelIndexTask()
                .withMaxNumConcurrentSubTasks(1)
                .withMaxAllowedLockCount(0)
                .build();
        final ParallelIndexSupervisorTask task = newTask(
                Intervals.of("2017-12/P1M"),
                Granularities.DAY,
                false,
                true,
                tuningConfig,
                VALID_INPUT_SOURCE_FILTER);
        task.addToContext("forceTimeChunkLock", this.lockGranularity == LockGranularity.TIME_CHUNK);

        if (!this.lockGranularity.equals(LockGranularity.TIME_CHUNK)) {
            Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
            Assert.assertNull("Runner must be null if the task was in the sequential mode", task.getCurrentRunner());
            assertShardSpec(task, this.lockGranularity, false, Collections.emptyList());
            return;
        }

        this.expectedException.expect(RuntimeException.class);
        this.expectedException.expectMessage("Number of locks exceeded maxAllowedLockCount [0]");
        getIndexingServiceClient().runAndWait(task);
    }

    /**
     * Runs a task with two concurrent sub tasks and a max allowed lock count of 0. With time chunk
     * locks a RuntimeException mentioning the exceeded lock count is expected; otherwise the task
     * must succeed, have no current runner and produce the expected shard specs.
     */
    @Test
    public void testMaxLocksWith2MaxNumConcurrentSubTasks() {
        final ParallelIndexTuningConfig tuningConfig = TuningConfigBuilder.forParallelIndexTask()
                .withMaxNumConcurrentSubTasks(2)
                .withMaxAllowedLockCount(0)
                .build();
        final ParallelIndexSupervisorTask task = newTask(
                Intervals.of("2017-12/P1M"),
                Granularities.DAY,
                false,
                true,
                tuningConfig,
                VALID_INPUT_SOURCE_FILTER);
        task.addToContext("forceTimeChunkLock", this.lockGranularity == LockGranularity.TIME_CHUNK);

        if (!this.lockGranularity.equals(LockGranularity.TIME_CHUNK)) {
            Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
            Assert.assertNull("Runner must be null if the task was in the sequential mode", task.getCurrentRunner());
            assertShardSpec(task, this.lockGranularity, false, Collections.emptyList());
            return;
        }

        this.expectedException.expect(RuntimeException.class);
        this.expectedException.expectMessage("Number of locks exceeded maxAllowedLockCount [0]");
        getIndexingServiceClient().runAndWait(task);
    }

    /**
     * Writes five JSON input files containing "ts", "implicitDim" and "explicitDim", ingests them
     * with "ts" and "explicitDim" declared and includeAllDimensions enabled, and asserts that every
     * segment lists the dimensions as ts, explicitDim, implicitDim.
     *
     * @throws IOException if an input file cannot be written
     */
    @Test
    public void testIngestBothExplicitAndImplicitDims() throws IOException {
        final Interval interval = Intervals.of("2017-12/P1M");
        for (int i = 0; i < 5; i++) {
            final File jsonFile = new File(this.inputDir, "test_" + i + ".json");
            try (BufferedWriter writer = Files.newBufferedWriter(jsonFile.toPath(), StandardCharsets.UTF_8)) {
                // Two records per file, for day 24 + i and day 25 + i, written back to back.
                for (int firstDay : new int[]{24, 25}) {
                    writer.write(getObjectMapper().writeValueAsString(ImmutableMap.of(
                            "ts", StringUtils.format("2017-12-%d", firstDay + i),
                            "implicitDim", "implicit_" + i,
                            "explicitDim", "explicit_" + i)));
                }
            }
        }

        final DimensionsSpec dimensionsSpec = DimensionsSpec.builder()
                .setDefaultSchemaDimensions(ImmutableList.of("ts", "explicitDim"))
                .setIncludeAllDimensions(true)
                .build();
        final DataSchema dataSchema = DataSchema.builder()
                .withDataSource("dataSource")
                .withTimestamp(DEFAULT_TIMESTAMP_SPEC)
                .withDimensions(dimensionsSpec)
                .withAggregators(new AggregatorFactory[]{new CountAggregatorFactory("cnt")})
                .withGranularity(new UniformGranularitySpec(
                        Granularities.DAY,
                        Granularities.MINUTE,
                        Collections.singletonList(interval)))
                .build();
        final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
                new SettableSplittableLocalInputSource(this.inputDir, "*.json", true),
                new JsonInputFormat(new JSONPathSpec(true, (List) null), (Map) null, (Boolean) null, (Boolean) null, (Boolean) null),
                false,
                (Boolean) null);
        final ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
                (String) null,
                (String) null,
                (TaskResource) null,
                new ParallelIndexIngestionSpec(
                        dataSchema,
                        ioConfig,
                        AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING),
                (Map) null);
        task.addToContext("forceTimeChunkLock", this.lockGranularity == LockGranularity.TIME_CHUNK);

        Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());

        final DataSegmentsWithSchemas segmentsWithSchemas = getIndexingServiceClient().getSegmentAndSchemas(task);
        verifySchema(segmentsWithSchemas);
        for (DataSegment segment : segmentsWithSchemas.getSegments()) {
            Assert.assertEquals(ImmutableList.of("ts", "explicitDim", "implicitDim"), segment.getDimensions());
        }
    }

    /**
     * Writes five JSON input files containing "ts", "implicitDim" and "explicitDim", ingests them
     * with "ts" and "explicitDim" declared and schema discovery enabled, and asserts that every
     * segment lists the dimensions as ts, explicitDim, implicitDim.
     *
     * @throws IOException if an input file cannot be written
     */
    @Test
    public void testIngestBothExplicitAndImplicitDimsSchemaDiscovery() throws IOException {
        final Interval interval = Intervals.of("2017-12/P1M");
        for (int i = 0; i < 5; i++) {
            final File jsonFile = new File(this.inputDir, "test_" + i + ".json");
            try (BufferedWriter writer = Files.newBufferedWriter(jsonFile.toPath(), StandardCharsets.UTF_8)) {
                // Two records per file, for day 24 + i and day 25 + i, written back to back.
                for (int firstDay : new int[]{24, 25}) {
                    writer.write(getObjectMapper().writeValueAsString(ImmutableMap.of(
                            "ts", StringUtils.format("2017-12-%d", firstDay + i),
                            "implicitDim", "implicit_" + i,
                            "explicitDim", "explicit_" + i)));
                }
            }
        }

        final DimensionsSpec dimensionsSpec = DimensionsSpec.builder()
                .setDefaultSchemaDimensions(ImmutableList.of("ts", "explicitDim"))
                .useSchemaDiscovery(true)
                .build();
        final DataSchema dataSchema = DataSchema.builder()
                .withDataSource("dataSource")
                .withTimestamp(DEFAULT_TIMESTAMP_SPEC)
                .withDimensions(dimensionsSpec)
                .withAggregators(new AggregatorFactory[]{new CountAggregatorFactory("cnt")})
                .withGranularity(new UniformGranularitySpec(
                        Granularities.DAY,
                        Granularities.MINUTE,
                        Collections.singletonList(interval)))
                .build();
        final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
                new SettableSplittableLocalInputSource(this.inputDir, "*.json", true),
                new JsonInputFormat(new JSONPathSpec(true, (List) null), (Map) null, (Boolean) null, (Boolean) null, (Boolean) null),
                false,
                (Boolean) null);
        final ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
                (String) null,
                (String) null,
                (TaskResource) null,
                new ParallelIndexIngestionSpec(
                        dataSchema,
                        ioConfig,
                        AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING),
                (Map) null);
        task.addToContext("forceTimeChunkLock", this.lockGranularity == LockGranularity.TIME_CHUNK);

        Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());

        final DataSegmentsWithSchemas segmentsWithSchemas = getIndexingServiceClient().getSegmentAndSchemas(task);
        verifySchema(segmentsWithSchemas);
        for (DataSegment segment : segmentsWithSchemas.getSegments()) {
            Assert.assertEquals(ImmutableList.of("ts", "explicitDim", "implicitDim"), segment.getDimensions());
        }
    }

    /**
     * Convenience overload that builds a supervisor task with day granularity.
     *
     * <p>Delegates to the four-argument overload, passing {@link Granularities#DAY}
     * as the granularity and {@code false} for the first boolean flag.
     *
     * @param interval interval to ingest, or {@code null} to leave it unspecified
     * @param z        forwarded unchanged as the last boolean flag of the delegate
     * @return the task produced by the delegate overload
     */
    private ParallelIndexSupervisorTask newTask(@Nullable Interval interval, boolean z) {
        final Granularity defaultGranularity = Granularities.DAY;
        final boolean firstFlag = false;
        return newTask(interval, defaultGranularity, firstFlag, z);
    }

    /**
     * Convenience overload that uses the default parallel-indexing tuning config
     * and the valid input-source filter.
     *
     * @param interval    interval to ingest, or {@code null} to leave it unspecified
     * @param granularity granularity forwarded to the delegate
     * @param z           forwarded unchanged as the delegate's first boolean flag
     * @param z2          forwarded unchanged as the delegate's second boolean flag
     * @return the task produced by the six-argument overload
     */
    private ParallelIndexSupervisorTask newTask(@Nullable Interval interval, Granularity granularity, boolean z, boolean z2) {
        final ParallelIndexTuningConfig defaultTuning = AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING;
        final String defaultFilter = VALID_INPUT_SOURCE_FILTER;
        return newTask(interval, granularity, z, z2, defaultTuning, defaultFilter);
    }

    /**
     * Builds a {@link ParallelIndexSupervisorTask} reading from {@code inputDir}.
     *
     * <p>When {@code useInputFormatApi} is set, the task uses a
     * {@code SettableSplittableLocalInputSource} with {@code DEFAULT_INPUT_FORMAT} and a
     * single long-sum aggregator on {@code val}; otherwise it uses a plain
     * {@link LocalInputSource}, an input format derived from {@code DEFAULT_PARSE_SPEC},
     * and {@code DEFAULT_METRICS_SPEC}.
     *
     * @param interval                  interval placed (as a singleton list) in the granularity spec,
     *                                  or {@code null} to pass no intervals at all
     * @param granularity               first granularity of the {@code UniformGranularitySpec};
     *                                  the second is always {@link Granularities#MINUTE}
     * @param z                         third argument of {@code ParallelIndexIOConfig}
     *                                  (presumably appendToExisting; confirm against that class)
     * @param z2                        third argument of {@code SettableSplittableLocalInputSource};
     *                                  ignored when {@code useInputFormatApi} is false
     * @param parallelIndexTuningConfig tuning config for the ingestion spec
     * @param str                       second argument of the input source (the file filter pattern)
     * @return a new supervisor task with null id, group id and task resource, and an empty context
     */
    private ParallelIndexSupervisorTask newTask(@Nullable Interval interval, Granularity granularity, boolean z, boolean z2, ParallelIndexTuningConfig parallelIndexTuningConfig, String str) {
        // Pieces shared by both branches are built once up front.
        final List<Interval> inputIntervals = interval == null ? null : Collections.singletonList(interval);
        final UniformGranularitySpec granularitySpec = new UniformGranularitySpec(granularity, Granularities.MINUTE, inputIntervals);
        final Boolean ioFlag = Boolean.valueOf(z);

        final DataSchema dataSchema;
        final ParallelIndexIOConfig ioConfig;
        if (!this.useInputFormatApi) {
            dataSchema = DataSchema.builder()
                    .withDataSource("dataSource")
                    .withTimestamp(DEFAULT_TIMESTAMP_SPEC)
                    .withDimensions(DEFAULT_DIMENSIONS_SPEC)
                    .withAggregators(DEFAULT_METRICS_SPEC)
                    .withGranularity(granularitySpec)
                    .build();
            ioConfig = new ParallelIndexIOConfig(
                    new LocalInputSource(this.inputDir, str),
                    createInputFormatFromParseSpec(DEFAULT_PARSE_SPEC),
                    ioFlag,
                    (Boolean) null);
        } else {
            dataSchema = DataSchema.builder()
                    .withDataSource("dataSource")
                    .withTimestamp(DEFAULT_TIMESTAMP_SPEC)
                    .withDimensions(DEFAULT_DIMENSIONS_SPEC)
                    .withAggregators(new AggregatorFactory[]{new LongSumAggregatorFactory("val", "val")})
                    .withGranularity(granularitySpec)
                    .build();
            ioConfig = new ParallelIndexIOConfig(
                    new SettableSplittableLocalInputSource(this.inputDir, str, z2),
                    DEFAULT_INPUT_FORMAT,
                    ioFlag,
                    (Boolean) null);
        }

        final ParallelIndexIngestionSpec ingestionSpec = new ParallelIndexIngestionSpec(dataSchema, ioConfig, parallelIndexTuningConfig);
        return new ParallelIndexSupervisorTask((String) null, (String) null, (TaskResource) null, ingestionSpec, Collections.emptyMap());
    }

    /**
     * Builds the expected error message for the unparseable-timestamp row.
     *
     * <p>The {@code Path} placeholder is filled with the URI of the file named
     * {@code test_0} under {@code inputDir}; the record and line numbers are fixed in
     * the message text.
     *
     * @return the formatted error message
     */
    private String getErrorMessageForUnparseableTimestamp() {
        final File offendingFile = new File(this.inputDir, "test_0");
        final String template = "Timestamp[2017unparseable] is unparseable! Event: {ts=2017unparseable} (Path: %s, Record: 5, Line: 5)";
        return StringUtils.format(template, offendingFile.toURI());
    }
}
