package org.apache.druid.indexing.common.task.batch.parallel;

import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.LocalTaskActionClient;
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.batch.parallel.AbstractParallelIndexSupervisorTaskTest;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.class */
public class ParallelIndexSupervisorTaskTest extends AbstractParallelIndexSupervisorTaskTest {
    private File inputDir;

    /* loaded from: input_file:org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest$TestParallelIndexSubTaskSpec.class */
    private static class TestParallelIndexSubTaskSpec extends ParallelIndexSubTaskSpec {
        private final ParallelIndexSupervisorTask supervisorTask;

        TestParallelIndexSubTaskSpec(String str, String str2, ParallelIndexSupervisorTask parallelIndexSupervisorTask, ParallelIndexIngestionSpec parallelIndexIngestionSpec, Map<String, Object> map, InputSplit inputSplit) {
            super(str, str2, parallelIndexSupervisorTask.getId(), parallelIndexIngestionSpec, map, inputSplit);
            this.supervisorTask = parallelIndexSupervisorTask;
        }

        /* renamed from: newSubTask, reason: merged with bridge method [inline-methods] */
        public ParallelIndexSubTask m16newSubTask(int i) {
            return new ParallelIndexSubTask((String) null, getGroupId(), (TaskResource) null, getSupervisorTaskId(), i, getIngestionSpec(), getContext(), (IndexingServiceClient) null, new AbstractParallelIndexSupervisorTaskTest.LocalParallelIndexTaskClientFactory(this.supervisorTask));
        }
    }

    /* loaded from: input_file:org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest$TestRunner.class */
    private static class TestRunner extends AbstractParallelIndexSupervisorTaskTest.TestParallelIndexTaskRunner {
        private final ParallelIndexSupervisorTask supervisorTask;

        TestRunner(TaskToolbox taskToolbox, ParallelIndexSupervisorTask parallelIndexSupervisorTask, @Nullable IndexingServiceClient indexingServiceClient) {
            super(taskToolbox, parallelIndexSupervisorTask.getId(), parallelIndexSupervisorTask.getGroupId(), parallelIndexSupervisorTask.getIngestionSchema(), parallelIndexSupervisorTask.getContext(), indexingServiceClient);
            this.supervisorTask = parallelIndexSupervisorTask;
        }

        ParallelIndexSubTaskSpec newTaskSpec(InputSplit inputSplit) {
            return new TestParallelIndexSubTaskSpec(this.supervisorTask.getId() + "_" + getAndIncrementNextSpecId(), this.supervisorTask.getGroupId(), this.supervisorTask, new ParallelIndexIngestionSpec(getIngestionSchema().getDataSchema(), new ParallelIndexIOConfig(getIngestionSchema().getIOConfig().getFirehoseFactory().withSplit(inputSplit), Boolean.valueOf(getIngestionSchema().getIOConfig().isAppendToExisting())), getIngestionSchema().getTuningConfig()), this.supervisorTask.getContext(), inputSplit);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest$TestSupervisorTask.class */
    public static class TestSupervisorTask extends AbstractParallelIndexSupervisorTaskTest.TestParallelIndexSupervisorTask {
        private final IndexingServiceClient indexingServiceClient;

        TestSupervisorTask(String str, TaskResource taskResource, ParallelIndexIngestionSpec parallelIndexIngestionSpec, Map<String, Object> map, IndexingServiceClient indexingServiceClient) {
            super(str, taskResource, parallelIndexIngestionSpec, map, indexingServiceClient);
            this.indexingServiceClient = indexingServiceClient;
        }

        ParallelIndexTaskRunner createRunner(TaskToolbox taskToolbox) {
            setRunner(new TestRunner(taskToolbox, this, this.indexingServiceClient));
            return getRunner();
        }
    }

    @Before
    public void setup() throws IOException {
        BufferedWriter newBufferedWriter;
        this.inputDir = this.temporaryFolder.newFolder("data");
        for (int i = 0; i < 5; i++) {
            newBufferedWriter = Files.newBufferedWriter(new File(this.inputDir, "test_" + i).toPath(), StandardCharsets.UTF_8, new OpenOption[0]);
            Throwable th = null;
            try {
                try {
                    newBufferedWriter.write(StringUtils.format("2017-12-%d,%d th test file\n", new Object[]{Integer.valueOf(24 + i), Integer.valueOf(i)}));
                    newBufferedWriter.write(StringUtils.format("2017-12-%d,%d th test file\n", new Object[]{Integer.valueOf(25 + i), Integer.valueOf(i)}));
                    if (newBufferedWriter != null) {
                        if (0 != 0) {
                            try {
                                newBufferedWriter.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            newBufferedWriter.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        }
        for (int i2 = 0; i2 < 5; i2++) {
            newBufferedWriter = Files.newBufferedWriter(new File(this.inputDir, "filtered_" + i2).toPath(), StandardCharsets.UTF_8, new OpenOption[0]);
            Throwable th3 = null;
            try {
                try {
                    newBufferedWriter.write(StringUtils.format("2017-12-%d,%d th test file\n", new Object[]{Integer.valueOf(25 + i2), Integer.valueOf(i2)}));
                    if (newBufferedWriter != null) {
                        if (0 != 0) {
                            try {
                                newBufferedWriter.close();
                            } catch (Throwable th4) {
                                th3.addSuppressed(th4);
                            }
                        } else {
                            newBufferedWriter.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        }
        this.indexingServiceClient = new AbstractParallelIndexSupervisorTaskTest.LocalIndexingServiceClient();
        this.localDeepStorage = this.temporaryFolder.newFolder("localStorage");
    }

    @After
    public void teardown() {
        this.indexingServiceClient.shutdown();
        this.temporaryFolder.delete();
    }

    @Test
    public void testIsReady() throws Exception {
        ParallelIndexSupervisorTask newTask = newTask(Intervals.of("2017/2018"), new ParallelIndexIOConfig(new LocalFirehoseFactory(this.inputDir, "test_*", (StringInputRowParser) null), false));
        this.actionClient = createActionClient(newTask);
        this.toolbox = createTaskToolbox(newTask);
        prepareTaskForLocking(newTask);
        Assert.assertTrue(newTask.isReady(this.actionClient));
        for (ParallelIndexSubTaskSpec parallelIndexSubTaskSpec : newTask.createRunner(this.toolbox).subTaskSpecIterator()) {
            ParallelIndexSubTask parallelIndexSubTask = new ParallelIndexSubTask((String) null, parallelIndexSubTaskSpec.getGroupId(), (TaskResource) null, parallelIndexSubTaskSpec.getSupervisorTaskId(), 0, parallelIndexSubTaskSpec.getIngestionSpec(), parallelIndexSubTaskSpec.getContext(), this.indexingServiceClient, (IndexTaskClientFactory) null);
            LocalTaskActionClient createActionClient = createActionClient(parallelIndexSubTask);
            prepareTaskForLocking(parallelIndexSubTask);
            Assert.assertTrue(parallelIndexSubTask.isReady(createActionClient));
        }
    }

    private void runTestWithoutIntervalTask() throws Exception {
        ParallelIndexSupervisorTask newTask = newTask(null, new ParallelIndexIOConfig(new LocalFirehoseFactory(this.inputDir, "test_*", (StringInputRowParser) null), false));
        this.actionClient = createActionClient(newTask);
        this.toolbox = createTaskToolbox(newTask);
        prepareTaskForLocking(newTask);
        Assert.assertTrue(newTask.isReady(this.actionClient));
        Assert.assertEquals(TaskState.SUCCESS, newTask.run(this.toolbox).getStatusCode());
        shutdownTask(newTask);
    }

    @Test
    public void testWithoutInterval() throws Exception {
        runTestWithoutIntervalTask();
        Interval of = Intervals.of("2017-12-24/P1D");
        List usedSegmentsForInterval = getStorageCoordinator().getUsedSegmentsForInterval("dataSource", of);
        Assert.assertEquals(1L, usedSegmentsForInterval.size());
        runTestWithoutIntervalTask();
        List usedSegmentsForInterval2 = getStorageCoordinator().getUsedSegmentsForInterval("dataSource", of);
        Assert.assertEquals(1L, usedSegmentsForInterval2.size());
        Assert.assertTrue(((DataSegment) usedSegmentsForInterval.get(0)).getVersion().compareTo(((DataSegment) usedSegmentsForInterval2.get(0)).getVersion()) < 0);
    }

    @Test
    public void testRunInParallel() throws Exception {
        ParallelIndexSupervisorTask newTask = newTask(Intervals.of("2017/2018"), new ParallelIndexIOConfig(new LocalFirehoseFactory(this.inputDir, "test_*", (StringInputRowParser) null), false));
        this.actionClient = createActionClient(newTask);
        this.toolbox = createTaskToolbox(newTask);
        prepareTaskForLocking(newTask);
        Assert.assertTrue(newTask.isReady(this.actionClient));
        Assert.assertEquals(TaskState.SUCCESS, newTask.run(this.toolbox).getStatusCode());
    }

    @Test
    public void testRunInSequential() throws Exception {
        ParallelIndexSupervisorTask newTask = newTask(Intervals.of("2017/2018"), new ParallelIndexIOConfig(new LocalFirehoseFactory(this.inputDir, "test_*", null) { // from class: org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskTest.1
            public boolean isSplittable() {
                return false;
            }
        }, false));
        this.actionClient = createActionClient(newTask);
        this.toolbox = createTaskToolbox(newTask);
        prepareTaskForLocking(newTask);
        Assert.assertTrue(newTask.isReady(this.actionClient));
        Assert.assertEquals(TaskState.SUCCESS, newTask.run(this.toolbox).getStatusCode());
    }

    @Test
    public void testPublishEmptySegments() throws Exception {
        ParallelIndexSupervisorTask newTask = newTask(Intervals.of("2020/2021"), new ParallelIndexIOConfig(new LocalFirehoseFactory(this.inputDir, "test_*", (StringInputRowParser) null), false));
        this.actionClient = createActionClient(newTask);
        this.toolbox = createTaskToolbox(newTask);
        prepareTaskForLocking(newTask);
        Assert.assertTrue(newTask.isReady(this.actionClient));
        Assert.assertEquals(TaskState.SUCCESS, newTask.run(this.toolbox).getStatusCode());
    }

    @Test
    public void testWith1MaxNumSubTasks() throws Exception {
        ParallelIndexSupervisorTask newTask = newTask(Intervals.of("2017/2018"), new ParallelIndexIOConfig(new LocalFirehoseFactory(this.inputDir, "test_*", (StringInputRowParser) null), false), new ParallelIndexTuningConfig((Integer) null, (Integer) null, (Integer) null, (Long) null, (Long) null, (Integer) null, (IndexSpec) null, (Integer) null, (Boolean) null, (Boolean) null, (Long) null, (SegmentWriteOutMediumFactory) null, 1, (Integer) null, (Integer) null, (Duration) null, (Integer) null, (Boolean) null, (Integer) null, (Integer) null));
        this.actionClient = createActionClient(newTask);
        this.toolbox = createTaskToolbox(newTask);
        prepareTaskForLocking(newTask);
        Assert.assertTrue(newTask.isReady(this.actionClient));
        Assert.assertEquals(TaskState.SUCCESS, newTask.run(this.toolbox).getStatusCode());
        Assert.assertNull("Runner must be null if the task was in the sequential mode", newTask.getRunner());
    }

    private ParallelIndexSupervisorTask newTask(Interval interval, ParallelIndexIOConfig parallelIndexIOConfig) {
        return newTask(interval, parallelIndexIOConfig, new ParallelIndexTuningConfig((Integer) null, (Integer) null, (Integer) null, (Long) null, (Long) null, (Integer) null, (IndexSpec) null, (Integer) null, (Boolean) null, (Boolean) null, (Long) null, (SegmentWriteOutMediumFactory) null, 2, (Integer) null, (Integer) null, (Duration) null, (Integer) null, (Boolean) null, (Integer) null, (Integer) null));
    }

    private ParallelIndexSupervisorTask newTask(Interval interval, ParallelIndexIOConfig parallelIndexIOConfig, ParallelIndexTuningConfig parallelIndexTuningConfig) {
        return new TestSupervisorTask(null, null, new ParallelIndexIngestionSpec(new DataSchema("dataSource", (Map) getObjectMapper().convertValue(new StringInputRowParser(DEFAULT_PARSE_SPEC, (String) null), Map.class), new AggregatorFactory[]{new LongSumAggregatorFactory("val", "val")}, new UniformGranularitySpec(Granularities.DAY, Granularities.MINUTE, interval == null ? null : Collections.singletonList(interval)), (TransformSpec) null, getObjectMapper()), parallelIndexIOConfig, parallelIndexTuningConfig), new HashMap(), this.indexingServiceClient);
    }
}
