package org.apache.druid.segment.realtime.appenderator;

import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.server.initialization.BaseJettyTest;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.joda.time.Interval;
import org.joda.time.chrono.ISOChronology;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/druid/segment/realtime/appenderator/ClosedSegmentsSinksBatchAppenderatorTest.class */
public class ClosedSegmentsSinksBatchAppenderatorTest extends InitializedNullHandlingTest {
    private static final List<SegmentIdWithShardSpec> IDENTIFIERS = ImmutableList.of(createSegmentId("2000/2001", "A", 0), createSegmentId("2000/2001", "A", 1), createSegmentId("2001/2002", "A", 0));

    @Test
    public void testSimpleIngestion() throws Exception {
        ClosedSegmensSinksBatchAppenderatorTester closedSegmensSinksBatchAppenderatorTester = new ClosedSegmensSinksBatchAppenderatorTester(3, true);
        try {
            BatchAppenderator appenderator = closedSegmensSinksBatchAppenderatorTester.getAppenderator();
            Assert.assertNull(appenderator.startJob());
            Assert.assertEquals("foo", appenderator.getDataSource());
            Assert.assertEquals(1L, appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), (Supplier) null).getNumRowsInSegment());
            Assert.assertEquals(1L, appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar", 2), (Supplier) null).getNumRowsInSegment());
            Assert.assertEquals(IDENTIFIERS.subList(0, 2), appenderator.getSegments().stream().sorted().collect(Collectors.toList()));
            Assert.assertEquals(2L, appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "sux", 1), (Supplier) null).getNumRowsInSegment());
            Assert.assertEquals(Collections.emptyList(), appenderator.getInMemorySegments().stream().sorted().collect(Collectors.toList()));
            Assert.assertEquals(1L, appenderator.add(IDENTIFIERS.get(2), createInputRow("2001", "qux", 4), (Supplier) null).getNumRowsInSegment());
            SegmentsAndCommitMetadata segmentsAndCommitMetadata = (SegmentsAndCommitMetadata) appenderator.push(appenderator.getSegments(), (Committer) null, false).get();
            Assert.assertEquals(IDENTIFIERS.subList(0, 3), Lists.transform(segmentsAndCommitMetadata.getSegments(), SegmentIdWithShardSpec::fromDataSegment).stream().sorted().collect(Collectors.toList()));
            Assert.assertEquals(closedSegmensSinksBatchAppenderatorTester.getPushedSegments().stream().sorted().collect(Collectors.toList()), segmentsAndCommitMetadata.getSegments().stream().sorted().collect(Collectors.toList()));
            appenderator.close();
            Assert.assertTrue(appenderator.getSegments().isEmpty());
            closedSegmensSinksBatchAppenderatorTester.close();
        } catch (Throwable th) {
            try {
                closedSegmensSinksBatchAppenderatorTester.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testPeriodGranularityNonUTCIngestion() throws Exception {
        ClosedSegmensSinksBatchAppenderatorTester closedSegmensSinksBatchAppenderatorTester = new ClosedSegmensSinksBatchAppenderatorTester(1, true);
        try {
            BatchAppenderator appenderator = closedSegmensSinksBatchAppenderatorTester.getAppenderator();
            Assert.assertNull(appenderator.startJob());
            Assert.assertEquals("foo", appenderator.getDataSource());
            SegmentIdWithShardSpec createNonUTCSegmentId = createNonUTCSegmentId("2021-06-27T00:00:00.000+09:00/2021-06-28T00:00:00.000+09:00", "A", 0);
            Assert.assertEquals(1L, appenderator.add(createNonUTCSegmentId, createInputRow("2021-06-27T00:01:11.080Z", "foo", 1), (Supplier) null).getNumRowsInSegment());
            Assert.assertEquals(Collections.singletonList(createNonUTCSegmentId), appenderator.getSegments().stream().sorted().collect(Collectors.toList()));
            Assert.assertEquals(Collections.emptyList(), appenderator.getInMemorySegments().stream().sorted().collect(Collectors.toList()));
            SegmentsAndCommitMetadata segmentsAndCommitMetadata = (SegmentsAndCommitMetadata) appenderator.push(appenderator.getSegments(), (Committer) null, false).get();
            Assert.assertEquals(Collections.singletonList(createNonUTCSegmentId), Lists.transform(segmentsAndCommitMetadata.getSegments(), SegmentIdWithShardSpec::fromDataSegment).stream().sorted().collect(Collectors.toList()));
            Assert.assertEquals(closedSegmensSinksBatchAppenderatorTester.getPushedSegments().stream().sorted().collect(Collectors.toList()), segmentsAndCommitMetadata.getSegments().stream().sorted().collect(Collectors.toList()));
            appenderator.close();
            Assert.assertTrue(appenderator.getSegments().isEmpty());
            closedSegmensSinksBatchAppenderatorTester.close();
        } catch (Throwable th) {
            try {
                closedSegmensSinksBatchAppenderatorTester.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testMaxBytesInMemoryWithSkipBytesInMemoryOverheadCheckConfig() throws Exception {
        ClosedSegmensSinksBatchAppenderatorTester closedSegmensSinksBatchAppenderatorTester = new ClosedSegmensSinksBatchAppenderatorTester(100, 1024L, null, true, new SimpleRowIngestionMeters(), true);
        try {
            BatchAppenderator appenderator = closedSegmensSinksBatchAppenderatorTester.getAppenderator();
            appenderator.startJob();
            appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), (Supplier) null);
            int i = NullHandling.sqlCompatible() ? 1 : 0;
            Assert.assertEquals(182 + i, appenderator.getBytesInMemory(IDENTIFIERS.get(0)));
            appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar", 1), (Supplier) null);
            Assert.assertEquals(182 + i, appenderator.getBytesInMemory(IDENTIFIERS.get(1)));
            appenderator.close();
            Assert.assertEquals(0L, appenderator.getRowsInMemory());
            closedSegmensSinksBatchAppenderatorTester.close();
        } catch (Throwable th) {
            try {
                closedSegmensSinksBatchAppenderatorTester.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testMaxBytesInMemoryInMultipleSinksWithSkipBytesInMemoryOverheadCheckConfig() throws Exception {
        ClosedSegmensSinksBatchAppenderatorTester closedSegmensSinksBatchAppenderatorTester = new ClosedSegmensSinksBatchAppenderatorTester(100, 1024L, null, true, new SimpleRowIngestionMeters(), true);
        try {
            BatchAppenderator appenderator = closedSegmensSinksBatchAppenderatorTester.getAppenderator();
            appenderator.startJob();
            appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), (Supplier) null);
            int i = NullHandling.sqlCompatible() ? 1 : 0;
            Assert.assertEquals(182 + i, appenderator.getBytesCurrentlyInMemory());
            appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar", 1), (Supplier) null);
            Assert.assertEquals(364 + (2 * i), appenderator.getBytesCurrentlyInMemory());
            Assert.assertEquals(2L, appenderator.getSegments().size());
            appenderator.close();
            Assert.assertEquals(0L, appenderator.getRowsInMemory());
            closedSegmensSinksBatchAppenderatorTester.close();
        } catch (Throwable th) {
            try {
                closedSegmensSinksBatchAppenderatorTester.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testMaxBytesInMemory() throws Exception {
        ClosedSegmensSinksBatchAppenderatorTester closedSegmensSinksBatchAppenderatorTester = new ClosedSegmensSinksBatchAppenderatorTester(100, 15000L, true);
        try {
            BatchAppenderator appenderator = closedSegmensSinksBatchAppenderatorTester.getAppenderator();
            appenderator.startJob();
            appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), (Supplier) null);
            int i = NullHandling.sqlCompatible() ? 1 : 0;
            Assert.assertEquals(182 + i, appenderator.getBytesInMemory(IDENTIFIERS.get(0)));
            Assert.assertEquals(r0 + 5000, appenderator.getBytesCurrentlyInMemory());
            for (int i2 = 0; i2 < 53; i2++) {
                appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar_" + i2, 1), (Supplier) null);
            }
            Assert.assertEquals(0, appenderator.getBytesInMemory(IDENTIFIERS.get(0)));
            Assert.assertEquals(0, appenderator.getBytesCurrentlyInMemory());
            appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", BaseJettyTest.DummyAuthFilter.SECRET_USER, 1), (Supplier) null);
            Assert.assertEquals(182 + i, appenderator.getBytesInMemory(IDENTIFIERS.get(0)));
            Assert.assertEquals(r0 + 5000, appenderator.getBytesCurrentlyInMemory());
            for (int i3 = 0; i3 < 53; i3++) {
                appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar_" + i3, 1), (Supplier) null);
            }
            Assert.assertEquals(0, appenderator.getBytesInMemory(IDENTIFIERS.get(0)));
            Assert.assertEquals(0, appenderator.getBytesCurrentlyInMemory());
            appenderator.persistAll((Committer) null).get();
            appenderator.close();
            Assert.assertEquals(0L, appenderator.getRowsInMemory());
            Assert.assertEquals(0L, appenderator.getBytesCurrentlyInMemory());
            closedSegmensSinksBatchAppenderatorTester.close();
        } catch (Throwable th) {
            try {
                closedSegmensSinksBatchAppenderatorTester.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test(expected = RuntimeException.class, timeout = 5000)
    public void testTaskFailAsPersistCannotFreeAnyMoreMemory() throws Exception {
        ClosedSegmensSinksBatchAppenderatorTester closedSegmensSinksBatchAppenderatorTester = new ClosedSegmensSinksBatchAppenderatorTester(100, 5180L, true);
        try {
            Appenderator appenderator = closedSegmensSinksBatchAppenderatorTester.getAppenderator();
            appenderator.startJob();
            appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), (Supplier) null);
            closedSegmensSinksBatchAppenderatorTester.close();
        } catch (Throwable th) {
            try {
                closedSegmensSinksBatchAppenderatorTester.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testTaskDoesNotFailAsExceededMemoryWithSkipBytesInMemoryOverheadCheckConfig() throws Exception {
        ClosedSegmensSinksBatchAppenderatorTester closedSegmensSinksBatchAppenderatorTester = new ClosedSegmensSinksBatchAppenderatorTester(100, 10L, null, true, new SimpleRowIngestionMeters(), true);
        try {
            BatchAppenderator appenderator = closedSegmensSinksBatchAppenderatorTester.getAppenderator();
            appenderator.startJob();
            appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), (Supplier) null);
            Assert.assertEquals(0L, appenderator.getBytesCurrentlyInMemory());
            appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), (Supplier) null);
            Assert.assertEquals(0L, appenderator.getBytesCurrentlyInMemory());
            closedSegmensSinksBatchAppenderatorTester.close();
        } catch (Throwable th) {
            try {
                closedSegmensSinksBatchAppenderatorTester.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testTaskCleanupInMemoryCounterAfterCloseWithRowInMemory() throws Exception {
        ClosedSegmensSinksBatchAppenderatorTester closedSegmensSinksBatchAppenderatorTester = new ClosedSegmensSinksBatchAppenderatorTester(100, 10000L, true);
        try {
            BatchAppenderator appenderator = closedSegmensSinksBatchAppenderatorTester.getAppenderator();
            appenderator.startJob();
            appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), (Supplier) null);
            Assert.assertEquals(182 + (NullHandling.sqlCompatible() ? 1 : 0) + 5000, appenderator.getBytesCurrentlyInMemory());
            appenderator.close();
            Assert.assertEquals(0L, appenderator.getRowsInMemory());
            Assert.assertEquals(0L, appenderator.getBytesCurrentlyInMemory());
            closedSegmensSinksBatchAppenderatorTester.close();
        } catch (Throwable th) {
            try {
                closedSegmensSinksBatchAppenderatorTester.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testMaxBytesInMemoryInMultipleSinks() throws Exception {
        ClosedSegmensSinksBatchAppenderatorTester closedSegmensSinksBatchAppenderatorTester = new ClosedSegmensSinksBatchAppenderatorTester(1000, 28748L, true);
        try {
            BatchAppenderator appenderator = closedSegmensSinksBatchAppenderatorTester.getAppenderator();
            appenderator.startJob();
            appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), (Supplier) null);
            appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar", 1), (Supplier) null);
            int i = NullHandling.sqlCompatible() ? 1 : 0;
            int i2 = 182 + i;
            Assert.assertEquals(i2, appenderator.getBytesInMemory(IDENTIFIERS.get(0)));
            Assert.assertEquals(i2, appenderator.getBytesInMemory(IDENTIFIERS.get(1)));
            Assert.assertEquals((2 * i2) + 10000, appenderator.getBytesCurrentlyInMemory());
            for (int i3 = 0; i3 < 49; i3++) {
                appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar_" + i3, 1), (Supplier) null);
                appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar_" + i3, 1), (Supplier) null);
            }
            Assert.assertEquals(0, appenderator.getBytesInMemory(IDENTIFIERS.get(0)));
            Assert.assertEquals(0, appenderator.getBytesInMemory(IDENTIFIERS.get(1)));
            Assert.assertEquals(0, appenderator.getBytesCurrentlyInMemory());
            appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", BaseJettyTest.DummyAuthFilter.SECRET_USER, 1), (Supplier) null);
            int i4 = 182 + i;
            Assert.assertEquals(i4, appenderator.getBytesInMemory(IDENTIFIERS.get(0)));
            Assert.assertEquals(0L, appenderator.getBytesInMemory(IDENTIFIERS.get(1)));
            Assert.assertEquals(i4 + 5000, appenderator.getBytesCurrentlyInMemory());
            appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", BaseJettyTest.DummyAuthFilter.SECRET_USER, 1), (Supplier) null);
            Assert.assertEquals(i4, appenderator.getBytesInMemory(IDENTIFIERS.get(0)));
            Assert.assertEquals(i4, appenderator.getBytesInMemory(IDENTIFIERS.get(1)));
            Assert.assertEquals((2 * i4) + 5000 + 5000, appenderator.getBytesCurrentlyInMemory());
            for (int i5 = 0; i5 < 49; i5++) {
                appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar_" + i5, 1), (Supplier) null);
                appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar_" + i5, 1), (Supplier) null);
            }
            Assert.assertEquals(0, appenderator.getBytesInMemory(IDENTIFIERS.get(0)));
            Assert.assertEquals(0, appenderator.getBytesInMemory(IDENTIFIERS.get(1)));
            Assert.assertEquals(0, appenderator.getBytesCurrentlyInMemory());
            appenderator.close();
            Assert.assertEquals(0L, appenderator.getRowsInMemory());
            Assert.assertEquals(0L, appenderator.getBytesCurrentlyInMemory());
            closedSegmensSinksBatchAppenderatorTester.close();
        } catch (Throwable th) {
            try {
                closedSegmensSinksBatchAppenderatorTester.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testIgnoreMaxBytesInMemory() throws Exception {
        ClosedSegmensSinksBatchAppenderatorTester closedSegmensSinksBatchAppenderatorTester = new ClosedSegmensSinksBatchAppenderatorTester(100, -1L, true);
        try {
            BatchAppenderator appenderator = closedSegmensSinksBatchAppenderatorTester.getAppenderator();
            Assert.assertEquals(0L, appenderator.getRowsInMemory());
            appenderator.startJob();
            Assert.assertEquals(0L, appenderator.getRowsInMemory());
            appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), (Supplier) null);
            int i = NullHandling.sqlCompatible() ? 1 : 0;
            Assert.assertEquals(182 + i, appenderator.getBytesInMemory(IDENTIFIERS.get(0)));
            Assert.assertEquals(1L, appenderator.getRowsInMemory());
            appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar", 1), (Supplier) null);
            Assert.assertEquals(364 + (2 * i) + 10000, appenderator.getBytesCurrentlyInMemory());
            Assert.assertEquals(2L, appenderator.getRowsInMemory());
            appenderator.close();
            Assert.assertEquals(0L, appenderator.getRowsInMemory());
            closedSegmensSinksBatchAppenderatorTester.close();
        } catch (Throwable th) {
            try {
                closedSegmensSinksBatchAppenderatorTester.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testMaxRowsInMemory() throws Exception {
        ClosedSegmensSinksBatchAppenderatorTester closedSegmensSinksBatchAppenderatorTester = new ClosedSegmensSinksBatchAppenderatorTester(3, true);
        try {
            BatchAppenderator appenderator = closedSegmensSinksBatchAppenderatorTester.getAppenderator();
            Assert.assertEquals(0L, appenderator.getRowsInMemory());
            appenderator.startJob();
            Assert.assertEquals(0L, appenderator.getRowsInMemory());
            appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), (Supplier) null);
            Assert.assertEquals(1L, appenderator.getRowsInMemory());
            appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar", 1), (Supplier) null);
            Assert.assertEquals(2L, appenderator.getRowsInMemory());
            appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar", 1), (Supplier) null);
            Assert.assertEquals(2L, appenderator.getRowsInMemory());
            appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bat", 1), (Supplier) null);
            Assert.assertEquals(0L, appenderator.getRowsInMemory());
            appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "baz", 1), (Supplier) null);
            Assert.assertEquals(1L, appenderator.getRowsInMemory());
            appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "qux", 1), (Supplier) null);
            Assert.assertEquals(2L, appenderator.getRowsInMemory());
            appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", BaseJettyTest.DummyAuthFilter.SECRET_USER, 1), (Supplier) null);
            Assert.assertEquals(0L, appenderator.getRowsInMemory());
            Assert.assertEquals(0L, appenderator.getRowsInMemory());
            Assert.assertEquals(0L, appenderator.getRowsInMemory());
            appenderator.close();
            closedSegmensSinksBatchAppenderatorTester.close();
        } catch (Throwable th) {
            try {
                closedSegmensSinksBatchAppenderatorTester.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testAllHydrantsAreRecovered() throws Exception {
        ClosedSegmensSinksBatchAppenderatorTester closedSegmensSinksBatchAppenderatorTester = new ClosedSegmensSinksBatchAppenderatorTester(1, false);
        try {
            BatchAppenderator appenderator = closedSegmensSinksBatchAppenderatorTester.getAppenderator();
            Assert.assertEquals(0L, appenderator.getRowsInMemory());
            appenderator.startJob();
            Assert.assertEquals(0L, appenderator.getRowsInMemory());
            appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), (Supplier) null);
            appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo2", 1), (Supplier) null);
            appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo3", 1), (Supplier) null);
            Assert.assertEquals(IDENTIFIERS.subList(0, 1), Lists.transform(((SegmentsAndCommitMetadata) appenderator.push(appenderator.getSegments(), (Committer) null, false).get()).getSegments(), SegmentIdWithShardSpec::fromDataSegment).stream().sorted().collect(Collectors.toList()));
            Assert.assertEquals(0L, appenderator.getRowsInMemory());
            appenderator.close();
            closedSegmensSinksBatchAppenderatorTester.close();
        } catch (Throwable th) {
            try {
                closedSegmensSinksBatchAppenderatorTester.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testTotalRowsPerSegment() throws Exception {
        ClosedSegmensSinksBatchAppenderatorTester closedSegmensSinksBatchAppenderatorTester = new ClosedSegmensSinksBatchAppenderatorTester(3, true);
        try {
            BatchAppenderator appenderator = closedSegmensSinksBatchAppenderatorTester.getAppenderator();
            Assert.assertEquals(0L, appenderator.getRowsInMemory());
            appenderator.startJob();
            Assert.assertEquals(0L, appenderator.getRowsInMemory());
            Appenderator.AppenderatorAddResult add = appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), (Supplier) null);
            Assert.assertEquals(1L, appenderator.getRowsInMemory());
            Assert.assertEquals(1L, add.getNumRowsInSegment());
            Appenderator.AppenderatorAddResult add2 = appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar", 1), (Supplier) null);
            Assert.assertEquals(2L, appenderator.getRowsInMemory());
            Assert.assertEquals(1L, add2.getNumRowsInSegment());
            Appenderator.AppenderatorAddResult add3 = appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar", 1), (Supplier) null);
            Assert.assertEquals(2L, appenderator.getRowsInMemory());
            Assert.assertEquals(1L, add3.getNumRowsInSegment());
            Appenderator.AppenderatorAddResult add4 = appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bat", 1), (Supplier) null);
            Assert.assertEquals(0L, appenderator.getRowsInMemory());
            Assert.assertEquals(2L, add4.getNumRowsInSegment());
            Appenderator.AppenderatorAddResult add5 = appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bat", 1), (Supplier) null);
            Assert.assertEquals(1L, appenderator.getRowsInMemory());
            Assert.assertEquals(3L, add5.getNumRowsInSegment());
            Appenderator.AppenderatorAddResult add6 = appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "baz", 1), (Supplier) null);
            Assert.assertEquals(2L, appenderator.getRowsInMemory());
            Assert.assertEquals(2L, add6.getNumRowsInSegment());
            Appenderator.AppenderatorAddResult add7 = appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "qux", 1), (Supplier) null);
            Assert.assertEquals(0L, appenderator.getRowsInMemory());
            Assert.assertEquals(4L, add7.getNumRowsInSegment());
            Appenderator.AppenderatorAddResult add8 = appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", BaseJettyTest.DummyAuthFilter.SECRET_USER, 1), (Supplier) null);
            Assert.assertEquals(1L, appenderator.getRowsInMemory());
            Assert.assertEquals(3L, add8.getNumRowsInSegment());
            appenderator.close();
            Assert.assertEquals(0L, appenderator.getRowsInMemory());
            closedSegmensSinksBatchAppenderatorTester.close();
        } catch (Throwable th) {
            try {
                closedSegmensSinksBatchAppenderatorTester.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testRestoreFromDisk() throws Exception {
        BatchAppenderator appenderator = new ClosedSegmensSinksBatchAppenderatorTester(2, true).getAppenderator();
        appenderator.startJob();
        appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), (Supplier) null);
        appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar", 2), (Supplier) null);
        Assert.assertEquals(0L, appenderator.getRowsInMemory());
        appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "baz", 3), (Supplier) null);
        appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "qux", 4), (Supplier) null);
        Assert.assertEquals(0L, appenderator.getRowsInMemory());
        appenderator.add(IDENTIFIERS.get(2), createInputRow("2001", BaseJettyTest.DummyAuthFilter.SECRET_USER, 5), (Supplier) null);
        Assert.assertEquals(1L, appenderator.getRowsInMemory());
        appenderator.persistAll((Committer) null).get();
        Assert.assertEquals(0L, appenderator.getRowsInMemory());
        Assert.assertNotNull(appenderator.getPersistedidentifierPaths());
        Assert.assertEquals(3L, r0.size());
        appenderator.push(IDENTIFIERS, (Committer) null, false).get();
        Assert.assertNotNull(appenderator.getPersistedidentifierPaths());
        Assert.assertEquals(0L, r0.size());
        appenderator.close();
    }

    @Test
    public void testCleanupFromDiskAfterClose() throws Exception {
        BatchAppenderator appenderator = new ClosedSegmensSinksBatchAppenderatorTester(2, true).getAppenderator();
        appenderator.startJob();
        appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), (Supplier) null);
        appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar", 2), (Supplier) null);
        Assert.assertEquals(0L, appenderator.getRowsInMemory());
        Assert.assertEquals(2L, appenderator.getTotalRowCount());
        appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "baz", 3), (Supplier) null);
        appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "qux", 4), (Supplier) null);
        Assert.assertEquals(0L, appenderator.getRowsInMemory());
        Assert.assertEquals(4L, appenderator.getTotalRowCount());
        appenderator.add(IDENTIFIERS.get(2), createInputRow("2001", BaseJettyTest.DummyAuthFilter.SECRET_USER, 5), (Supplier) null);
        Assert.assertEquals(1L, appenderator.getRowsInMemory());
        appenderator.persistAll((Committer) null).get();
        Assert.assertEquals(0L, appenderator.getRowsInMemory());
        Assert.assertEquals(5L, appenderator.getTotalRowCount());
        Assert.assertNotNull(appenderator.getPersistedidentifierPaths());
        Assert.assertEquals(3L, r0.size());
        appenderator.close();
        Assert.assertNotNull(appenderator.getPersistedidentifierPaths());
        Assert.assertEquals(0L, r0.size());
        Assert.assertEquals(0L, appenderator.getRowsInMemory());
        Assert.assertEquals(0L, appenderator.getTotalRowCount());
    }

    @Test(timeout = 5000)
    public void testTotalRowCount() throws Exception {
        ClosedSegmensSinksBatchAppenderatorTester closedSegmensSinksBatchAppenderatorTester = new ClosedSegmensSinksBatchAppenderatorTester(3, true);
        try {
            Appenderator appenderator = closedSegmensSinksBatchAppenderatorTester.getAppenderator();
            Assert.assertEquals(0L, appenderator.getTotalRowCount());
            appenderator.startJob();
            Assert.assertEquals(0L, appenderator.getTotalRowCount());
            appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), (Supplier) null);
            Assert.assertEquals(1L, appenderator.getTotalRowCount());
            appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar", 1), (Supplier) null);
            Assert.assertEquals(2L, appenderator.getTotalRowCount());
            appenderator.persistAll((Committer) null).get();
            Assert.assertEquals(2L, appenderator.getTotalRowCount());
            appenderator.drop(IDENTIFIERS.get(0)).get();
            Assert.assertEquals(1L, appenderator.getTotalRowCount());
            appenderator.drop(IDENTIFIERS.get(1)).get();
            Assert.assertEquals(0L, appenderator.getTotalRowCount());
            appenderator.add(IDENTIFIERS.get(2), createInputRow("2001", "bar", 1), (Supplier) null);
            Assert.assertEquals(1L, appenderator.getTotalRowCount());
            appenderator.add(IDENTIFIERS.get(2), createInputRow("2001", "baz", 1), (Supplier) null);
            Assert.assertEquals(2L, appenderator.getTotalRowCount());
            appenderator.add(IDENTIFIERS.get(2), createInputRow("2001", "qux", 1), (Supplier) null);
            Assert.assertEquals(3L, appenderator.getTotalRowCount());
            appenderator.add(IDENTIFIERS.get(2), createInputRow("2001", BaseJettyTest.DummyAuthFilter.SECRET_USER, 1), (Supplier) null);
            Assert.assertEquals(4L, appenderator.getTotalRowCount());
            appenderator.persistAll((Committer) null).get();
            Assert.assertEquals(4L, appenderator.getTotalRowCount());
            appenderator.drop(IDENTIFIERS.get(2)).get();
            Assert.assertEquals(0L, appenderator.getTotalRowCount());
            appenderator.close();
            Assert.assertEquals(0L, appenderator.getTotalRowCount());
            closedSegmensSinksBatchAppenderatorTester.close();
        } catch (Throwable th) {
            try {
                closedSegmensSinksBatchAppenderatorTester.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testVerifyRowIngestionMetrics() throws Exception {
        SimpleRowIngestionMeters simpleRowIngestionMeters = new SimpleRowIngestionMeters();
        ClosedSegmensSinksBatchAppenderatorTester closedSegmensSinksBatchAppenderatorTester = new ClosedSegmensSinksBatchAppenderatorTester(5, 10000L, null, false, simpleRowIngestionMeters);
        try {
            Appenderator appenderator = closedSegmensSinksBatchAppenderatorTester.getAppenderator();
            appenderator.startJob();
            appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", "invalid_met"), (Supplier) null);
            appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), (Supplier) null);
            Assert.assertEquals(1L, simpleRowIngestionMeters.getProcessed());
            Assert.assertEquals(1L, simpleRowIngestionMeters.getProcessedWithError());
            Assert.assertEquals(0L, simpleRowIngestionMeters.getUnparseable());
            Assert.assertEquals(0L, simpleRowIngestionMeters.getThrownAway());
            closedSegmensSinksBatchAppenderatorTester.close();
        } catch (Throwable th) {
            try {
                closedSegmensSinksBatchAppenderatorTester.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test(timeout = 10000)
    public void testPushContract() throws Exception {
        ClosedSegmensSinksBatchAppenderatorTester closedSegmensSinksBatchAppenderatorTester = new ClosedSegmensSinksBatchAppenderatorTester(1, 50000L, null, false, new SimpleRowIngestionMeters());
        try {
            Appenderator appenderator = closedSegmensSinksBatchAppenderatorTester.getAppenderator();
            appenderator.startJob();
            appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar", 1), (Supplier) null);
            appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar2", 1), (Supplier) null);
            appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar3", 1), (Supplier) null);
            SegmentsAndCommitMetadata segmentsAndCommitMetadata = (SegmentsAndCommitMetadata) appenderator.push(Collections.singletonList(IDENTIFIERS.get(0)), (Committer) null, false).get();
            Assert.assertEquals(Collections.singletonList(IDENTIFIERS.get(0)), Lists.transform(segmentsAndCommitMetadata.getSegments(), SegmentIdWithShardSpec::fromDataSegment).stream().sorted().collect(Collectors.toList()));
            Assert.assertEquals(closedSegmensSinksBatchAppenderatorTester.getPushedSegments().stream().sorted().collect(Collectors.toList()), segmentsAndCommitMetadata.getSegments().stream().sorted().collect(Collectors.toList()));
            appenderator.drop(IDENTIFIERS.get(0));
            Assert.assertEquals(Collections.singletonList(IDENTIFIERS.get(1)), appenderator.getSegments());
            closedSegmensSinksBatchAppenderatorTester.close();
        } catch (Throwable th) {
            try {
                closedSegmensSinksBatchAppenderatorTester.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test(timeout = 5000)
    public void testCloseContract() throws Exception {
        ClosedSegmensSinksBatchAppenderatorTester closedSegmensSinksBatchAppenderatorTester = new ClosedSegmensSinksBatchAppenderatorTester(1, 50000L, null, false, new SimpleRowIngestionMeters());
        try {
            Appenderator appenderator = closedSegmensSinksBatchAppenderatorTester.getAppenderator();
            appenderator.startJob();
            appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar", 1), (Supplier) null);
            appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar2", 1), (Supplier) null);
            ListenableFuture push = appenderator.push(Collections.singletonList(IDENTIFIERS.get(0)), (Committer) null, false);
            appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "bar3", 1), (Supplier) null);
            ListenableFuture push2 = appenderator.push(Collections.singletonList(IDENTIFIERS.get(1)), (Committer) null, false);
            appenderator.close();
            Assert.assertTrue(!push.isCancelled());
            Assert.assertTrue(!push2.isCancelled());
            Assert.assertTrue(push.isDone());
            Assert.assertTrue(push2.isDone());
            SegmentsAndCommitMetadata segmentsAndCommitMetadata = (SegmentsAndCommitMetadata) push.get();
            SegmentsAndCommitMetadata segmentsAndCommitMetadata2 = (SegmentsAndCommitMetadata) push2.get();
            Assert.assertEquals(segmentsAndCommitMetadata.getSegments().size(), 1L);
            Assert.assertEquals(segmentsAndCommitMetadata2.getSegments().size(), 1L);
            closedSegmensSinksBatchAppenderatorTester.close();
        } catch (Throwable th) {
            try {
                closedSegmensSinksBatchAppenderatorTester.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    private static SegmentIdWithShardSpec createNonUTCSegmentId(String str, String str2, int i) {
        return new SegmentIdWithShardSpec("foo", new Interval(str, ISOChronology.getInstance(DateTimes.inferTzFromString("Asia/Seoul"))), str2, new LinearShardSpec(Integer.valueOf(i)));
    }

    private static SegmentIdWithShardSpec createSegmentId(String str, String str2, int i) {
        return new SegmentIdWithShardSpec("foo", Intervals.of(str), str2, new LinearShardSpec(Integer.valueOf(i)));
    }

    static InputRow createInputRow(String str, String str2, Object obj) {
        return new MapBasedInputRow(DateTimes.of(str).getMillis(), ImmutableList.of("dim"), ImmutableMap.of("dim", str2, "met", obj));
    }
}
