package org.apache.hive.druid.org.apache.druid.server.coordination.coordination;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.apache.hive.druid.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.hive.druid.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hive.druid.com.google.common.base.Joiner;
import org.apache.hive.druid.com.google.common.collect.ImmutableList;
import org.apache.hive.druid.com.google.common.collect.ImmutableMap;
import org.apache.hive.druid.com.google.common.collect.Iterables;
import org.apache.hive.druid.com.google.common.collect.Sets;
import org.apache.hive.druid.org.apache.druid.curator.PotentiallyGzippedCompressionProvider;
import org.apache.hive.druid.org.apache.druid.curator.announcement.Announcer;
import org.apache.hive.druid.org.apache.druid.java.util.common.DateTimes;
import org.apache.hive.druid.org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.hive.druid.org.apache.druid.segment.TestHelper;
import org.apache.hive.druid.org.apache.druid.server.coordination.BatchDataSegmentAnnouncer;
import org.apache.hive.druid.org.apache.druid.server.coordination.ChangeRequestHistory;
import org.apache.hive.druid.org.apache.druid.server.coordination.ChangeRequestsSnapshot;
import org.apache.hive.druid.org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.hive.druid.org.apache.druid.server.coordination.ServerType;
import org.apache.hive.druid.org.apache.druid.server.initialization.BatchDataSegmentAnnouncerConfig;
import org.apache.hive.druid.org.apache.druid.server.initialization.ZkPathsConfig;
import org.apache.hive.druid.org.apache.druid.timeline.DataSegment;
import org.apache.tools.ant.types.selectors.TypeSelector;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/hive/druid/org/apache/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.class */
public class BatchDataSegmentAnnouncerTest {
    private static final String TEST_BASE_PATH = "/test";
    private static final String TEST_SEGMENTS_PATH = "/test/segments/id";
    private static final Joiner JOINER = Joiner.on("/");
    private static final int NUM_THREADS = 4;
    private TestingCluster testingCluster;
    private CuratorFramework cf;
    private ObjectMapper jsonMapper;
    private TestAnnouncer announcer;
    private SegmentReader segmentReader;
    private BatchDataSegmentAnnouncer segmentAnnouncer;
    private Set<DataSegment> testSegments;
    private final AtomicInteger maxBytesPerNode = new AtomicInteger(524288);
    private Boolean skipDimensionsAndMetrics;
    private Boolean skipLoadSpec;
    private ExecutorService exec;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hive/druid/org/apache/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest$SegmentReader.class */
    /**
     * Test helper that reads back the set of segments stored as JSON at a path,
     * using the supplied Curator client and JSON mapper.
     */
    public static class SegmentReader {
        private final CuratorFramework cf;
        private final ObjectMapper jsonMapper;

        public SegmentReader(CuratorFramework curatorFramework, ObjectMapper objectMapper) {
            this.cf = curatorFramework;
            this.jsonMapper = objectMapper;
        }

        /**
         * Reads the segments stored at the given path.
         *
         * @param str path of the node to read
         * @return the deserialized segments, or a new empty set when no node exists at the path
         * @throws RuntimeException wrapping any exception raised while checking, reading or deserializing
         */
        public Set<DataSegment> read(String str) {
            try {
                // A missing node is reported as "no segments" rather than as an error.
                if (this.cf.checkExists().forPath(str) == null) {
                    return new HashSet<>();
                }
                final byte[] payload = this.cf.getData().forPath(str);
                return this.jsonMapper.readValue(payload, new TypeReference<Set<DataSegment>>() {
                });
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /* loaded from: input_file:org/apache/hive/druid/org/apache/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest$TestAnnouncer.class */
    /**
     * {@link Announcer} that additionally counts, per path and per payload content, how many
     * times {@code announce} was called, so tests can detect duplicate announcements.
     */
    private static class TestAnnouncer extends Announcer {
        /** path -> (payload content -> number of announce calls made with that content). */
        private final ConcurrentHashMap<String, ConcurrentHashMap<ByteBuffer, AtomicInteger>> numPathAnnounced;

        private TestAnnouncer(CuratorFramework curatorFramework, ExecutorService executorService) {
            super(curatorFramework, executorService);
            this.numPathAnnounced = new ConcurrentHashMap<>();
        }

        /**
         * Records the call and then delegates to the real announcer.
         *
         * @param str  path being announced
         * @param bArr payload being announced
         * @param z    forwarded unchanged to {@code Announcer.announce}
         */
        public void announce(String str, byte[] bArr, boolean z) {
            // A raw byte[] key hashes and compares by identity, so two announcements of the same
            // content would land in separate entries and every counter would stay at 1.
            // ByteBuffer compares by content; the array is copied so the key cannot change later.
            final ByteBuffer payloadKey = ByteBuffer.wrap(bArr.clone());
            this.numPathAnnounced
                .computeIfAbsent(str, path -> new ConcurrentHashMap<>())
                .computeIfAbsent(payloadKey, key -> new AtomicInteger(0))
                .incrementAndGet();
            super.announce(str, bArr, z);
        }
    }

    /**
     * Builds the fixture for each test: a one-node test ZooKeeper cluster, a connected Curator
     * client with the base path created, a counting announcer, the segment announcer under test
     * (50 segments per node, byte limit and skip flags read from this test's fields),
     * 100 test segments and a worker pool of {@code NUM_THREADS} threads.
     *
     * @throws Exception if any part of the fixture fails to start
     */
    @Before
    public void setUp() throws Exception {
        this.testingCluster = new TestingCluster(1);
        this.testingCluster.start();

        this.cf = CuratorFrameworkFactory.builder()
            .connectString(this.testingCluster.getConnectString())
            .retryPolicy(new ExponentialBackoffRetry(1, 10))
            .compressionProvider(new PotentiallyGzippedCompressionProvider(false))
            .build();
        this.cf.start();
        this.cf.blockUntilConnected();
        this.cf.create().creatingParentsIfNeeded().forPath(TEST_BASE_PATH);

        this.jsonMapper = TestHelper.makeJsonMapper();
        this.announcer = new TestAnnouncer(this.cf, Execs.directExecutor());
        this.announcer.start();
        this.segmentReader = new SegmentReader(this.cf, this.jsonMapper);
        this.skipDimensionsAndMetrics = false;
        this.skipLoadSpec = false;

        final DruidServerMetadata serverMetadata =
            new DruidServerMetadata("id", "host", (String) null, Long.MAX_VALUE, ServerType.HISTORICAL, "tier", 0);

        // The config reads the test's fields on every call, so individual tests can change
        // the limits and flags after the announcer has been constructed.
        final BatchDataSegmentAnnouncerConfig announcerConfig = new BatchDataSegmentAnnouncerConfig() {
            public int getSegmentsPerNode() {
                return 50;
            }

            public long getMaxBytesPerNode() {
                return BatchDataSegmentAnnouncerTest.this.maxBytesPerNode.get();
            }

            public boolean isSkipDimensionsAndMetrics() {
                return BatchDataSegmentAnnouncerTest.this.skipDimensionsAndMetrics;
            }

            public boolean isSkipLoadSpec() {
                return BatchDataSegmentAnnouncerTest.this.skipLoadSpec;
            }
        };

        final ZkPathsConfig zkPaths = new ZkPathsConfig() {
            public String getBase() {
                return BatchDataSegmentAnnouncerTest.TEST_BASE_PATH;
            }
        };

        this.segmentAnnouncer =
            new BatchDataSegmentAnnouncer(serverMetadata, announcerConfig, zkPaths, this.announcer, this.jsonMapper);

        final Set<DataSegment> segments = new HashSet<>();
        for (int dayOffset = 0; dayOffset < 100; dayOffset++) {
            segments.add(makeSegment(dayOffset));
        }
        this.testSegments = segments;

        this.exec = Execs.multiThreaded(NUM_THREADS, "BatchDataSegmentAnnouncerTest-%d");
    }

    /**
     * Releases everything created by {@code setUp}: stops the announcer, closes the Curator
     * client, stops the test ZooKeeper cluster and shuts down the worker pool.
     *
     * <p>Each step runs even if an earlier one throws, and fields that were never assigned
     * (because {@code setUp} failed part-way) are skipped, so a single failure cannot leak the
     * remaining resources or hide the original error behind a {@code NullPointerException}.
     *
     * @throws Exception if stopping or closing any resource fails
     */
    @After
    public void tearDown() throws Exception {
        try {
            if (this.announcer != null) {
                this.announcer.stop();
            }
        } finally {
            try {
                if (this.cf != null) {
                    this.cf.close();
                }
            } finally {
                try {
                    if (this.testingCluster != null) {
                        this.testingCluster.stop();
                    }
                } finally {
                    if (this.exec != null) {
                        this.exec.shutdownNow();
                    }
                }
            }
        }
    }

    /**
     * Announces two segments one at a time and then unannounces them, checking after each step
     * the content of the nodes under the segments path and the change-request history reported
     * by the announcer.
     *
     * @throws Exception if announcing, reading or fetching the change history fails
     */
    @Test
    public void testSingleAnnounce() throws Exception {
        final Iterator<DataSegment> segments = this.testSegments.iterator();
        final DataSegment firstSegment = segments.next();
        final DataSegment secondSegment = segments.next();

        this.segmentAnnouncer.announceSegment(firstSegment);

        // The child list is captured once and reused for the later checks.
        final List<String> zNodes = this.cf.getChildren().forPath(TEST_SEGMENTS_PATH);
        for (String zNode : zNodes) {
            final Set<DataSegment> announced = this.segmentReader.read(JOINER.join(TEST_SEGMENTS_PATH, zNode));
            Assert.assertEquals(firstSegment, announced.iterator().next());
        }

        this.segmentAnnouncer.announceSegment(secondSegment);
        for (String zNode : zNodes) {
            final Set<DataSegment> announced = this.segmentReader.read(JOINER.join(TEST_SEGMENTS_PATH, zNode));
            Assert.assertEquals(Sets.newHashSet(firstSegment, secondSegment), announced);
        }

        // Full history from the start: the two announcements.
        final ChangeRequestsSnapshot<?> afterAnnounce =
            this.segmentAnnouncer.getSegmentChangesSince(new ChangeRequestHistory.Counter(-1L, -1L)).get();
        Assert.assertEquals(2, afterAnnounce.getRequests().size());
        Assert.assertEquals(2, afterAnnounce.getCounter().getCounter());

        this.segmentAnnouncer.unannounceSegment(firstSegment);
        for (String zNode : zNodes) {
            final Set<DataSegment> announced = this.segmentReader.read(JOINER.join(TEST_SEGMENTS_PATH, zNode));
            Assert.assertEquals(secondSegment, announced.iterator().next());
        }

        this.segmentAnnouncer.unannounceSegment(secondSegment);
        Assert.assertTrue(this.cf.getChildren().forPath(TEST_SEGMENTS_PATH).isEmpty());

        // Delta since the previous snapshot: the two unannouncements.
        final ChangeRequestsSnapshot<?> afterUnannounce =
            this.segmentAnnouncer.getSegmentChangesSince(afterAnnounce.getCounter()).get();
        Assert.assertEquals(2, afterUnannounce.getRequests().size());
        Assert.assertEquals(4, afterUnannounce.getCounter().getCounter());

        // Asking from the start again now yields no requests, with the counter at 4.
        final ChangeRequestsSnapshot<?> fromStart =
            this.segmentAnnouncer.getSegmentChangesSince(new ChangeRequestHistory.Counter(-1L, -1L)).get();
        Assert.assertEquals(0, fromStart.getRequests().size());
        Assert.assertEquals(4, fromStart.getCounter().getCounter());
    }

    /**
     * With {@code skipDimensionsAndMetrics} switched on, checks that the announced segment read
     * back from each node equals the original but carries no dimensions and no metrics, and that
     * unannouncing it leaves the segments path without children.
     *
     * @throws Exception if announcing or reading fails
     */
    @Test
    public void testSkipDimensions() throws Exception {
        this.skipDimensionsAndMetrics = true;
        final DataSegment segment = this.testSegments.iterator().next();
        this.segmentAnnouncer.announceSegment(segment);

        final List<String> zNodes = this.cf.getChildren().forPath(TEST_SEGMENTS_PATH);
        for (String zNode : zNodes) {
            final DataSegment announced =
                Iterables.getOnlyElement(this.segmentReader.read(JOINER.join(TEST_SEGMENTS_PATH, zNode)));
            Assert.assertEquals(segment, announced);
            Assert.assertTrue(announced.getDimensions().isEmpty());
            Assert.assertTrue(announced.getMetrics().isEmpty());
        }

        this.segmentAnnouncer.unannounceSegment(segment);
        Assert.assertTrue(this.cf.getChildren().forPath(TEST_SEGMENTS_PATH).isEmpty());
    }

    /**
     * With {@code skipLoadSpec} switched on, checks that the announced segment read back from
     * each node equals the original but has a {@code null} load spec, and that unannouncing it
     * leaves the segments path without children.
     *
     * @throws Exception if announcing or reading fails
     */
    @Test
    public void testSkipLoadSpec() throws Exception {
        this.skipLoadSpec = true;
        final DataSegment segment = this.testSegments.iterator().next();
        this.segmentAnnouncer.announceSegment(segment);

        final List<String> zNodes = this.cf.getChildren().forPath(TEST_SEGMENTS_PATH);
        for (String zNode : zNodes) {
            final DataSegment announced =
                Iterables.getOnlyElement(this.segmentReader.read(JOINER.join(TEST_SEGMENTS_PATH, zNode)));
            Assert.assertEquals(segment, announced);
            Assert.assertNull(announced.getLoadSpec());
        }

        this.segmentAnnouncer.unannounceSegment(segment);
        Assert.assertTrue(this.cf.getChildren().forPath(TEST_SEGMENTS_PATH).isEmpty());
    }

    /**
     * Lowers the per-node byte limit to 2048, announces every test segment individually and
     * expects the segments to be spread over 20 nodes which together contain each test segment
     * exactly once. The previous byte limit is restored afterwards.
     *
     * @throws Exception if announcing or reading fails
     */
    @Test
    public void testSingleAnnounceManyTimes() throws Exception {
        final int originalLimit = this.maxBytesPerNode.getAndSet(2048);
        try {
            for (DataSegment segment : this.testSegments) {
                this.segmentAnnouncer.announceSegment(segment);
            }

            final List<String> zNodes = this.cf.getChildren().forPath(TEST_SEGMENTS_PATH);
            Assert.assertEquals(20, zNodes.size());

            // Tick off every segment found; a failed remove means an unknown or duplicated segment.
            final Set<DataSegment> notYetSeen = Sets.newHashSet(this.testSegments);
            for (String zNode : zNodes) {
                final String zNodePath = JOINER.join(TEST_SEGMENTS_PATH, zNode);
                for (DataSegment segment : this.segmentReader.read(zNodePath)) {
                    Assert.assertTrue("Invalid segment " + segment, notYetSeen.remove(segment));
                }
            }
            Assert.assertTrue("Failed to find segments " + notYetSeen, notYetSeen.isEmpty());
        } finally {
            this.maxBytesPerNode.set(originalLimit);
        }
    }

    /**
     * Runs one batch announce/unannounce round including the change-history checks.
     *
     * @throws Exception if the round fails
     */
    @Test
    public void testBatchAnnounce() throws Exception {
        final boolean verifyChangeHistory = true;
        testBatchAnnounce(verifyChangeHistory);
    }

    /**
     * Runs ten consecutive batch announce/unannounce rounds without the change-history checks.
     *
     * @throws Exception if any round fails
     */
    @Test
    public void testMultipleBatchAnnounce() throws Exception {
        int roundsLeft = 10;
        while (roundsLeft > 0) {
            testBatchAnnounce(false);
            roundsLeft--;
        }
    }

    /**
     * Announces all test segments as one batch, expects them to be stored in two nodes that
     * together hold exactly the test segments, then unannounces them and expects the segments
     * path to have no children.
     *
     * @param z when {@code true}, also verifies the change-request history after the announce
     *          and after the unannounce; the expected counters assume a fresh announcer
     * @throws Exception if announcing, reading or fetching the change history fails
     */
    private void testBatchAnnounce(boolean z) throws Exception {
        this.segmentAnnouncer.announceSegments(this.testSegments);

        final List<String> zNodes = this.cf.getChildren().forPath(TEST_SEGMENTS_PATH);
        Assert.assertEquals(2, zNodes.size());

        final Set<DataSegment> announced = new HashSet<>();
        for (String zNode : zNodes) {
            announced.addAll(this.segmentReader.read(JOINER.join(TEST_SEGMENTS_PATH, zNode)));
        }
        Assert.assertEquals(this.testSegments, announced);

        final int segmentCount = this.testSegments.size();
        ChangeRequestsSnapshot<?> afterAnnounce = null;
        if (z) {
            afterAnnounce =
                this.segmentAnnouncer.getSegmentChangesSince(new ChangeRequestHistory.Counter(-1L, -1L)).get();
            Assert.assertEquals(segmentCount, afterAnnounce.getRequests().size());
            Assert.assertEquals(segmentCount, afterAnnounce.getCounter().getCounter());
        }

        this.segmentAnnouncer.unannounceSegments(this.testSegments);
        Assert.assertTrue(this.cf.getChildren().forPath(TEST_SEGMENTS_PATH).isEmpty());

        if (!z) {
            return;
        }

        // Delta since the announce snapshot: one request per unannounced segment.
        final ChangeRequestsSnapshot<?> afterUnannounce =
            this.segmentAnnouncer.getSegmentChangesSince(afterAnnounce.getCounter()).get();
        Assert.assertEquals(segmentCount, afterUnannounce.getRequests().size());
        Assert.assertEquals(2 * segmentCount, afterUnannounce.getCounter().getCounter());

        // Asking from the start again yields no requests, with the counter unchanged.
        final ChangeRequestsSnapshot<?> fromStart =
            this.segmentAnnouncer.getSegmentChangesSince(new ChangeRequestHistory.Counter(-1L, -1L)).get();
        Assert.assertEquals(0, fromStart.getRequests().size());
        Assert.assertEquals(2 * segmentCount, fromStart.getCounter().getCounter());
    }

    /**
     * Submits the same batch announcement of all test segments from {@code NUM_THREADS} workers
     * at once, waits for all of them, and expects the announcer to have recorded two paths with
     * every recorded counter equal to 1.
     *
     * @throws ExecutionException   if one of the workers failed
     * @throws InterruptedException if interrupted while waiting for the workers
     */
    @Test(timeout = 5000)
    public void testAnnounceSegmentsWithSameSegmentConcurrently() throws ExecutionException, InterruptedException {
        final List<Future<?>> pending = new ArrayList<>(NUM_THREADS);
        for (int worker = 0; worker < NUM_THREADS; worker++) {
            final Future<?> submitted = this.exec.submit(() -> {
                try {
                    this.segmentAnnouncer.announceSegments(this.testSegments);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
            pending.add(submitted);
        }

        // Wait for every worker; a failure inside a worker surfaces here.
        for (Future<?> future : pending) {
            future.get();
        }

        Assert.assertEquals(2, this.announcer.numPathAnnounced.size());
        for (Map<?, AtomicInteger> countsByPayload : this.announcer.numPathAnnounced.values()) {
            for (AtomicInteger announceCount : countsByPayload.values()) {
                Assert.assertEquals(1, announceCount.get());
            }
        }
    }

    /**
     * Has {@code NUM_THREADS} workers each announce the same four segments one by one, waits for
     * all of them, and expects the announcer to have recorded a single path with every recorded
     * counter equal to 1.
     *
     * @throws ExecutionException   if one of the workers failed
     * @throws InterruptedException if interrupted while waiting for the workers
     */
    @Test(timeout = 5000)
    public void testAnnounceSegmentWithSameSegmentConcurrently() throws ExecutionException, InterruptedException {
        final List<Future<?>> pending = new ArrayList<>(NUM_THREADS);

        // Built once up front so every worker announces the very same segment instances.
        final DataSegment segment0 = makeSegment(0);
        final DataSegment segment1 = makeSegment(1);
        final DataSegment segment2 = makeSegment(2);
        final DataSegment segment3 = makeSegment(3);

        for (int worker = 0; worker < NUM_THREADS; worker++) {
            final Future<?> submitted = this.exec.submit(() -> {
                try {
                    this.segmentAnnouncer.announceSegment(segment0);
                    this.segmentAnnouncer.announceSegment(segment1);
                    this.segmentAnnouncer.announceSegment(segment2);
                    this.segmentAnnouncer.announceSegment(segment3);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
            pending.add(submitted);
        }

        // Wait for every worker; a failure inside a worker surfaces here.
        for (Future<?> future : pending) {
            future.get();
        }

        Assert.assertEquals(1, this.announcer.numPathAnnounced.size());
        for (Map<?, AtomicInteger> countsByPayload : this.announcer.numPathAnnounced.values()) {
            for (AtomicInteger announceCount : countsByPayload.values()) {
                Assert.assertEquals(1, announceCount.get());
            }
        }
    }

    /**
     * Builds a test segment of data source {@code "foo"} covering one day, starting {@code i}
     * days after 2013-01-01, with two dimensions, two metrics, a {@code local} load spec,
     * size 0 and the current UTC time as its version.
     *
     * @param i number of days to shift the one-day interval from 2013-01-01
     * @return the newly built segment
     */
    private DataSegment makeSegment(int i) {
        final Interval oneDay =
            new Interval(DateTimes.of("2013-01-01").plusDays(i), DateTimes.of("2013-01-02").plusDays(i));
        return DataSegment.builder()
            .dataSource("foo")
            .interval(oneDay)
            .version(DateTimes.nowUtc().toString())
            .dimensions(ImmutableList.of("dim1", "dim2"))
            .metrics(ImmutableList.of("met1", "met2"))
            // Plain literal key: the load spec must not depend on a constant from Apache Ant.
            .loadSpec(ImmutableMap.of("type", "local"))
            .size(0L)
            .build();
    }
}
