package org.apache.druid.indexing.common.task.batch.parallel;

import com.google.common.collect.ImmutableList;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.CSVParseSpec;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.indexing.common.task.batch.parallel.AbstractParallelIndexSupervisorTaskTest;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.scan.ScanQueryConfig;
import org.apache.druid.query.scan.ScanQueryEngine;
import org.apache.druid.query.scan.ScanQueryQueryToolChest;
import org.apache.druid.query.scan.ScanQueryRunnerFactory;
import org.apache.druid.query.scan.ScanResultValue;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingTest.class */
public class MultiPhaseParallelIndexingTest extends AbstractParallelIndexSupervisorTaskTest {

    // JUnit rule used by the negative tests to declare the exception type and message they expect.
    @Rule
    public ExpectedException expectedException = ExpectedException.none();
    // Lock granularity for this parameterized run; assigned once in the constructor.
    private final LockGranularity lockGranularity;
    // Directory holding the generated input files; assigned in setup().
    private File inputDir;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingTest$TestPartialSegmentGenerateRunner.class */
    public static class TestPartialSegmentGenerateRunner extends PartialSegmentGenerateParallelIndexTaskRunner {
        private final ParallelIndexSupervisorTask supervisorTask;

        private TestPartialSegmentGenerateRunner(TaskToolbox taskToolbox, ParallelIndexSupervisorTask parallelIndexSupervisorTask, IndexingServiceClient indexingServiceClient) {
            super(taskToolbox, parallelIndexSupervisorTask.getId(), parallelIndexSupervisorTask.getGroupId(), parallelIndexSupervisorTask.getIngestionSchema(), parallelIndexSupervisorTask.getContext(), indexingServiceClient);
            this.supervisorTask = parallelIndexSupervisorTask;
        }

        Iterator<SubTaskSpec<PartialSegmentGenerateTask>> subTaskSpecIterator() throws IOException {
            final Iterator subTaskSpecIterator = super.subTaskSpecIterator();
            return new Iterator<SubTaskSpec<PartialSegmentGenerateTask>>() { // from class: org.apache.druid.indexing.common.task.batch.parallel.MultiPhaseParallelIndexingTest.TestPartialSegmentGenerateRunner.1
                @Override // java.util.Iterator
                public boolean hasNext() {
                    return subTaskSpecIterator.hasNext();
                }

                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.Iterator
                public SubTaskSpec<PartialSegmentGenerateTask> next() {
                    try {
                        Thread.sleep(10L);
                        return (SubTaskSpec) subTaskSpecIterator.next();
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            };
        }

        SubTaskSpec<PartialSegmentGenerateTask> newTaskSpec(InputSplit inputSplit) {
            final ParallelIndexIngestionSpec parallelIndexIngestionSpec = new ParallelIndexIngestionSpec(getIngestionSchema().getDataSchema(), new ParallelIndexIOConfig(getBaseFirehoseFactory().withSplit(inputSplit), Boolean.valueOf(getIngestionSchema().getIOConfig().isAppendToExisting())), getIngestionSchema().getTuningConfig());
            return new SubTaskSpec<PartialSegmentGenerateTask>(getTaskId() + "_" + getAndIncrementNextSpecId(), getGroupId(), getTaskId(), getContext(), inputSplit) { // from class: org.apache.druid.indexing.common.task.batch.parallel.MultiPhaseParallelIndexingTest.TestPartialSegmentGenerateRunner.2
                /* renamed from: newSubTask, reason: merged with bridge method [inline-methods] */
                public PartialSegmentGenerateTask m14newSubTask(int i) {
                    return new PartialSegmentGenerateTask((String) null, getGroupId(), (TaskResource) null, getSupervisorTaskId(), i, parallelIndexIngestionSpec, getContext(), TestPartialSegmentGenerateRunner.this.getIndexingServiceClient(), new AbstractParallelIndexSupervisorTaskTest.LocalParallelIndexTaskClientFactory(TestPartialSegmentGenerateRunner.this.supervisorTask), new TestAppenderatorsManager());
                }
            };
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingTest$TestPartialSegmentMergeParallelIndexTaskRunner.class */
    public static class TestPartialSegmentMergeParallelIndexTaskRunner extends PartialSegmentMergeParallelIndexTaskRunner {
        private final ParallelIndexSupervisorTask supervisorTask;

        private TestPartialSegmentMergeParallelIndexTaskRunner(TaskToolbox taskToolbox, ParallelIndexSupervisorTask parallelIndexSupervisorTask, List<PartialSegmentMergeIOConfig> list, IndexingServiceClient indexingServiceClient) {
            super(taskToolbox, parallelIndexSupervisorTask.getId(), parallelIndexSupervisorTask.getGroupId(), parallelIndexSupervisorTask.getIngestionSchema().getDataSchema(), list, parallelIndexSupervisorTask.getIngestionSchema().getTuningConfig(), parallelIndexSupervisorTask.getContext(), indexingServiceClient);
            this.supervisorTask = parallelIndexSupervisorTask;
        }

        Iterator<SubTaskSpec<PartialSegmentMergeTask>> subTaskSpecIterator() {
            final Iterator subTaskSpecIterator = super.subTaskSpecIterator();
            return new Iterator<SubTaskSpec<PartialSegmentMergeTask>>() { // from class: org.apache.druid.indexing.common.task.batch.parallel.MultiPhaseParallelIndexingTest.TestPartialSegmentMergeParallelIndexTaskRunner.1
                @Override // java.util.Iterator
                public boolean hasNext() {
                    return subTaskSpecIterator.hasNext();
                }

                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.Iterator
                public SubTaskSpec<PartialSegmentMergeTask> next() {
                    try {
                        Thread.sleep(10L);
                        return (SubTaskSpec) subTaskSpecIterator.next();
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            };
        }

        SubTaskSpec<PartialSegmentMergeTask> newTaskSpec(PartialSegmentMergeIOConfig partialSegmentMergeIOConfig) {
            final PartialSegmentMergeIngestionSpec partialSegmentMergeIngestionSpec = new PartialSegmentMergeIngestionSpec(this.supervisorTask.getIngestionSchema().getDataSchema(), partialSegmentMergeIOConfig, getTuningConfig());
            return new SubTaskSpec<PartialSegmentMergeTask>(getTaskId() + "_" + getAndIncrementNextSpecId(), getGroupId(), getTaskId(), getContext(), new InputSplit(partialSegmentMergeIOConfig.getPartitionLocations())) { // from class: org.apache.druid.indexing.common.task.batch.parallel.MultiPhaseParallelIndexingTest.TestPartialSegmentMergeParallelIndexTaskRunner.2
                /* renamed from: newSubTask, reason: merged with bridge method [inline-methods] */
                public PartialSegmentMergeTask m15newSubTask(int i) {
                    return new TestPartialSegmentMergeTask(null, getGroupId(), null, getSupervisorTaskId(), i, partialSegmentMergeIngestionSpec, getContext(), TestPartialSegmentMergeParallelIndexTaskRunner.this.getIndexingServiceClient(), new AbstractParallelIndexSupervisorTaskTest.LocalParallelIndexTaskClientFactory(TestPartialSegmentMergeParallelIndexTaskRunner.this.supervisorTask), TestPartialSegmentMergeParallelIndexTaskRunner.this.getToolbox());
                }
            };
        }
    }

    /**
     * Merge task that looks up partition files through the toolbox's intermediary data manager
     * and passes a {@code null} {@link HttpClient} to the parent task.
     */
    public static class TestPartialSegmentMergeTask extends PartialSegmentMergeTask {
        private final TaskToolbox toolbox;

        /**
         * All arguments except {@code taskToolbox} are forwarded unchanged to the parent
         * constructor.
         *
         * @param taskToolbox toolbox whose intermediary data manager is queried by
         *                    {@link #fetchSegmentFile(File, PartitionLocation)}
         */
        private TestPartialSegmentMergeTask(@Nullable String str, String str2, TaskResource taskResource, String str3, int i, PartialSegmentMergeIngestionSpec partialSegmentMergeIngestionSpec, Map<String, Object> map, IndexingServiceClient indexingServiceClient, IndexTaskClientFactory<ParallelIndexSupervisorTaskClient> indexTaskClientFactory, TaskToolbox taskToolbox) {
            super(str, str2, taskResource, str3, i, partialSegmentMergeIngestionSpec, map, indexingServiceClient, indexTaskClientFactory, (HttpClient) null);
            this.toolbox = taskToolbox;
        }

        /**
         * Finds the partition file for the given location via the intermediary data manager.
         *
         * @param file              not used by this implementation
         * @param partitionLocation location whose sub task id, interval and partition id identify
         *                          the file
         * @return the partition file, never {@code null}
         * @throws ISE if the intermediary data manager has no file for the location
         */
        File fetchSegmentFile(File file, PartitionLocation partitionLocation) {
            final File partitionFile = this.toolbox.getIntermediaryDataManager().findPartitionFile(
                getSupervisorTaskId(),
                partitionLocation.getSubTaskId(),
                partitionLocation.getInterval(),
                partitionLocation.getPartitionId()
            );
            if (partitionFile == null) {
                // The old message had a second "%s" for a path with no matching argument, so it is dropped.
                throw new ISE("Can't find segment file for location[%s]", partitionLocation);
            }
            return partitionFile;
        }
    }

    /**
     * Supervisor task that substitutes this test's phase runners:
     * {@link TestPartialSegmentGenerateRunner} for the generate phase and
     * {@link TestPartialSegmentMergeParallelIndexTaskRunner} for the merge phase.
     */
    public static class TestSupervisorTask extends AbstractParallelIndexSupervisorTaskTest.TestParallelIndexSupervisorTask {
        /** Forwards every argument unchanged to the parent constructor. */
        TestSupervisorTask(String str, TaskResource taskResource, ParallelIndexIngestionSpec parallelIndexIngestionSpec, Map<String, Object> map, IndexingServiceClient indexingServiceClient) {
            super(str, taskResource, parallelIndexIngestionSpec, map, indexingServiceClient);
        }

        /**
         * @param taskToolbox toolbox handed to the new runner
         * @return a new {@link TestPartialSegmentGenerateRunner} bound to this task
         */
        public PartialSegmentGenerateParallelIndexTaskRunner createPartialSegmentGenerateRunner(TaskToolbox taskToolbox) {
            final IndexingServiceClient client = getIndexingServiceClient();
            return new TestPartialSegmentGenerateRunner(taskToolbox, this, client);
        }

        /**
         * @param taskToolbox toolbox handed to the new runner
         * @param list        merge IO configs handed to the new runner
         * @return a new {@link TestPartialSegmentMergeParallelIndexTaskRunner} bound to this task
         */
        public PartialSegmentMergeParallelIndexTaskRunner createPartialSegmentMergeRunner(TaskToolbox taskToolbox, List<PartialSegmentMergeIOConfig> list) {
            final IndexingServiceClient client = getIndexingServiceClient();
            return new TestPartialSegmentMergeParallelIndexTaskRunner(taskToolbox, this, list, client);
        }
    }

    /**
     * Supplies the constructor arguments for the parameterized runs: one run per lock granularity
     * ({@code TIME_CHUNK}, then {@code SEGMENT}).
     */
    @Parameterized.Parameters(name = "{0}")
    public static Iterable<Object[]> constructorFeeder() {
        final Object[] timeChunkCase = {LockGranularity.TIME_CHUNK};
        final Object[] segmentCase = {LockGranularity.SEGMENT};
        return ImmutableList.of(timeChunkCase, segmentCase);
    }

    /**
     * @param lockGranularity lock granularity for this run; runTestTask() sets the
     *                        "forceTimeChunkLock" context flag to true only for TIME_CHUNK
     */
    public MultiPhaseParallelIndexingTest(LockGranularity lockGranularity) {
        this.lockGranularity = lockGranularity;
    }

    /**
     * Creates the input files and local services used by each test.
     *
     * <p>Writes ten {@code test_N} files (two rows per inner iteration, ten iterations each) and
     * five single-row {@code filtered_N} files into a fresh {@code data} folder, then creates the
     * local indexing service client, the local deep storage folder and the intermediary data
     * manager.
     *
     * @throws IOException if a folder or input file cannot be created or written
     */
    @Before
    public void setup() throws IOException {
        this.inputDir = this.temporaryFolder.newFolder("data");

        for (int i = 0; i < 10; i++) {
            final File testFile = new File(this.inputDir, "test_" + i);
            // try-with-resources closes the writer even when a write fails.
            try (BufferedWriter writer = Files.newBufferedWriter(testFile.toPath(), StandardCharsets.UTF_8)) {
                for (int j = 0; j < 10; j++) {
                    writer.write(StringUtils.format("2017-12-%d,%d,%d th test file\n", j + 1, i + 10, i));
                    writer.write(StringUtils.format("2017-12-%d,%d,%d th test file\n", j + 2, i + 11, i));
                }
            }
        }

        for (int i = 0; i < 5; i++) {
            final File filteredFile = new File(this.inputDir, "filtered_" + i);
            try (BufferedWriter writer = Files.newBufferedWriter(filteredFile.toPath(), StandardCharsets.UTF_8)) {
                writer.write(StringUtils.format("2017-12-%d,%d,%d th test file\n", i + 1, i + 10, i));
            }
        }

        this.indexingServiceClient = new AbstractParallelIndexSupervisorTaskTest.LocalIndexingServiceClient();
        this.localDeepStorage = this.temporaryFolder.newFolder("localStorage");
        initializeIntermeidaryDataManager();
    }

    /**
     * Shuts down the indexing service client created in setup() and then deletes the temporary
     * folder.
     */
    @After
    public void teardown() {
        this.indexingServiceClient.shutdown();
        this.temporaryFolder.delete();
    }

    /**
     * Runs the task over 2017/2018 with two hash shards on {@code dim1} and {@code dim2}, then
     * checks the published segments with {@code assertHashedPartition}.
     */
    @Test
    public void testRun() throws Exception {
        final Interval interval = Intervals.of("2017/2018");
        final HashedPartitionsSpec partitionsSpec = new HashedPartitionsSpec((Integer) null, 2, ImmutableList.of("dim1", "dim2"));
        final Set<DataSegment> publishedSegments = runTestTask(interval, partitionsSpec);
        assertHashedPartition(publishedSegments);
    }

    /**
     * Expects an {@link IllegalStateException} with the "intervals is missing" message when a
     * task is created with a {@code null} interval.
     */
    @Test
    public void testMissingIntervals() {
        this.expectedException.expect(IllegalStateException.class);
        this.expectedException.expectMessage("forceGuaranteedRollup is set but intervals is missing in granularitySpec");

        final LocalFirehoseFactory firehoseFactory = new LocalFirehoseFactory(this.inputDir, "test_*", (StringInputRowParser) null);
        final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(firehoseFactory, false);
        final HashedPartitionsSpec partitionsSpec = new HashedPartitionsSpec((Integer) null, 2, (List) null);
        newTask(null, ioConfig, partitionsSpec);
    }

    /**
     * Expects an {@link IllegalStateException} with the "numShards is missing" message when the
     * tuning config carries a {@link HashedPartitionsSpec} whose arguments are all {@code null}.
     */
    @Test
    public void testMissingNumShards() {
        this.expectedException.expect(IllegalStateException.class);
        this.expectedException.expectMessage("forceGuaranteedRollup is set but numShards is missing in partitionsSpec");

        final Interval interval = Intervals.of("2017/2018");
        final LocalFirehoseFactory firehoseFactory = new LocalFirehoseFactory(this.inputDir, "test_*", (StringInputRowParser) null);
        final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(firehoseFactory, false);
        final HashedPartitionsSpec emptyPartitionsSpec = new HashedPartitionsSpec((Integer) null, (Integer) null, (List) null);
        final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(
            (Integer) null,
            (Integer) null,
            (Integer) null,
            (Long) null,
            (Long) null,
            (Integer) null,
            emptyPartitionsSpec,
            (IndexSpec) null,
            (IndexSpec) null,
            (Integer) null,
            true,
            (Boolean) null,
            (Long) null,
            (SegmentWriteOutMediumFactory) null,
            (Integer) null,
            2,
            (Integer) null,
            (Integer) null,
            (Duration) null,
            (Integer) null,
            (Integer) null,
            (Integer) null,
            (Boolean) null,
            (Integer) null,
            (Integer) null
        );
        newTask(interval, Granularities.DAY, ioConfig, tuningConfig);
    }

    /**
     * Builds a supervisor task over the {@code test_*} input files, runs it to completion and
     * returns the segments it published.
     *
     * @param interval             interval placed in the task's granularity spec
     * @param hashedPartitionsSpec partitions spec placed in the task's tuning config
     * @return the segments recorded as published by the action client
     * @throws Exception if preparing or running the task fails
     */
    private Set<DataSegment> runTestTask(Interval interval, HashedPartitionsSpec hashedPartitionsSpec) throws Exception {
        final LocalFirehoseFactory firehoseFactory = new LocalFirehoseFactory(this.inputDir, "test_*", (StringInputRowParser) null);
        final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(firehoseFactory, false);
        final ParallelIndexSupervisorTask task = newTask(interval, ioConfig, hashedPartitionsSpec);

        this.actionClient = createActionClient(task);
        this.toolbox = createTaskToolbox(task);
        prepareTaskForLocking(task);

        // The flag is true only for the TIME_CHUNK parameterization.
        final boolean forceTimeChunkLock = this.lockGranularity == LockGranularity.TIME_CHUNK;
        task.addToContext("forceTimeChunkLock", forceTimeChunkLock);

        Assert.assertTrue(task.isReady(this.actionClient));
        Assert.assertEquals(TaskState.SUCCESS, task.run(this.toolbox).getStatusCode());
        shutdownTask(task);
        return this.actionClient.getPublishedSegments();
    }

    /**
     * Creates a supervisor task with {@code DAY} segment granularity and a tuning config that
     * sets only the partitions spec, the {@code true} flag and the value {@code 2}; every other
     * tuning argument is {@code null}.
     *
     * @param interval              interval for the granularity spec, may be {@code null}
     * @param parallelIndexIOConfig IO config of the task
     * @param hashedPartitionsSpec  partitions spec placed in the tuning config
     * @return the new supervisor task
     */
    private ParallelIndexSupervisorTask newTask(Interval interval, ParallelIndexIOConfig parallelIndexIOConfig, HashedPartitionsSpec hashedPartitionsSpec) {
        final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(
            (Integer) null,
            (Integer) null,
            (Integer) null,
            (Long) null,
            (Long) null,
            (Integer) null,
            hashedPartitionsSpec,
            (IndexSpec) null,
            (IndexSpec) null,
            (Integer) null,
            true,
            (Boolean) null,
            (Long) null,
            (SegmentWriteOutMediumFactory) null,
            (Integer) null,
            2,
            (Integer) null,
            (Integer) null,
            (Duration) null,
            (Integer) null,
            (Integer) null,
            (Integer) null,
            (Boolean) null,
            (Integer) null,
            (Integer) null
        );
        return newTask(interval, Granularities.DAY, parallelIndexIOConfig, tuningConfig);
    }

    /**
     * Creates a {@link TestSupervisorTask} for data source {@code "dataSource"}.
     *
     * <p>The schema parses CSV rows with columns {@code ts, dim1, dim2, val}, uses {@code ts} as
     * the timestamp with format {@code "auto"}, declares {@code ts, dim1, dim2} as dimensions and
     * sums {@code val} into a long metric named {@code val}. Query granularity is {@code MINUTE}.
     *
     * @param interval                  single interval for the granularity spec; {@code null}
     *                                  leaves the spec's interval list {@code null}
     * @param granularity               segment granularity
     * @param parallelIndexIOConfig     IO config of the task
     * @param parallelIndexTuningConfig tuning config of the task
     * @return the new supervisor task with an empty context map
     */
    private ParallelIndexSupervisorTask newTask(Interval interval, Granularity granularity, ParallelIndexIOConfig parallelIndexIOConfig, ParallelIndexTuningConfig parallelIndexTuningConfig) {
        final TimestampSpec timestampSpec = new TimestampSpec("ts", "auto", (DateTime) null);
        final DimensionsSpec dimensionsSpec = new DimensionsSpec(
            DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim1", "dim2")),
            new ArrayList<>(),
            new ArrayList<>()
        );
        final CSVParseSpec parseSpec = new CSVParseSpec(
            timestampSpec,
            dimensionsSpec,
            (String) null,
            Arrays.asList("ts", "dim1", "dim2", "val"),
            false,
            0
        );
        final StringInputRowParser rowParser = new StringInputRowParser(parseSpec, (String) null);
        // The data schema takes the parser as a map, so the parser is converted through the object mapper.
        final Map parserMap = getObjectMapper().convertValue(rowParser, Map.class);

        final AggregatorFactory[] aggregators = {new LongSumAggregatorFactory("val", "val")};
        final List<Interval> intervals = interval == null ? null : Collections.singletonList(interval);
        final UniformGranularitySpec granularitySpec = new UniformGranularitySpec(granularity, Granularities.MINUTE, intervals);

        final DataSchema dataSchema = new DataSchema(
            "dataSource",
            parserMap,
            aggregators,
            granularitySpec,
            (TransformSpec) null,
            getObjectMapper()
        );
        final ParallelIndexIngestionSpec ingestionSpec = new ParallelIndexIngestionSpec(dataSchema, parallelIndexIOConfig, parallelIndexTuningConfig);
        return new TestSupervisorTask(null, null, ingestionSpec, new HashMap<>(), this.indexingServiceClient);
    }

    /**
     * Verifies the published segments of a hash-partitioned run.
     *
     * <p>Segments are grouped by interval and each interval must hold exactly two segments. Each
     * segment is then loaded and scanned for {@code dim1} and {@code dim2}; the hash of every
     * scan result's events must equal the hash of the first result's events.
     *
     * @param set segments published by the task
     * @throws IOException             if the temporary segment folder cannot be created
     * @throws SegmentLoadingException if a segment cannot be loaded
     */
    private void assertHashedPartition(Set<DataSegment> set) throws IOException, SegmentLoadingException {
        // Typed map: iterating a raw HashMap's values() as List<DataSegment> does not compile.
        final Map<Interval, List<DataSegment>> intervalToSegments = new HashMap<>();
        for (DataSegment segment : set) {
            intervalToSegments.computeIfAbsent(segment.getInterval(), unused -> new ArrayList<>()).add(segment);
        }

        final File tempSegmentDir = this.temporaryFolder.newFolder();
        // Neither factory depends on the segment, so both are built once outside the loops.
        final SegmentLoaderFactory segmentLoaderFactory = new SegmentLoaderFactory(getIndexIO(), getObjectMapper());
        final ScanQueryRunnerFactory runnerFactory = new ScanQueryRunnerFactory(
            new ScanQueryQueryToolChest(new ScanQueryConfig().setLegacy(false), DefaultGenericQueryMetricsFactory.instance()),
            new ScanQueryEngine(),
            new ScanQueryConfig()
        );

        for (List<DataSegment> segmentsInInterval : intervalToSegments.values()) {
            Assert.assertEquals(2, segmentsInInterval.size());
            for (DataSegment segment : segmentsInInterval) {
                final ScanQuery query = new ScanQuery(
                    new TableDataSource("dataSource"),
                    new SpecificSegmentSpec(
                        new SegmentDescriptor(segment.getInterval(), segment.getVersion(), segment.getShardSpec().getPartitionNum())
                    ),
                    (VirtualColumns) null,
                    (ScanQuery.ResultFormat) null,
                    0,
                    0L,
                    (ScanQuery.Order) null,
                    (DimFilter) null,
                    ImmutableList.of("dim1", "dim2"),
                    false,
                    (Map) null
                );
                final List<ScanResultValue> results = runnerFactory
                    .createRunner(segmentLoaderFactory.manufacturate(tempSegmentDir).getSegment(segment))
                    .run(QueryPlus.wrap(query))
                    .toList();

                // Fail with a clear message instead of an IndexOutOfBoundsException on get(0).
                Assert.assertFalse("scan of a published segment returned no results", results.isEmpty());

                final int expectedHash = HashBasedNumberedShardSpec.hash(getObjectMapper(), (List) results.get(0).getEvents());
                for (ScanResultValue result : results) {
                    Assert.assertEquals(expectedHash, HashBasedNumberedShardSpec.hash(getObjectMapper(), (List) result.getEvents()));
                }
            }
        }
    }
}
