package org.apache.druid.segment.realtime.appenderator;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifier;
import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.class */
public class StreamAppenderatorDriverTest extends EasyMockSupport {
    private static final String DATA_SOURCE = "foo";
    private static final String VERSION = "abc123";
    private static final int MAX_ROWS_IN_MEMORY = 100;
    private static final int MAX_ROWS_PER_SEGMENT = 3;
    private SegmentAllocator allocator;
    private AppenderatorTester appenderatorTester;
    private TestSegmentHandoffNotifierFactory segmentHandoffNotifierFactory;
    private StreamAppenderatorDriver driver;
    private DataSegmentKiller dataSegmentKiller;
    private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
    private static final long PUBLISH_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(10);
    private static final long HANDOFF_CONDITION_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(1);
    private static final List<InputRow> ROWS = Arrays.asList(new MapBasedInputRow(DateTimes.of("2000"), ImmutableList.of("dim1"), ImmutableMap.of("dim1", "foo", "met1", "1")), new MapBasedInputRow(DateTimes.of("2000T01"), ImmutableList.of("dim1"), ImmutableMap.of("dim1", (Double) "foo", "met1", Double.valueOf(2.0d))), new MapBasedInputRow(DateTimes.of("2000T01"), ImmutableList.of("dim2"), ImmutableMap.of("dim2", (Double) "bar", "met1", Double.valueOf(2.0d))));

    /* loaded from: input_file:org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest$TestCommitterSupplier.class */
    static class TestCommitterSupplier<T> implements Supplier<Committer> {
        private final AtomicReference<T> metadata = new AtomicReference<>();

        public void setMetadata(T t) {
            this.metadata.set(t);
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // com.google.common.base.Supplier
        /* renamed from: get */
        public Committer get2() {
            final T t = this.metadata.get();
            return new Committer() { // from class: org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestCommitterSupplier.1
                @Override // org.apache.druid.data.input.Committer
                public Object getMetadata() {
                    return t;
                }

                @Override // java.lang.Runnable
                public void run() {
                }
            };
        }
    }

    /* loaded from: input_file:org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest$TestSegmentAllocator.class */
    static class TestSegmentAllocator implements SegmentAllocator {
        private final String dataSource;
        private final Granularity granularity;
        private final Map<Long, AtomicInteger> counters = new HashMap();

        public TestSegmentAllocator(String str, Granularity granularity) {
            this.dataSource = str;
            this.granularity = granularity;
        }

        @Override // org.apache.druid.segment.realtime.appenderator.SegmentAllocator
        public SegmentIdWithShardSpec allocate(InputRow inputRow, String str, String str2, boolean z) {
            SegmentIdWithShardSpec segmentIdWithShardSpec;
            synchronized (this.counters) {
                DateTime bucketStart = this.granularity.bucketStart(inputRow.getTimestamp());
                long millis = bucketStart.getMillis();
                this.counters.putIfAbsent(Long.valueOf(millis), new AtomicInteger());
                segmentIdWithShardSpec = new SegmentIdWithShardSpec(this.dataSource, this.granularity.bucket(bucketStart), StreamAppenderatorDriverTest.VERSION, new NumberedShardSpec(this.counters.get(Long.valueOf(millis)).getAndIncrement(), 0));
            }
            return segmentIdWithShardSpec;
        }
    }

    /* loaded from: input_file:org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest$TestSegmentHandoffNotifierFactory.class */
    static class TestSegmentHandoffNotifierFactory implements SegmentHandoffNotifierFactory {
        private boolean handoffEnabled = true;
        private long handoffDelay;

        public void disableHandoff() {
            this.handoffEnabled = false;
        }

        public void setHandoffDelay(long j) {
            this.handoffDelay = j;
        }

        @Override // org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory
        public SegmentHandoffNotifier createSegmentHandoffNotifier(String str) {
            return new SegmentHandoffNotifier() { // from class: org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriverTest.TestSegmentHandoffNotifierFactory.1
                @Override // org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifier
                public boolean registerSegmentHandoffCallback(SegmentDescriptor segmentDescriptor, Executor executor, Runnable runnable) {
                    if (!TestSegmentHandoffNotifierFactory.this.handoffEnabled) {
                        return true;
                    }
                    if (TestSegmentHandoffNotifierFactory.this.handoffDelay > 0) {
                        try {
                            Thread.sleep(TestSegmentHandoffNotifierFactory.this.handoffDelay);
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                    }
                    executor.execute(runnable);
                    return true;
                }

                @Override // org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifier
                public void start() {
                }

                @Override // org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifier, java.io.Closeable, java.lang.AutoCloseable
                public void close() {
                }
            };
        }
    }

    @Before
    public void setUp() {
        this.appenderatorTester = new AppenderatorTester(100);
        this.allocator = new TestSegmentAllocator("foo", Granularities.HOUR);
        this.segmentHandoffNotifierFactory = new TestSegmentHandoffNotifierFactory();
        this.dataSegmentKiller = (DataSegmentKiller) createStrictMock(DataSegmentKiller.class);
        this.driver = new StreamAppenderatorDriver(this.appenderatorTester.getAppenderator(), this.allocator, this.segmentHandoffNotifierFactory, new TestUsedSegmentChecker(this.appenderatorTester), this.dataSegmentKiller, OBJECT_MAPPER, new FireDepartmentMetrics());
        EasyMock.replay(this.dataSegmentKiller);
    }

    @After
    public void tearDown() throws Exception {
        EasyMock.verify(this.dataSegmentKiller);
        this.driver.clear();
        this.driver.close();
    }

    @Test(timeout = 60000)
    public void testSimple() throws Exception {
        TestCommitterSupplier testCommitterSupplier = new TestCommitterSupplier();
        Assert.assertNull(this.driver.startJob(null));
        for (int i = 0; i < ROWS.size(); i++) {
            testCommitterSupplier.setMetadata(Integer.valueOf(i + 1));
            Assert.assertTrue(this.driver.add(ROWS.get(i), BaseCalciteQueryTest.DUMMY_SQL_ID, testCommitterSupplier, false, true).isOk());
        }
        SegmentsAndCommitMetadata segmentsAndCommitMetadata = this.driver.publish(makeOkPublisher(), testCommitterSupplier.get2(), ImmutableList.of(BaseCalciteQueryTest.DUMMY_SQL_ID)).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        while (this.driver.getSegments().containsKey(BaseCalciteQueryTest.DUMMY_SQL_ID)) {
            Thread.sleep(100L);
        }
        SegmentsAndCommitMetadata segmentsAndCommitMetadata2 = this.driver.registerHandoff(segmentsAndCommitMetadata).get(HANDOFF_CONDITION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        Assert.assertEquals(ImmutableSet.of(new SegmentIdWithShardSpec("foo", Intervals.of("2000/PT1H"), VERSION, new NumberedShardSpec(0, 0)), new SegmentIdWithShardSpec("foo", Intervals.of("2000T01/PT1H"), VERSION, new NumberedShardSpec(0, 0))), asIdentifiers(segmentsAndCommitMetadata2.getSegments()));
        Assert.assertEquals((Object) 3, segmentsAndCommitMetadata2.getCommitMetadata());
    }

    @Test
    public void testMaxRowsPerSegment() throws Exception {
        TestCommitterSupplier testCommitterSupplier = new TestCommitterSupplier();
        Assert.assertNull(this.driver.startJob(null));
        for (int i = 0; i < 9; i++) {
            testCommitterSupplier.setMetadata(Integer.valueOf(i + 1));
            AppenderatorDriverAddResult add = this.driver.add(new MapBasedInputRow(DateTimes.of("2000T01"), ImmutableList.of("dim2"), ImmutableMap.of("dim2", (Double) StringUtils.format("bar-%d", Integer.valueOf(i)), "met1", Double.valueOf(2.0d))), BaseCalciteQueryTest.DUMMY_SQL_ID, testCommitterSupplier, false, true);
            Assert.assertTrue(add.isOk());
            if (add.getNumRowsInSegment() > 3) {
                this.driver.moveSegmentOut(BaseCalciteQueryTest.DUMMY_SQL_ID, ImmutableList.of(add.getSegmentIdentifier()));
            }
        }
        SegmentsAndCommitMetadata segmentsAndCommitMetadata = this.driver.publish(makeOkPublisher(), testCommitterSupplier.get2(), ImmutableList.of(BaseCalciteQueryTest.DUMMY_SQL_ID)).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        while (this.driver.getSegments().containsKey(BaseCalciteQueryTest.DUMMY_SQL_ID)) {
            Thread.sleep(100L);
        }
        SegmentsAndCommitMetadata segmentsAndCommitMetadata2 = this.driver.registerHandoff(segmentsAndCommitMetadata).get(HANDOFF_CONDITION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        Assert.assertEquals(3L, segmentsAndCommitMetadata2.getSegments().size());
        Assert.assertEquals((Object) 9, segmentsAndCommitMetadata2.getCommitMetadata());
    }

    @Test(timeout = 60000, expected = TimeoutException.class)
    public void testHandoffTimeout() throws Exception {
        TestCommitterSupplier testCommitterSupplier = new TestCommitterSupplier();
        this.segmentHandoffNotifierFactory.disableHandoff();
        Assert.assertNull(this.driver.startJob(null));
        for (int i = 0; i < ROWS.size(); i++) {
            testCommitterSupplier.setMetadata(Integer.valueOf(i + 1));
            Assert.assertTrue(this.driver.add(ROWS.get(i), BaseCalciteQueryTest.DUMMY_SQL_ID, testCommitterSupplier, false, true).isOk());
        }
        SegmentsAndCommitMetadata segmentsAndCommitMetadata = this.driver.publish(makeOkPublisher(), testCommitterSupplier.get2(), ImmutableList.of(BaseCalciteQueryTest.DUMMY_SQL_ID)).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        while (this.driver.getSegments().containsKey(BaseCalciteQueryTest.DUMMY_SQL_ID)) {
            Thread.sleep(100L);
        }
        this.driver.registerHandoff(segmentsAndCommitMetadata).get(HANDOFF_CONDITION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
    }

    @Test
    public void testPublishPerRow() throws IOException, InterruptedException, TimeoutException, ExecutionException {
        TestCommitterSupplier testCommitterSupplier = new TestCommitterSupplier();
        Assert.assertNull(this.driver.startJob(null));
        testCommitterSupplier.setMetadata(1);
        Assert.assertTrue(this.driver.add(ROWS.get(0), BaseCalciteQueryTest.DUMMY_SQL_ID, testCommitterSupplier, false, true).isOk());
        SegmentsAndCommitMetadata segmentsAndCommitMetadata = this.driver.publishAndRegisterHandoff(makeOkPublisher(), testCommitterSupplier.get2(), ImmutableList.of(BaseCalciteQueryTest.DUMMY_SQL_ID)).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        Assert.assertEquals(ImmutableSet.of(new SegmentIdWithShardSpec("foo", Intervals.of("2000/PT1H"), VERSION, new NumberedShardSpec(0, 0))), asIdentifiers(segmentsAndCommitMetadata.getSegments()));
        Assert.assertEquals((Object) 1, segmentsAndCommitMetadata.getCommitMetadata());
        for (int i = 1; i < ROWS.size(); i++) {
            testCommitterSupplier.setMetadata(Integer.valueOf(i + 1));
            Assert.assertTrue(this.driver.add(ROWS.get(i), BaseCalciteQueryTest.DUMMY_SQL_ID, testCommitterSupplier, false, true).isOk());
            SegmentsAndCommitMetadata segmentsAndCommitMetadata2 = this.driver.publishAndRegisterHandoff(makeOkPublisher(), testCommitterSupplier.get2(), ImmutableList.of(BaseCalciteQueryTest.DUMMY_SQL_ID)).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            Assert.assertEquals(ImmutableSet.of(new SegmentIdWithShardSpec("foo", Intervals.of("2000T01/PT1H"), VERSION, new NumberedShardSpec(i - 1, 0))), asIdentifiers(segmentsAndCommitMetadata2.getSegments()));
            Assert.assertEquals(Integer.valueOf(i + 1), segmentsAndCommitMetadata2.getCommitMetadata());
        }
        this.driver.persist(testCommitterSupplier.get2());
        SegmentsAndCommitMetadata segmentsAndCommitMetadata3 = this.driver.publishAndRegisterHandoff(makeOkPublisher(), testCommitterSupplier.get2(), ImmutableList.of(BaseCalciteQueryTest.DUMMY_SQL_ID)).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        Assert.assertEquals(ImmutableSet.of(), asIdentifiers(segmentsAndCommitMetadata3.getSegments()));
        Assert.assertEquals((Object) 3, segmentsAndCommitMetadata3.getCommitMetadata());
    }

    @Test
    public void testIncrementalHandoff() throws Exception {
        TestCommitterSupplier testCommitterSupplier = new TestCommitterSupplier();
        Assert.assertNull(this.driver.startJob(null));
        testCommitterSupplier.setMetadata(1);
        Assert.assertTrue(this.driver.add(ROWS.get(0), "sequence_0", testCommitterSupplier, false, true).isOk());
        for (int i = 1; i < ROWS.size(); i++) {
            testCommitterSupplier.setMetadata(Integer.valueOf(i + 1));
            Assert.assertTrue(this.driver.add(ROWS.get(i), "sequence_1", testCommitterSupplier, false, true).isOk());
        }
        ListenableFuture<SegmentsAndCommitMetadata> publishAndRegisterHandoff = this.driver.publishAndRegisterHandoff(makeOkPublisher(), testCommitterSupplier.get2(), ImmutableList.of("sequence_0"));
        ListenableFuture<SegmentsAndCommitMetadata> publishAndRegisterHandoff2 = this.driver.publishAndRegisterHandoff(makeOkPublisher(), testCommitterSupplier.get2(), ImmutableList.of("sequence_1"));
        SegmentsAndCommitMetadata segmentsAndCommitMetadata = publishAndRegisterHandoff.get(HANDOFF_CONDITION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        SegmentsAndCommitMetadata segmentsAndCommitMetadata2 = publishAndRegisterHandoff2.get(HANDOFF_CONDITION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        Assert.assertEquals(ImmutableSet.of(new SegmentIdWithShardSpec("foo", Intervals.of("2000/PT1H"), VERSION, new NumberedShardSpec(0, 0))), asIdentifiers(segmentsAndCommitMetadata.getSegments()));
        Assert.assertEquals(ImmutableSet.of(new SegmentIdWithShardSpec("foo", Intervals.of("2000T01/PT1H"), VERSION, new NumberedShardSpec(0, 0))), asIdentifiers(segmentsAndCommitMetadata2.getSegments()));
        Assert.assertEquals((Object) 3, segmentsAndCommitMetadata.getCommitMetadata());
        Assert.assertEquals((Object) 3, segmentsAndCommitMetadata2.getCommitMetadata());
    }

    private Set<SegmentIdWithShardSpec> asIdentifiers(Iterable<DataSegment> iterable) {
        return ImmutableSet.copyOf(Iterables.transform(iterable, SegmentIdWithShardSpec::fromDataSegment));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static TransactionalSegmentPublisher makeOkPublisher() {
        return (set, set2, obj) -> {
            return SegmentPublishResult.ok(Collections.emptySet());
        };
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static TransactionalSegmentPublisher makeFailingPublisher(boolean z) {
        return (set, set2, obj) -> {
            RuntimeException runtimeException = new RuntimeException("test");
            if (z) {
                throw runtimeException;
            }
            return SegmentPublishResult.fail(runtimeException.getMessage());
        };
    }

    static {
        NullHandling.initializeForTests();
    }
}
