package org.apache.druid.indexing.common.task.batch.parallel;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.IngestionTestBase;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.NumberedOverwriteShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.class */
public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSupervisorTaskTest {
    private static final Interval INTERVAL_TO_INDEX = Intervals.of("2017-12/P1M");
    private final LockGranularity lockGranularity;
    private final boolean useInputFormatApi;
    private File inputDir;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest$SettableSplittableLocalInputSource.class */
    public static class SettableSplittableLocalInputSource extends LocalInputSource {
        private final boolean splittableInputSource;

        @JsonCreator
        private SettableSplittableLocalInputSource(@JsonProperty("baseDir") File file, @JsonProperty("filter") String str, @JsonProperty("splittableInputSource") boolean z) {
            super(file, str);
            this.splittableInputSource = z;
        }

        @JsonProperty
        public boolean isSplittableInputSource() {
            return this.splittableInputSource;
        }

        public boolean isSplittable() {
            return this.splittableInputSource;
        }
    }

    @Parameterized.Parameters(name = "{0}, useInputFormatApi={1}")
    public static Iterable<Object[]> constructorFeeder() {
        return ImmutableList.of(new Object[]{LockGranularity.TIME_CHUNK, false}, new Object[]{LockGranularity.TIME_CHUNK, true}, new Object[]{LockGranularity.SEGMENT, true});
    }

    public SinglePhaseParallelIndexingTest(LockGranularity lockGranularity, boolean z) {
        this.lockGranularity = lockGranularity;
        this.useInputFormatApi = z;
    }

    @Before
    public void setup() throws IOException {
        BufferedWriter newBufferedWriter;
        this.inputDir = this.temporaryFolder.newFolder("data");
        for (int i = 0; i < 5; i++) {
            newBufferedWriter = Files.newBufferedWriter(new File(this.inputDir, "test_" + i).toPath(), StandardCharsets.UTF_8, new OpenOption[0]);
            Throwable th = null;
            try {
                try {
                    newBufferedWriter.write(StringUtils.format("2017-12-%d,%d th test file\n", new Object[]{Integer.valueOf(24 + i), Integer.valueOf(i)}));
                    newBufferedWriter.write(StringUtils.format("2017-12-%d,%d th test file\n", new Object[]{Integer.valueOf(25 + i), Integer.valueOf(i)}));
                    if (newBufferedWriter != null) {
                        if (0 != 0) {
                            try {
                                newBufferedWriter.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            newBufferedWriter.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        }
        for (int i2 = 0; i2 < 5; i2++) {
            newBufferedWriter = Files.newBufferedWriter(new File(this.inputDir, "filtered_" + i2).toPath(), StandardCharsets.UTF_8, new OpenOption[0]);
            Throwable th3 = null;
            try {
                try {
                    newBufferedWriter.write(StringUtils.format("2017-12-%d,%d th test file\n", new Object[]{Integer.valueOf(25 + i2), Integer.valueOf(i2)}));
                    if (newBufferedWriter != null) {
                        if (0 != 0) {
                            try {
                                newBufferedWriter.close();
                            } catch (Throwable th4) {
                                th3.addSuppressed(th4);
                            }
                        } else {
                            newBufferedWriter.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        }
        getObjectMapper().registerSubtypes(new Class[]{SettableSplittableLocalInputSource.class});
    }

    @After
    public void teardown() {
        this.temporaryFolder.delete();
    }

    @Test
    public void testIsReady() throws Exception {
        ParallelIndexSupervisorTask newTask = newTask(INTERVAL_TO_INDEX, false, true);
        IngestionTestBase.TestLocalTaskActionClient createActionClient = createActionClient(newTask);
        TaskToolbox createTaskToolbox = createTaskToolbox(newTask, createActionClient);
        prepareTaskForLocking(newTask);
        Assert.assertTrue(newTask.isReady(createActionClient));
        Iterator subTaskSpecIterator = newTask.createSinglePhaseTaskRunner(createTaskToolbox).subTaskSpecIterator();
        while (subTaskSpecIterator.hasNext()) {
            SinglePhaseSubTaskSpec singlePhaseSubTaskSpec = (SinglePhaseSubTaskSpec) subTaskSpecIterator.next();
            SinglePhaseSubTask singlePhaseSubTask = new SinglePhaseSubTask((String) null, singlePhaseSubTaskSpec.getGroupId(), (TaskResource) null, singlePhaseSubTaskSpec.getSupervisorTaskId(), 0, singlePhaseSubTaskSpec.getIngestionSpec(), singlePhaseSubTaskSpec.getContext(), getIndexingServiceClient(), (IndexTaskClientFactory) null, new TestAppenderatorsManager());
            IngestionTestBase.TestLocalTaskActionClient createActionClient2 = createActionClient(singlePhaseSubTask);
            prepareTaskForLocking(singlePhaseSubTask);
            Assert.assertTrue(singlePhaseSubTask.isReady(createActionClient2));
        }
    }

    private void runTestTask(@Nullable Interval interval, Granularity granularity, boolean z, Collection<DataSegment> collection) {
        ParallelIndexSupervisorTask newTask = newTask(interval, granularity, z, true);
        newTask.addToContext("forceTimeChunkLock", Boolean.valueOf(this.lockGranularity == LockGranularity.TIME_CHUNK));
        Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(newTask).getStatusCode());
        assertShardSpec(newTask, interval == null ? LockGranularity.TIME_CHUNK : this.lockGranularity, z, collection);
    }

    private void runOverwriteTask(@Nullable Interval interval, Granularity granularity, LockGranularity lockGranularity) {
        ParallelIndexSupervisorTask newTask = newTask(interval, granularity, false, true);
        newTask.addToContext("forceTimeChunkLock", Boolean.valueOf(this.lockGranularity == LockGranularity.TIME_CHUNK));
        Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(newTask).getStatusCode());
        assertShardSpecAfterOverwrite(newTask, lockGranularity);
    }

    private void testRunAndOverwrite(@Nullable Interval interval, Granularity granularity) {
        LockGranularity lockGranularity;
        runTestTask(interval, Granularities.DAY, false, Collections.emptyList());
        Interval interval2 = interval == null ? Intervals.ETERNITY : interval;
        HashSet hashSet = new HashSet(getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval2, Segments.ONLY_VISIBLE));
        if (interval == null) {
            lockGranularity = LockGranularity.TIME_CHUNK;
        } else {
            lockGranularity = granularity.equals(Granularities.DAY) ? this.lockGranularity : LockGranularity.TIME_CHUNK;
        }
        runOverwriteTask(interval, granularity, lockGranularity);
        Collection retrieveUsedSegmentsForInterval = getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", interval2, Segments.ONLY_VISIBLE);
        hashSet.addAll(retrieveUsedSegmentsForInterval);
        Assert.assertEquals(new HashSet(retrieveUsedSegmentsForInterval), VersionedIntervalTimeline.forSegments(hashSet).findNonOvershadowedObjectsInInterval(interval2, Partitions.ONLY_COMPLETE));
    }

    private void assertShardSpec(ParallelIndexSupervisorTask parallelIndexSupervisorTask, LockGranularity lockGranularity, boolean z, Collection<DataSegment> collection) {
        Set<DataSegment> publishedSegments = getIndexingServiceClient().getPublishedSegments(parallelIndexSupervisorTask);
        if (z || lockGranularity != LockGranularity.TIME_CHUNK) {
            Map groupSegmentsByInterval = SegmentUtils.groupSegmentsByInterval(collection);
            for (DataSegment dataSegment : publishedSegments) {
                Assert.assertSame(NumberedShardSpec.class, dataSegment.getShardSpec().getClass());
                NumberedShardSpec shardSpec = dataSegment.getShardSpec();
                List list = (List) groupSegmentsByInterval.get(dataSegment.getInterval());
                Assert.assertEquals((list == null || list.isEmpty()) ? 0 : ((DataSegment) list.get(0)).getShardSpec().getNumCorePartitions(), shardSpec.getNumCorePartitions());
            }
            return;
        }
        Iterator it = SegmentUtils.groupSegmentsByInterval(publishedSegments).values().iterator();
        while (it.hasNext()) {
            Iterator it2 = ((List) it.next()).iterator();
            while (it2.hasNext()) {
                Assert.assertSame(NumberedShardSpec.class, ((DataSegment) it2.next()).getShardSpec().getClass());
                Assert.assertEquals(r0.size(), r0.getShardSpec().getNumCorePartitions());
            }
        }
    }

    private void assertShardSpecAfterOverwrite(ParallelIndexSupervisorTask parallelIndexSupervisorTask, LockGranularity lockGranularity) {
        Map groupSegmentsByInterval = SegmentUtils.groupSegmentsByInterval(getIndexingServiceClient().getPublishedSegments(parallelIndexSupervisorTask));
        if (lockGranularity != LockGranularity.SEGMENT) {
            Iterator it = groupSegmentsByInterval.values().iterator();
            while (it.hasNext()) {
                Iterator it2 = ((List) it.next()).iterator();
                while (it2.hasNext()) {
                    Assert.assertSame(NumberedShardSpec.class, ((DataSegment) it2.next()).getShardSpec().getClass());
                    Assert.assertEquals(r0.size(), r0.getShardSpec().getNumCorePartitions());
                }
            }
            return;
        }
        Iterator it3 = groupSegmentsByInterval.values().iterator();
        while (it3.hasNext()) {
            Iterator it4 = ((List) it3.next()).iterator();
            while (it4.hasNext()) {
                Assert.assertSame(NumberedOverwriteShardSpec.class, ((DataSegment) it4.next()).getShardSpec().getClass());
                Assert.assertEquals(r0.size(), r0.getShardSpec().getAtomicUpdateGroupSize());
            }
        }
    }

    @Test
    public void testWithoutInterval() {
        testRunAndOverwrite(null, Granularities.DAY);
    }

    @Test
    public void testRunInParallel() {
        testRunAndOverwrite(Intervals.of("2017-12/P1M"), Granularities.DAY);
    }

    @Test
    public void testWithoutIntervalWithDifferentSegmentGranularity() {
        testRunAndOverwrite(null, Granularities.MONTH);
    }

    @Test
    public void testRunInParallelWithDifferentSegmentGranularity() {
        testRunAndOverwrite(Intervals.of("2017-12/P1M"), Granularities.MONTH);
    }

    @Test
    public void testRunInSequential() {
        ParallelIndexSupervisorTask newTask = newTask(Intervals.of("2017-12/P1M"), false, false);
        newTask.addToContext("forceTimeChunkLock", Boolean.valueOf(this.lockGranularity == LockGranularity.TIME_CHUNK));
        Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(newTask).getStatusCode());
        assertShardSpec(newTask, this.lockGranularity, false, Collections.emptyList());
    }

    @Test
    public void testPublishEmptySegments() {
        ParallelIndexSupervisorTask newTask = newTask(Intervals.of("2020-12/P1M"), false, true);
        newTask.addToContext("forceTimeChunkLock", Boolean.valueOf(this.lockGranularity == LockGranularity.TIME_CHUNK));
        Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(newTask).getStatusCode());
    }

    @Test
    public void testWith1MaxNumConcurrentSubTasks() {
        ParallelIndexSupervisorTask newTask = newTask(Intervals.of("2017-12/P1M"), Granularities.DAY, false, true, new ParallelIndexTuningConfig((Integer) null, (Integer) null, (Integer) null, (Long) null, (Long) null, (Integer) null, (SplitHintSpec) null, (PartitionsSpec) null, (IndexSpec) null, (IndexSpec) null, (Integer) null, (Boolean) null, (Boolean) null, (Long) null, (SegmentWriteOutMediumFactory) null, (Integer) null, 1, (Integer) null, (Long) null, (Duration) null, (Integer) null, (Integer) null, (Integer) null, (Boolean) null, (Integer) null, (Integer) null));
        newTask.addToContext("forceTimeChunkLock", Boolean.valueOf(this.lockGranularity == LockGranularity.TIME_CHUNK));
        Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(newTask).getStatusCode());
        Assert.assertNull("Runner must be null if the task was in the sequential mode", newTask.getCurrentRunner());
        assertShardSpec(newTask, this.lockGranularity, false, Collections.emptyList());
    }

    @Test
    public void testAppendToExisting() {
        Interval of = Intervals.of("2017-12/P1M");
        runTestTask(of, Granularities.DAY, true, Collections.emptyList());
        Collection<?> retrieveUsedSegmentsForInterval = getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", of, Segments.ONLY_VISIBLE);
        runTestTask(of, Granularities.DAY, true, retrieveUsedSegmentsForInterval);
        Collection retrieveUsedSegmentsForInterval2 = getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", of, Segments.ONLY_VISIBLE);
        Assert.assertTrue(retrieveUsedSegmentsForInterval2.containsAll(retrieveUsedSegmentsForInterval));
        Assert.assertEquals(new HashSet(retrieveUsedSegmentsForInterval2), VersionedIntervalTimeline.forSegments(retrieveUsedSegmentsForInterval2).findNonOvershadowedObjectsInInterval(of, Partitions.ONLY_COMPLETE));
    }

    @Test
    public void testOverwriteAndAppend() {
        Interval of = Intervals.of("2017-12/P1M");
        testRunAndOverwrite(of, Granularities.DAY);
        Collection<?> retrieveUsedSegmentsForInterval = getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", of, Segments.ONLY_VISIBLE);
        runTestTask(of, Granularities.DAY, true, retrieveUsedSegmentsForInterval);
        Collection retrieveUsedSegmentsForInterval2 = getStorageCoordinator().retrieveUsedSegmentsForInterval("dataSource", of, Segments.ONLY_VISIBLE);
        Assert.assertTrue(retrieveUsedSegmentsForInterval2.containsAll(retrieveUsedSegmentsForInterval));
        Assert.assertEquals(new HashSet(retrieveUsedSegmentsForInterval2), VersionedIntervalTimeline.forSegments(retrieveUsedSegmentsForInterval2).findNonOvershadowedObjectsInInterval(of, Partitions.ONLY_COMPLETE));
    }

    private ParallelIndexSupervisorTask newTask(@Nullable Interval interval, boolean z, boolean z2) {
        return newTask(interval, Granularities.DAY, z, z2);
    }

    private ParallelIndexSupervisorTask newTask(@Nullable Interval interval, Granularity granularity, boolean z, boolean z2) {
        return newTask(interval, granularity, z, z2, AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING);
    }

    private ParallelIndexSupervisorTask newTask(@Nullable Interval interval, Granularity granularity, boolean z, boolean z2, ParallelIndexTuningConfig parallelIndexTuningConfig) {
        ParallelIndexIngestionSpec parallelIndexIngestionSpec;
        if (this.useInputFormatApi) {
            parallelIndexIngestionSpec = new ParallelIndexIngestionSpec(new DataSchema("dataSource", DEFAULT_TIMESTAMP_SPEC, DEFAULT_DIMENSIONS_SPEC, new AggregatorFactory[]{new LongSumAggregatorFactory("val", "val")}, new UniformGranularitySpec(granularity, Granularities.MINUTE, interval == null ? null : Collections.singletonList(interval)), (TransformSpec) null), new ParallelIndexIOConfig((FirehoseFactory) null, new SettableSplittableLocalInputSource(this.inputDir, "test_*", z2), DEFAULT_INPUT_FORMAT, Boolean.valueOf(z)), parallelIndexTuningConfig);
        } else {
            parallelIndexIngestionSpec = new ParallelIndexIngestionSpec(new DataSchema("dataSource", (Map) getObjectMapper().convertValue(new StringInputRowParser(DEFAULT_PARSE_SPEC, (String) null), Map.class), new AggregatorFactory[]{new LongSumAggregatorFactory("val", "val")}, new UniformGranularitySpec(granularity, Granularities.MINUTE, interval == null ? null : Collections.singletonList(interval)), (TransformSpec) null, getObjectMapper()), new ParallelIndexIOConfig(new LocalFirehoseFactory(this.inputDir, "test_*", (StringInputRowParser) null), Boolean.valueOf(z)), parallelIndexTuningConfig);
        }
        return new ParallelIndexSupervisorTask((String) null, (String) null, (TaskResource) null, parallelIndexIngestionSpec, Collections.emptyMap(), getIndexingServiceClient(), (ChatHandlerProvider) null, (AuthorizerMapper) null, (RowIngestionMetersFactory) null, (AppenderatorsManager) null);
    }
}
