package org.apache.druid.server.coordinator.duty;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.druid.client.indexing.NoopOverlordClient;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/druid/server/coordinator/duty/KillStalePendingSegmentsTest.class */
public class KillStalePendingSegmentsTest {
    private TestOverlordClient overlordClient;
    private KillStalePendingSegments killDuty;

    /* loaded from: input_file:org/apache/druid/server/coordinator/duty/KillStalePendingSegmentsTest$DS.class */
    private static class DS {
        static final String WIKI = "wiki";
        static final String KOALA = "koala";

        private DS() {
        }
    }

    /* loaded from: input_file:org/apache/druid/server/coordinator/duty/KillStalePendingSegmentsTest$TestOverlordClient.class */
    private static class TestOverlordClient extends NoopOverlordClient {
        private final List<TaskStatusPlus> taskStatuses;
        private final Map<String, List<DateTime>> datasourceToPendingSegments;
        private final Map<String, Interval> observedKillIntervals;
        private int taskIdSuffix;

        private TestOverlordClient() {
            this.taskStatuses = new ArrayList();
            this.datasourceToPendingSegments = new HashMap();
            this.observedKillIntervals = new HashMap();
            this.taskIdSuffix = 0;
        }

        void addTaskAndSegment(String str, DateTime dateTime, TaskState taskState) {
            List<TaskStatusPlus> list = this.taskStatuses;
            StringBuilder append = new StringBuilder().append(str).append("__");
            int i = this.taskIdSuffix;
            this.taskIdSuffix = i + 1;
            list.add(new TaskStatusPlus(append.append(i).toString(), (String) null, (String) null, dateTime, dateTime, taskState, taskState.isComplete() ? RunnerTaskState.NONE : RunnerTaskState.RUNNING, 100L, TaskLocation.unknown(), str, (String) null));
            this.datasourceToPendingSegments.computeIfAbsent(str, str2 -> {
                return new ArrayList();
            }).add(dateTime.plusMinutes(5));
        }

        @Override // org.apache.druid.client.indexing.NoopOverlordClient
        public ListenableFuture<CloseableIterator<TaskStatusPlus>> taskStatuses(@Nullable String str, @Nullable String str2, @Nullable Integer num) {
            return Futures.immediateFuture(CloseableIterators.wrap(this.taskStatuses.iterator(), (Closeable) null));
        }

        @Override // org.apache.druid.client.indexing.NoopOverlordClient
        public ListenableFuture<Integer> killPendingSegments(String str, Interval interval) {
            this.observedKillIntervals.put(str, interval);
            List<DateTime> remove = this.datasourceToPendingSegments.remove(str);
            if (remove == null || remove.isEmpty()) {
                return Futures.immediateFuture(0);
            }
            ArrayList arrayList = new ArrayList();
            int i = 0;
            for (DateTime dateTime : remove) {
                if (dateTime.isBefore(interval.getEnd())) {
                    i++;
                } else {
                    arrayList.add(dateTime);
                }
            }
            if (arrayList.size() > 0) {
                this.datasourceToPendingSegments.put(str, arrayList);
            }
            return Futures.immediateFuture(Integer.valueOf(i));
        }
    }

    @Before
    public void setup() {
        this.overlordClient = new TestOverlordClient();
        this.killDuty = new KillStalePendingSegments(this.overlordClient);
    }

    @Test
    public void testRetentionStarts1DayBeforeNowWhenNoKnownTask() {
        this.killDuty.run(createParamsWithDatasources("wiki").build());
        Interval interval = (Interval) this.overlordClient.observedKillIntervals.get("wiki");
        Assert.assertEquals(DateTimes.MIN, interval.getStart());
        Assert.assertTrue(DateTimes.nowUtc().minusDays(1).getMillis() - interval.getEnd().getMillis() <= 100);
    }

    @Test
    public void testRetentionStarts1DayBeforeEarliestActiveTask() {
        DateTime of = DateTimes.of("2023-01-01");
        this.overlordClient.addTaskAndSegment("wiki", of, TaskState.RUNNING);
        this.overlordClient.addTaskAndSegment("wiki", of.plusHours(2), TaskState.RUNNING);
        this.overlordClient.addTaskAndSegment("wiki", of.plusDays(1), TaskState.RUNNING);
        this.overlordClient.addTaskAndSegment("wiki", of.plusHours(3), TaskState.RUNNING);
        this.killDuty.run(createParamsWithDatasources("wiki").build());
        Interval interval = (Interval) this.overlordClient.observedKillIntervals.get("wiki");
        Assert.assertEquals(DateTimes.MIN, interval.getStart());
        Assert.assertEquals(of.minusDays(1), interval.getEnd());
    }

    @Test
    public void testRetentionStarts1DayBeforeLatestCompletedTask() {
        DateTime of = DateTimes.of("2023-01-01");
        this.overlordClient.addTaskAndSegment("wiki", of, TaskState.FAILED);
        this.overlordClient.addTaskAndSegment("wiki", of.minusHours(2), TaskState.SUCCESS);
        this.overlordClient.addTaskAndSegment("wiki", of.minusDays(2), TaskState.FAILED);
        this.overlordClient.addTaskAndSegment("wiki", of.minusDays(3), TaskState.SUCCESS);
        DruidCoordinatorRuntimeParams build = createParamsWithDatasources("wiki").build();
        this.killDuty.run(build);
        Interval interval = (Interval) this.overlordClient.observedKillIntervals.get("wiki");
        Assert.assertEquals(DateTimes.MIN, interval.getStart());
        Assert.assertEquals(of.minusDays(1), interval.getEnd());
        Assert.assertEquals(2L, build.getCoordinatorStats().get(Stats.Kill.PENDING_SEGMENTS, RowKey.of(Dimension.DATASOURCE, "wiki")));
    }

    @Test
    public void testRetentionStarts1DayBeforeLatestCompletedOrEarliestActiveTask() {
        DateTime of = DateTimes.of("2023-02-01");
        this.overlordClient.addTaskAndSegment("wiki", of, TaskState.FAILED);
        DateTime of2 = DateTimes.of("2023-01-01");
        this.overlordClient.addTaskAndSegment("koala", of2, TaskState.RUNNING);
        this.killDuty.run(createParamsWithDatasources("wiki", "koala").build());
        DateTime earlierOf = DateTimes.earlierOf(of2, of);
        Interval interval = (Interval) this.overlordClient.observedKillIntervals.get("wiki");
        Assert.assertEquals(DateTimes.MIN, interval.getStart());
        Assert.assertEquals(earlierOf.minusDays(1), interval.getEnd());
        Assert.assertEquals(DateTimes.MIN, ((Interval) this.overlordClient.observedKillIntervals.get("koala")).getStart());
        Assert.assertEquals(earlierOf.minusDays(1), interval.getEnd());
    }

    @Test
    public void testPendingSegmentOfDisallowedDatasourceIsNotDeleted() {
        DruidCoordinatorRuntimeParams build = createParamsWithDatasources("wiki", "koala").withDynamicConfigs(CoordinatorDynamicConfig.builder().withDatasourcesToNotKillPendingSegmentsIn(Collections.singleton("koala")).build()).build();
        DateTime of = DateTimes.of("2023-01-01");
        this.overlordClient.addTaskAndSegment("wiki", of, TaskState.SUCCESS);
        this.overlordClient.addTaskAndSegment("wiki", of.minusDays(3), TaskState.SUCCESS);
        this.overlordClient.addTaskAndSegment("wiki", of.minusDays(5), TaskState.SUCCESS);
        this.overlordClient.addTaskAndSegment("koala", of, TaskState.SUCCESS);
        this.overlordClient.addTaskAndSegment("koala", of.minusDays(3), TaskState.SUCCESS);
        this.overlordClient.addTaskAndSegment("koala", of.minusDays(5), TaskState.SUCCESS);
        this.killDuty.run(build);
        CoordinatorRunStats coordinatorStats = build.getCoordinatorStats();
        Assert.assertTrue(this.overlordClient.observedKillIntervals.containsKey("wiki"));
        Assert.assertEquals(2L, coordinatorStats.get(Stats.Kill.PENDING_SEGMENTS, RowKey.of(Dimension.DATASOURCE, "wiki")));
        Assert.assertFalse(this.overlordClient.observedKillIntervals.containsKey("koala"));
        Assert.assertEquals(0L, coordinatorStats.get(Stats.Kill.PENDING_SEGMENTS, RowKey.of(Dimension.DATASOURCE, "koala")));
    }

    private DruidCoordinatorRuntimeParams.Builder createParamsWithDatasources(String... strArr) {
        DruidCoordinatorRuntimeParams.Builder newBuilder = DruidCoordinatorRuntimeParams.newBuilder(DateTimes.nowUtc());
        HashSet hashSet = new HashSet();
        for (String str : strArr) {
            hashSet.add(DataSegment.builder().dataSource(str).interval(Intervals.ETERNITY).version("v1").shardSpec(new NumberedShardSpec(0, 1)).size(100L).build());
        }
        return newBuilder.withUsedSegments(hashSet);
    }
}
