package org.apache.druid.segment.realtime.plumber;

import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.cache.MapCache;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.Row;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JSONParseSpec;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.search.AutoStrategy;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.FireDepartmentTest;
import org.apache.druid.segment.realtime.FireHydrant;
import org.apache.druid.segment.realtime.SegmentPublisher;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.DataSegment;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.skife.jdbi.org.antlr.runtime.debug.DebugEventListener;
import org.skife.jdbi.org.antlr.runtime.debug.Profiler;

/**
 * Parameterized tests for {@link RealtimePlumber} instances obtained from a
 * {@link RealtimePlumberSchool}.
 *
 * <p>Every test is run once for each (rejection policy factory, segment write-out medium factory)
 * pair returned by {@code constructorFeeder()}.
 */
@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.class */
public class RealtimePlumberSchoolTest extends InitializedNullHandlingTest {
    // Test parameter {0}: handed to the RealtimeTuningConfig built in setUp().
    private final RejectionPolicyFactory rejectionPolicy;
    // Test parameter {1}: used to build the test IndexMergerV9 in setUp().
    private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory;
    // Plumber created in setUp() from realtimePlumberSchool using `schema`.
    private RealtimePlumber plumber;
    // Factory under test; every plumber in this class comes from its findPlumber().
    private RealtimePlumberSchool realtimePlumberSchool;
    // EasyMock collaborators: created and replayed in setUp(), verified in tearDown().
    private DataSegmentAnnouncer announcer;
    private SegmentPublisher segmentPublisher;
    private DataSegmentPusher dataSegmentPusher;
    private SegmentHandoffNotifier handoffNotifier;
    private SegmentHandoffNotifierFactory handoffNotifierFactory;
    private ServiceEmitter emitter;
    // Tuning config shared by all plumbers; also supplies the base persist directory cleaned in tearDown().
    private RealtimeTuningConfig tuningConfig;
    // Data source "test" with HOUR segment granularity.
    private DataSchema schema;
    // Data source "test" with YEAR segment granularity.
    private DataSchema schema2;
    // Metrics object passed to every plumber; testPersistFails() reads failedPersists() from it.
    private FireDepartmentMetrics metrics;
    // Temporary directory created in setUp() and removed in tearDown().
    private File tmpDir;

    /**
     * Builds the parameter matrix for the {@link Parameterized} runner.
     *
     * @return one {@code Object[]{rejectionPolicyFactory, segmentWriteOutMediumFactory}} entry for
     *         every rejection policy factory combined with the off-heap and the tmp-file
     *         write-out medium factories, in that order
     */
    @Parameterized.Parameters(name = "rejectionPolicy = {0}, segmentWriteOutMediumFactory = {1}")
    public static Collection<?> constructorFeeder() {
        final List<Object[]> parameterSets = new ArrayList<>();
        final RejectionPolicyFactory[] policies = {
            new NoopRejectionPolicyFactory(),
            new MessageTimeRejectionPolicyFactory()
        };
        for (int i = 0; i < policies.length; i++) {
            final RejectionPolicyFactory policy = policies[i];
            parameterSets.add(new Object[]{policy, OffHeapMemorySegmentWriteOutMediumFactory.instance()});
            parameterSets.add(new Object[]{policy, TmpFileSegmentWriteOutMediumFactory.instance()});
        }
        return parameterSets;
    }

    /**
     * Receives one parameter set from the {@link Parameterized} runner.
     *
     * @param rejectionPolicyFactory       rejection policy factory placed into the tuning config
     * @param segmentWriteOutMediumFactory write-out medium factory used for the test index merger
     */
    public RealtimePlumberSchoolTest(
        RejectionPolicyFactory rejectionPolicyFactory,
        SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
    ) {
        this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory;
        this.rejectionPolicy = rejectionPolicyFactory;
    }

    /**
     * Creates the temp directory, the two data schemas (HOUR and YEAR segment granularity), the
     * EasyMock collaborators, the tuning config, the plumber school and the default plumber.
     *
     * @throws Exception if any of the fixtures cannot be created
     */
    @Before
    public void setUp() throws Exception {
        this.tmpDir = FileUtils.createTempDir();

        final DefaultObjectMapper jsonMapper = new DefaultObjectMapper();

        // Schema with HOUR segment granularity.
        final StringInputRowParser hourlyParser = new StringInputRowParser(
            new JSONParseSpec(
                new TimestampSpec(TimestampSpec.DEFAULT_COLUMN, AutoStrategy.NAME, null),
                new DimensionsSpec(null, null, null),
                null,
                null,
                null
            ),
            null
        );
        final Map<String, Object> hourlyParserMap =
            (Map<String, Object>) jsonMapper.convertValue(hourlyParser, Map.class);
        final AggregatorFactory[] hourlyAggregators = {new CountAggregatorFactory("rows")};
        final UniformGranularitySpec hourlyGranularity =
            new UniformGranularitySpec(Granularities.HOUR, Granularities.NONE, null);
        this.schema = new DataSchema(
            "test",
            hourlyParserMap,
            hourlyAggregators,
            hourlyGranularity,
            (TransformSpec) null,
            jsonMapper
        );

        // Same schema, but with YEAR segment granularity.
        final StringInputRowParser yearlyParser = new StringInputRowParser(
            new JSONParseSpec(
                new TimestampSpec(TimestampSpec.DEFAULT_COLUMN, AutoStrategy.NAME, null),
                new DimensionsSpec(null, null, null),
                null,
                null,
                null
            ),
            null
        );
        final Map<String, Object> yearlyParserMap =
            (Map<String, Object>) jsonMapper.convertValue(yearlyParser, Map.class);
        final AggregatorFactory[] yearlyAggregators = {new CountAggregatorFactory("rows")};
        final UniformGranularitySpec yearlyGranularity =
            new UniformGranularitySpec(Granularities.YEAR, Granularities.NONE, null);
        this.schema2 = new DataSchema(
            "test",
            yearlyParserMap,
            yearlyAggregators,
            yearlyGranularity,
            (TransformSpec) null,
            jsonMapper
        );

        // Strict mock: only announceSegment(...) is allowed, any number of times.
        this.announcer = EasyMock.createMock(DataSegmentAnnouncer.class);
        this.announcer.announceSegment((DataSegment) EasyMock.anyObject());
        EasyMock.expectLastCall().anyTimes();

        this.segmentPublisher = EasyMock.createNiceMock(SegmentPublisher.class);
        this.dataSegmentPusher = EasyMock.createNiceMock(DataSegmentPusher.class);
        this.handoffNotifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class);
        this.handoffNotifier = EasyMock.createNiceMock(SegmentHandoffNotifier.class);
        EasyMock.expect(this.handoffNotifierFactory.createSegmentHandoffNotifier(EasyMock.anyString()))
                .andReturn(this.handoffNotifier)
                .anyTimes();
        EasyMock.expect(
                    this.handoffNotifier.registerSegmentHandoffCallback(
                        (SegmentDescriptor) EasyMock.anyObject(),
                        (Executor) EasyMock.anyObject(),
                        (Runnable) EasyMock.anyObject()
                    )
                )
                .andReturn(true)
                .anyTimes();

        // Strict mock with no recorded expectations.
        this.emitter = EasyMock.createMock(ServiceEmitter.class);

        EasyMock.replay(
            this.announcer,
            this.segmentPublisher,
            this.dataSegmentPusher,
            this.handoffNotifierFactory,
            this.handoffNotifier,
            this.emitter
        );

        this.tuningConfig = new RealtimeTuningConfig(
            null,
            1,
            null,
            null,
            null,
            null,
            new IntervalStartVersioningPolicy(),
            this.rejectionPolicy,
            null,
            null,
            null,
            null,
            true,
            0,
            0,
            false,
            null,
            null,
            null,
            null
        );

        this.realtimePlumberSchool = new RealtimePlumberSchool(
            this.emitter,
            new DefaultQueryRunnerFactoryConglomerate(new HashMap<>()),
            this.dataSegmentPusher,
            this.announcer,
            this.segmentPublisher,
            this.handoffNotifierFactory,
            Execs.directExecutor(),
            NoopJoinableFactory.INSTANCE,
            TestHelper.getTestIndexMergerV9(this.segmentWriteOutMediumFactory),
            TestHelper.getTestIndexIO(),
            MapCache.create(0L),
            FireDepartmentTest.NO_CACHE_CONFIG,
            new CachePopulatorStats(),
            TestHelper.makeJsonMapper()
        );

        this.metrics = new FireDepartmentMetrics();
        this.plumber = (RealtimePlumber) this.realtimePlumberSchool.findPlumber(
            this.schema,
            this.tuningConfig,
            this.metrics
        );
    }

    /**
     * Verifies the mocks and removes the on-disk state created by the test.
     *
     * <p>Cleanup runs in {@code finally} blocks so that a failed mock verification (or a failure
     * while deleting the persist directory) does not leave directories behind.
     *
     * @throws Exception if mock verification fails or a directory cannot be deleted
     */
    @After
    public void tearDown() throws Exception {
        try {
            EasyMock.verify(
                this.announcer,
                this.segmentPublisher,
                this.dataSegmentPusher,
                this.handoffNotifierFactory,
                this.handoffNotifier,
                this.emitter
            );
        } finally {
            try {
                // Persist directory of the "test" data source under the tuning config's base directory.
                FileUtils.deleteDirectory(
                    new File(this.tuningConfig.getBasePersistDirectory(), this.schema.getDataSource())
                );
            } finally {
                FileUtils.deleteDirectory(this.tmpDir);
            }
        }
    }

    /**
     * Runs the persist scenario with a committer whose metadata is {@code null}.
     *
     * @throws Exception if the persist scenario fails
     */
    @Test(timeout = 60000)
    public void testPersist() throws Exception {
        final Object noCommitMetadata = null;
        testPersist(noCommitMetadata);
    }

    /**
     * Runs the persist scenario with non-null commit metadata, then checks that a freshly created
     * plumber returns that same metadata from {@code startJob()}.
     *
     * @throws Exception if the persist scenario fails
     */
    @Test(timeout = 60000)
    public void testPersistWithCommitMetadata() throws Exception {
        final String commitMetadata = "dummyCommitMetadata";
        testPersist(commitMetadata);

        this.plumber = (RealtimePlumber) this.realtimePlumberSchool.findPlumber(
            this.schema,
            this.tuningConfig,
            this.metrics
        );
        final Object restoredMetadata = this.plumber.startJob();
        Assert.assertEquals(commitMetadata, restoredMetadata);
    }

    /**
     * Shared persist scenario: registers a sink for the first hour of the epoch, adds one mocked
     * row, persists with a committer carrying {@code obj}, waits until the committer has run, then
     * clears the sinks and finishes the job.
     *
     * @param obj metadata returned by the committer's {@code getMetadata()}; may be {@code null}
     * @throws Exception if any plumber call fails or the wait is interrupted
     */
    private void testPersist(final Object obj) throws Exception {
        final Sink firstHourSink = new Sink(
            Intervals.utc(0L, TimeUnit.HOURS.toMillis(1L)),
            this.schema,
            this.tuningConfig.getShardSpec(),
            DateTimes.of("2014-12-01T12:34:56.789").toString(),
            this.tuningConfig.getAppendableIndexSpec(),
            this.tuningConfig.getMaxRowsInMemory(),
            this.tuningConfig.getMaxBytesInMemoryOrDefault(),
            this.tuningConfig.getDedupColumn()
        );
        this.plumber.getSinks().put(0L, firstHourSink);
        Assert.assertNull(this.plumber.startJob());

        final InputRow row = EasyMock.createNiceMock(InputRow.class);
        EasyMock.expect(row.getTimestampFromEpoch()).andReturn(0L);
        EasyMock.expect(row.getDimensions()).andReturn(new ArrayList<>());
        EasyMock.replay(row);

        // Released by the committer's run().
        final CountDownLatch committed = new CountDownLatch(1);
        final Committer committer = new Committer() {
            @Override
            public Object getMetadata() {
                return obj;
            }

            @Override
            public void run() {
                committed.countDown();
            }
        };

        this.plumber.add(row, Suppliers.ofInstance(committer));
        this.plumber.persist(committer);
        committed.await();

        this.plumber.getSinks().clear();
        this.plumber.finishJob();
    }

    /**
     * Persists with a committer whose {@code run()} throws and checks that exactly one failed
     * persist is recorded in the metrics.
     *
     * @throws Exception if a plumber call fails or the wait is interrupted
     */
    @Test(timeout = 60000)
    public void testPersistFails() throws Exception {
        this.plumber.getSinks().put(
            0L,
            new Sink(
                Intervals.utc(0L, TimeUnit.HOURS.toMillis(1L)),
                this.schema,
                this.tuningConfig.getShardSpec(),
                DateTimes.of("2014-12-01T12:34:56.789").toString(),
                this.tuningConfig.getAppendableIndexSpec(),
                this.tuningConfig.getMaxRowsInMemory(),
                this.tuningConfig.getMaxBytesInMemoryOrDefault(),
                this.tuningConfig.getDedupColumn()
            )
        );
        this.plumber.startJob();

        InputRow inputRow = EasyMock.createNiceMock(InputRow.class);
        EasyMock.expect(inputRow.getTimestampFromEpoch()).andReturn(0L);
        EasyMock.expect(inputRow.getDimensions()).andReturn(new ArrayList<>());
        EasyMock.replay(inputRow);
        this.plumber.add(inputRow, Suppliers.ofInstance(Committers.nil()));

        final CountDownLatch committerInvoked = new CountDownLatch(1);
        // Supplier#get() is the only accessor on Guava's Supplier; the committer it yields
        // counts down the latch and then throws.
        this.plumber.persist(
            supplierFromRunnable(() -> {
                committerInvoked.countDown();
                throw new RuntimeException();
            }).get()
        );
        committerInvoked.await();

        // The latch is released before the exception is thrown, so poll until the failure has
        // been recorded; the @Test timeout bounds this loop.
        while (this.metrics.failedPersists() < 1) {
            Thread.sleep(100L);
        }
        Assert.assertEquals(1L, this.metrics.failedPersists());
    }

    /**
     * Runs the hydrant-gap scenario with non-null commit metadata.
     *
     * @throws Exception if the scenario fails
     */
    @Test(timeout = 60000)
    public void testPersistHydrantGaps() throws Exception {
        final Object commitMetadata = "dummyCommitMetadata";
        testPersistHydrantGapsHelper(commitMetadata);
    }

    /**
     * Persists five rows into a single yearly sink, deletes some of the persisted hydrant
     * directories, and checks what a new plumber restores from disk.
     *
     * <ol>
     *   <li>After persisting, directories {@code 0..4} must exist under the persist dir.</li>
     *   <li>After deleting {@code 1} and {@code 3}, a new plumber restores one sink whose
     *       hydrants have counts 0, 2 and 4.</li>
     *   <li>After deleting {@code 0}, {@code 2} and {@code 4} as well, a new plumber restores
     *       no sinks.</li>
     * </ol>
     *
     * @param obj metadata returned by the committer's {@code getMetadata()}
     * @throws Exception if any plumber or file operation fails or the wait is interrupted
     */
    private void testPersistHydrantGapsHelper(final Object obj) throws Exception {
        final Interval testInterval = new Interval(DateTimes.of("1970-01-01"), DateTimes.of("1971-01-01"));

        final RealtimePlumber plumber2 = (RealtimePlumber) this.realtimePlumberSchool.findPlumber(
            this.schema2,
            this.tuningConfig,
            this.metrics
        );
        plumber2.getSinks().put(
            0L,
            new Sink(
                testInterval,
                this.schema2,
                this.tuningConfig.getShardSpec(),
                DateTimes.of("2014-12-01T12:34:56.789").toString(),
                this.tuningConfig.getAppendableIndexSpec(),
                this.tuningConfig.getMaxRowsInMemory(),
                this.tuningConfig.getMaxBytesInMemoryOrDefault(),
                this.tuningConfig.getDedupColumn()
            )
        );
        Assert.assertNull(plumber2.startJob());

        // Released by the committer's run().
        final CountDownLatch doneSignal = new CountDownLatch(1);
        final Committer committer = new Committer() {
            @Override
            public Object getMetadata() {
                return obj;
            }

            @Override
            public void run() {
                doneSignal.countDown();
            }
        };

        plumber2.add(getTestInputRow("1970-01-01"), Suppliers.ofInstance(committer));
        plumber2.add(getTestInputRow("1970-02-01"), Suppliers.ofInstance(committer));
        plumber2.add(getTestInputRow("1970-03-01"), Suppliers.ofInstance(committer));
        plumber2.add(getTestInputRow("1970-04-01"), Suppliers.ofInstance(committer));
        plumber2.add(getTestInputRow("1970-05-01"), Suppliers.ofInstance(committer));
        plumber2.persist(committer);
        doneSignal.await();

        plumber2.getSinks().clear();
        plumber2.finishJob();

        // All five hydrants must have been persisted.
        final File persistDir = plumber2.computePersistDir(this.schema2, testInterval);
        for (int i = 0; i < 5; i++) {
            Assert.assertTrue(new File(persistDir, String.valueOf(i)).exists());
        }

        // Create gaps in the persisted hydrants (1 and 3) and reload.
        FileUtils.deleteDirectory(new File(persistDir, "1"));
        FileUtils.deleteDirectory(new File(persistDir, "3"));
        final RealtimePlumber restoredPlumber = (RealtimePlumber) this.realtimePlumberSchool.findPlumber(
            this.schema2,
            this.tuningConfig,
            this.metrics
        );
        restoredPlumber.bootstrapSinksFromDisk();

        final Map<Long, Sink> sinks = restoredPlumber.getSinks();
        Assert.assertEquals(1L, sinks.size());

        final List<FireHydrant> hydrants = Lists.newArrayList(sinks.get(0L));
        final Interval expectedInterval = new Interval(
            DateTimes.of("1970-01-01T00:00:00.000Z"),
            DateTimes.of("1971-01-01T00:00:00.000Z")
        );
        Assert.assertEquals(0L, hydrants.get(0).getCount());
        Assert.assertEquals(expectedInterval, hydrants.get(0).getSegmentDataInterval());
        Assert.assertEquals(2L, hydrants.get(1).getCount());
        Assert.assertEquals(expectedInterval, hydrants.get(1).getSegmentDataInterval());
        Assert.assertEquals(4L, hydrants.get(2).getCount());
        Assert.assertEquals(expectedInterval, hydrants.get(2).getSegmentDataInterval());

        // Delete the remaining hydrants (0, 2 and 4) and reload: no sink should be created.
        FileUtils.deleteDirectory(new File(persistDir, "0"));
        FileUtils.deleteDirectory(new File(persistDir, "2"));
        FileUtils.deleteDirectory(new File(persistDir, "4"));
        final RealtimePlumber emptyPlumber = (RealtimePlumber) this.realtimePlumberSchool.findPlumber(
            this.schema2,
            this.tuningConfig,
            this.metrics
        );
        emptyPlumber.bootstrapSinksFromDisk();
        Assert.assertEquals(0L, emptyPlumber.getSinks().size());
    }

    /**
     * Runs the dimension-order scenario with non-null commit metadata.
     *
     * @throws Exception if the scenario fails
     */
    @Test(timeout = 60000)
    public void testDimOrderInheritance() throws Exception {
        final Object commitMetadata = "dummyCommitMetadata";
        testDimOrderInheritanceHelper(commitMetadata);
    }

    /**
     * Adds six rows with different dimension lists to a yearly-granularity plumber, persists
     * them, restores a new plumber from disk and checks each restored hydrant's count and the
     * dimension order exposed by its queryable index.
     *
     * @param obj metadata returned by the committer's {@code getMetadata()}
     * @throws Exception if any plumber call fails or the wait is interrupted
     */
    private void testDimOrderInheritanceHelper(final Object obj) throws Exception {
        // Expected available dimensions per hydrant, indexed by hydrant position.
        final ImmutableList<ImmutableList<String>> expectedDims = ImmutableList.of(
            ImmutableList.of("dimD"),
            ImmutableList.of("dimC"),
            ImmutableList.of("dimA"),
            ImmutableList.of("dimB"),
            ImmutableList.of("dimE"),
            ImmutableList.of("dimD", "dimC", "dimA", "dimB", "dimE")
        );

        final RealtimePlumber plumber2 = (RealtimePlumber) this.realtimePlumberSchool.findPlumber(
            this.schema2,
            this.tuningConfig,
            this.metrics
        );
        Assert.assertNull(plumber2.startJob());

        // Released by the committer's run().
        final CountDownLatch doneSignal = new CountDownLatch(1);
        final Committer committer = new Committer() {
            @Override
            public Object getMetadata() {
                return obj;
            }

            @Override
            public void run() {
                doneSignal.countDown();
            }
        };

        plumber2.add(
            getTestInputRowFull("1970-01-01", ImmutableList.of("dimD"), ImmutableList.of("1")),
            Suppliers.ofInstance(committer)
        );
        plumber2.add(
            getTestInputRowFull("1970-01-01", ImmutableList.of("dimC"), ImmutableList.of("1")),
            Suppliers.ofInstance(committer)
        );
        plumber2.add(
            getTestInputRowFull("1970-01-01", ImmutableList.of("dimA"), ImmutableList.of("1")),
            Suppliers.ofInstance(committer)
        );
        plumber2.add(
            getTestInputRowFull("1970-01-01", ImmutableList.of("dimB"), ImmutableList.of("1")),
            Suppliers.ofInstance(committer)
        );
        plumber2.add(
            getTestInputRowFull("1970-01-01", ImmutableList.of("dimE"), ImmutableList.of("1")),
            Suppliers.ofInstance(committer)
        );
        plumber2.add(
            getTestInputRowFull(
                "1970-01-01",
                ImmutableList.of("dimA", "dimB", "dimC", "dimD", "dimE"),
                ImmutableList.of("1")
            ),
            Suppliers.ofInstance(committer)
        );
        plumber2.persist(committer);
        doneSignal.await();

        plumber2.getSinks().clear();
        plumber2.finishJob();

        final RealtimePlumber restoredPlumber = (RealtimePlumber) this.realtimePlumberSchool.findPlumber(
            this.schema2,
            this.tuningConfig,
            this.metrics
        );
        restoredPlumber.bootstrapSinksFromDisk();

        final Map<Long, Sink> sinks = restoredPlumber.getSinks();
        Assert.assertEquals(1L, sinks.size());

        final List<FireHydrant> hydrants = Lists.newArrayList(sinks.get(0L));
        for (int i = 0; i < hydrants.size(); i++) {
            final FireHydrant hydrant = hydrants.get(i);
            final ReferenceCountingSegment segment = hydrant.getIncrementedSegment();
            try {
                final QueryableIndex qindex = segment.asQueryableIndex();
                Assert.assertEquals(i, hydrant.getCount());
                Assert.assertEquals(expectedDims.get(i), ImmutableList.copyOf(qindex.getAvailableDimensions()));
            } finally {
                // Balance getIncrementedSegment() whether or not the assertions passed.
                segment.decrement();
            }
        }
    }

    /**
     * Creates a minimal {@link InputRow} stub with no dimensions.
     *
     * @param str timestamp text parsed with {@code DateTimes.of} on every timestamp access
     * @return a row reporting empty dimension lists, metric {@code 0} and {@code null} raw values
     */
    private InputRow getTestInputRow(final String str) {
        final InputRow stub = new InputRow() {
            @Override
            public List<String> getDimensions() {
                return new ArrayList<>();
            }

            @Override
            public long getTimestampFromEpoch() {
                final DateTime parsed = DateTimes.of(str);
                return parsed.getMillis();
            }

            @Override
            public DateTime getTimestamp() {
                return DateTimes.of(str);
            }

            @Override
            public List<String> getDimension(String dimension) {
                return new ArrayList<>();
            }

            @Override
            public Number getMetric(String metric) {
                return Integer.valueOf(0);
            }

            @Override
            public Object getRaw(String dimension) {
                return null;
            }

            @Override
            public int compareTo(Row other) {
                return 0;
            }
        };
        return stub;
    }

    /**
     * Creates an {@link InputRow} stub with caller-supplied dimensions and values.
     *
     * @param str   timestamp text parsed with {@code DateTimes.of} on every timestamp access
     * @param list  dimension names returned by {@code getDimensions()}
     * @param list2 value list returned for every dimension by {@code getDimension} and {@code getRaw}
     * @return a row backed by the given lists, with metric {@code 0}
     */
    private InputRow getTestInputRowFull(final String str, final List<String> list, final List<String> list2) {
        final InputRow stub = new InputRow() {
            @Override
            public List<String> getDimensions() {
                return list;
            }

            @Override
            public long getTimestampFromEpoch() {
                final DateTime parsed = DateTimes.of(str);
                return parsed.getMillis();
            }

            @Override
            public DateTime getTimestamp() {
                return DateTimes.of(str);
            }

            @Override
            public List<String> getDimension(String dimension) {
                return list2;
            }

            @Override
            public Number getMetric(String metric) {
                return Integer.valueOf(0);
            }

            @Override
            public Object getRaw(String dimension) {
                return list2;
            }

            @Override
            public int compareTo(Row other) {
                return 0;
            }
        };
        return stub;
    }

    /**
     * Wraps a {@link Runnable} in a {@link Committer} with {@code null} metadata and returns a
     * supplier that always yields that single committer instance.
     *
     * @param runnable action executed by the committer's {@code run()}
     * @return supplier of the wrapping committer
     */
    private static Supplier<Committer> supplierFromRunnable(final Runnable runnable) {
        final Committer delegating = new Committer() {
            @Override
            public Object getMetadata() {
                return null;
            }

            @Override
            public void run() {
                runnable.run();
            }
        };
        return Suppliers.ofInstance(delegating);
    }
}
