package org.apache.druid.tests.coordinator.duty;

import com.google.inject.Inject;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.testing.clients.CompactionResourceTestClient;
import org.apache.druid.testing.clients.TaskResponseObject;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.utils.CompactionUtil;
import org.apache.druid.testing.utils.EventSerializer;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testing.utils.KafkaUtil;
import org.apache.druid.testing.utils.StreamEventWriter;
import org.apache.druid.testing.utils.StreamGenerator;
import org.apache.druid.testing.utils.WikipediaStreamEventStreamGenerator;
import org.apache.druid.tests.TestNGGroup;
import org.apache.druid.tests.indexer.AbstractKafkaIndexingServiceTest;
import org.apache.druid.tests.indexer.AbstractStreamIndexingTest;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;

@Guice(moduleFactory = DruidTestModuleFactory.class)
@Test(groups = {TestNGGroup.COMPACTION})
/* loaded from: input_file:org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.class */
public class ITAutoCompactionLockContentionTest extends AbstractKafkaIndexingServiceTest {
    private static final Logger LOG = new Logger(ITAutoCompactionLockContentionTest.class);

    @Inject
    private CompactionResourceTestClient compactionResource;
    private AbstractStreamIndexingTest.GeneratedTestConfig generatedTestConfig;
    private StreamGenerator streamGenerator;
    private String fullDatasourceName;

    /* loaded from: input_file:org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest$Specs.class */
    private static class Specs {
        static final String SERIALIZER_PATH = "/stream/data/csv/serializer/serializer.json";
        static final String INPUT_FORMAT_PATH = "/stream/data/csv/input_format/input_format.json";
        static final String PARSER_TYPE = "inputFormat";
        static final int MAX_ROWS_PER_SEGMENT = 10000;

        private Specs() {
        }
    }

    @DataProvider
    public static Object[] getParameters() {
        return new Object[]{false, true};
    }

    @BeforeClass
    public void setupClass() throws Exception {
        doBeforeClass();
    }

    @BeforeMethod
    public void setup() throws Exception {
        this.generatedTestConfig = new AbstractStreamIndexingTest.GeneratedTestConfig(this, "inputFormat", getResourceAsString("/stream/data/csv/input_format/input_format.json"));
        this.fullDatasourceName = this.generatedTestConfig.getFullDatasourceName();
        this.streamGenerator = new WikipediaStreamEventStreamGenerator((EventSerializer) this.jsonMapper.readValue(getResourceAsStream("/stream/data/csv/serializer/serializer.json"), EventSerializer.class), 6, 100L);
    }

    @Override // org.apache.druid.tests.indexer.AbstractStreamIndexingTest
    public String getTestNamePrefix() {
        return "autocompact_lock_contention";
    }

    @Test(dataProvider = "getParameters")
    public void testAutoCompactionSkipsLockedIntervals(boolean z) throws Exception {
        if (shouldSkipTest(z)) {
            return;
        }
        Closeable createResourceCloser = createResourceCloser(this.generatedTestConfig);
        try {
            StreamEventWriter createStreamEventWriter = createStreamEventWriter(this.config, Boolean.valueOf(z));
            try {
                String apply = this.generatedTestConfig.getStreamIngestionPropsTransform().apply(getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH));
                this.generatedTestConfig.setSupervisorId(this.indexer.submitSupervisor(apply));
                LOG.info("supervisorSpec: [%s]", new Object[]{apply});
                Interval of = Intervals.of("2000-01-01T01:01:00Z/2000-01-01T01:02:00Z");
                long generateData = generateData(of, createStreamEventWriter);
                Interval of2 = Intervals.of("2000-01-01T01:02:00Z/2000-01-01T01:03:00Z");
                long generateData2 = generateData(of2, createStreamEventWriter);
                Interval of3 = Intervals.of("2000-01-01T01:03:00Z/2000-01-01T01:04:00Z");
                long generateData3 = generateData(of3, createStreamEventWriter);
                ensureRowCount(generateData + generateData2 + generateData3);
                ensureLockedIntervals(new Interval[0]);
                ensureSegmentsLoaded();
                ensureSegmentsCount(6);
                long generateData4 = generateData2 + generateData(of2, createStreamEventWriter);
                ensureLockedIntervals(of2);
                submitAndVerifyCompactionConfig();
                this.compactionResource.forceTriggerAutoCompaction();
                ensureRowCount(generateData + generateData4 + generateData3);
                ensureLockedIntervals(new Interval[0]);
                ensureSegmentsLoaded();
                ensureCompactionTaskCount(2);
                verifyCompactedIntervals(of, of3);
                this.compactionResource.forceTriggerAutoCompaction();
                ensureCompactionTaskCount(3);
                ensureSegmentsLoaded();
                verifyCompactedIntervals(of, of2, of3);
                ensureSegmentsCount(3);
                if (createStreamEventWriter != null) {
                    createStreamEventWriter.close();
                }
                if (createResourceCloser != null) {
                    createResourceCloser.close();
                }
            } finally {
            }
        } catch (Throwable th) {
            if (createResourceCloser != null) {
                try {
                    createResourceCloser.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private void ensureSegmentsCount(int i) {
        ITRetryUtil.retryUntilTrue(() -> {
            List fullSegmentsMetadata = this.coordinator.getFullSegmentsMetadata(this.fullDatasourceName);
            StringBuilder sb = new StringBuilder();
            fullSegmentsMetadata.forEach(dataSegment -> {
                sb.append("{").append(dataSegment.getId()).append(", ").append(dataSegment.getSize()).append("}, ");
            });
            LOG.info("Found Segments: %s", new Object[]{sb});
            LOG.info("Current metadata segment count: %d, expected: %d", new Object[]{Integer.valueOf(fullSegmentsMetadata.size()), Integer.valueOf(i)});
            return Boolean.valueOf(fullSegmentsMetadata.size() == i);
        }, "Segment count check");
    }

    private void verifyCompactedIntervals(Interval... intervalArr) {
        List<DataSegment> fullSegmentsMetadata = this.coordinator.getFullSegmentsMetadata(this.fullDatasourceName);
        ArrayList<DataSegment> arrayList = new ArrayList();
        HashSet hashSet = new HashSet();
        for (DataSegment dataSegment : fullSegmentsMetadata) {
            if (dataSegment.getLastCompactionState() != null) {
                arrayList.add(dataSegment);
                hashSet.add(dataSegment.getInterval());
            }
        }
        Assert.assertEquals(hashSet, new HashSet(Arrays.asList(intervalArr)));
        DynamicPartitionsSpec dynamicPartitionsSpec = new DynamicPartitionsSpec(10000, Long.MAX_VALUE);
        for (DataSegment dataSegment2 : arrayList) {
            Assert.assertNotNull(dataSegment2.getLastCompactionState());
            Assert.assertEquals(dataSegment2.getLastCompactionState().getPartitionsSpec(), dynamicPartitionsSpec);
        }
    }

    private long generateData(Interval interval, StreamEventWriter streamEventWriter) {
        long run = this.streamGenerator.run(this.generatedTestConfig.getStreamName(), streamEventWriter, 10, interval.getStart());
        LOG.info("Generated %d Rows for Interval [%s]", new Object[]{Long.valueOf(run), interval});
        return run;
    }

    private void ensureSegmentsLoaded() {
        ITRetryUtil.retryUntilTrue(() -> {
            return Boolean.valueOf(this.coordinator.areSegmentsLoaded(this.fullDatasourceName));
        }, "Segment Loading");
    }

    private void ensureLockedIntervals(Interval... intervalArr) {
        Map singletonMap = Collections.singletonMap(this.fullDatasourceName, 0);
        ArrayList arrayList = new ArrayList();
        ITRetryUtil.retryUntilTrue(() -> {
            arrayList.clear();
            Map lockedIntervals = this.indexer.getLockedIntervals(singletonMap);
            if (lockedIntervals.containsKey(this.fullDatasourceName)) {
                arrayList.addAll((Collection) lockedIntervals.get(this.fullDatasourceName));
            }
            LOG.info("Locked intervals: %s", new Object[]{arrayList});
            return Boolean.valueOf(intervalArr.length == arrayList.size());
        }, "Verify Locked Intervals");
        Assert.assertEquals(arrayList, Arrays.asList(intervalArr));
    }

    private boolean shouldSkipTest(boolean z) {
        return Boolean.parseBoolean((String) KafkaUtil.getAdditionalKafkaTestConfigFromProperties(this.config).getOrDefault("transactionEnabled", "false")) != z;
    }

    private void submitAndVerifyCompactionConfig() throws Exception {
        DataSourceCompactionConfig createCompactionConfig = CompactionUtil.createCompactionConfig(this.fullDatasourceName, 10000, Period.ZERO);
        this.compactionResource.updateCompactionTaskSlot(Double.valueOf(0.5d), 10, (Boolean) null);
        this.compactionResource.submitCompactionConfig(createCompactionConfig);
        Thread.sleep(2000L);
        DataSourceCompactionConfig dataSourceCompactionConfig = null;
        for (DataSourceCompactionConfig dataSourceCompactionConfig2 : this.compactionResource.getCoordinatorCompactionConfigs().getCompactionConfigs()) {
            if (dataSourceCompactionConfig2.getDataSource().equals(this.fullDatasourceName)) {
                dataSourceCompactionConfig = dataSourceCompactionConfig2;
            }
        }
        Assert.assertEquals(dataSourceCompactionConfig, createCompactionConfig);
        Assert.assertEquals(this.compactionResource.getDataSourceCompactionConfig(this.fullDatasourceName), createCompactionConfig);
    }

    private boolean isCompactionTask(TaskResponseObject taskResponseObject) {
        return "compact".equalsIgnoreCase(taskResponseObject.getType());
    }

    private void ensureCompactionTaskCount(int i) {
        LOG.info("Verifying compaction task count. Expected: %d", new Object[]{Integer.valueOf(i)});
        ITRetryUtil.retryUntilTrue(() -> {
            return Boolean.valueOf(getCompactionTaskCount() == ((long) i));
        }, "Compaction Task Count");
    }

    private long getCompactionTaskCount() {
        List<TaskResponseObject> uncompletedTasksForDataSource = this.indexer.getUncompletedTasksForDataSource(this.fullDatasourceName);
        List<TaskResponseObject> completeTasksForDataSource = this.indexer.getCompleteTasksForDataSource(this.fullDatasourceName);
        printTasks(uncompletedTasksForDataSource, "Incomplete");
        printTasks(completeTasksForDataSource, "Complete");
        return completeTasksForDataSource.stream().filter(this::isCompactionTask).count();
    }

    private void printTasks(List<TaskResponseObject> list, String str) {
        StringBuilder sb = new StringBuilder();
        list.forEach(taskResponseObject -> {
            sb.append("{").append(taskResponseObject.getType()).append(", ").append(taskResponseObject.getStatus()).append(", ").append(taskResponseObject.getCreatedTime()).append("}, ");
        });
        LOG.info("%s Tasks: %s", new Object[]{str, sb});
    }

    private void ensureRowCount(long j) {
        LOG.info("Verifying Row Count. Expected: %s", new Object[]{Long.valueOf(j)});
        ITRetryUtil.retryUntilTrue(() -> {
            return Boolean.valueOf(j == ((long) this.queryHelper.countRows(this.fullDatasourceName, Intervals.ETERNITY, str -> {
                return new LongSumAggregatorFactory(str, "count");
            })));
        }, StringUtils.format("dataSource[%s] consumed [%,d] events, expected [%,d]", new Object[]{this.fullDatasourceName, Integer.valueOf(this.queryHelper.countRows(this.fullDatasourceName, Intervals.ETERNITY, str -> {
            return new LongSumAggregatorFactory(str, "count");
        })), Long.valueOf(j)}));
    }
}
