package org.apache.druid.firehose.rocketmq;

import com.alibaba.rocketmq.client.Validators;
import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
import com.alibaba.rocketmq.client.consumer.MessageQueueListener;
import com.alibaba.rocketmq.client.consumer.PullResult;
import com.alibaba.rocketmq.client.consumer.PullStatus;
import com.alibaba.rocketmq.client.consumer.store.OffsetStore;
import com.alibaba.rocketmq.client.consumer.store.ReadOffsetType;
import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.ServiceThread;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Sets;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import javax.annotation.Nullable;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.ParseException;

/* loaded from: input_file:org/apache/druid/firehose/rocketmq/RocketMQFirehoseFactory.class */
public class RocketMQFirehoseFactory implements FirehoseFactory<InputRowParser<ByteBuffer>> {

    @JsonProperty
    private final Properties consumerProps;

    @JsonProperty(required = true)
    private final String consumerGroup;

    @JsonProperty(required = true)
    private final List<String> feed;

    @JsonProperty
    private final String pullBatchSize;
    private final ConcurrentHashMap<MessageQueue, ConcurrentSkipListSet<MessageExt>> messageQueueTreeSetMap = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<MessageQueue, ConcurrentSkipListSet<Long>> windows = new ConcurrentHashMap<>();
    private static final int DEFAULT_PULL_BATCH_SIZE = 32;
    private static final Logger LOGGER = new Logger(RocketMQFirehoseFactory.class);
    private static final Comparator<MessageExt> MESSAGE_COMPARATOR = Comparator.comparingLong((v0) -> {
        return v0.getQueueOffset();
    });

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.druid.firehose.rocketmq.RocketMQFirehoseFactory$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/druid/firehose/rocketmq/RocketMQFirehoseFactory$2.class */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$com$alibaba$rocketmq$client$consumer$PullStatus = new int[PullStatus.values().length];

        static {
            try {
                $SwitchMap$com$alibaba$rocketmq$client$consumer$PullStatus[PullStatus.FOUND.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$alibaba$rocketmq$client$consumer$PullStatus[PullStatus.NO_NEW_MSG.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$alibaba$rocketmq$client$consumer$PullStatus[PullStatus.NO_MATCHED_MSG.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$com$alibaba$rocketmq$client$consumer$PullStatus[PullStatus.OFFSET_ILLEGAL.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /* loaded from: input_file:org/apache/druid/firehose/rocketmq/RocketMQFirehoseFactory$DruidMessageQueueListener.class */
    final class DruidMessageQueueListener implements MessageQueueListener {
        private final Set<String> topics;
        private final ConcurrentHashMap<String, Set<MessageQueue>> topicQueueMap;
        private final DefaultMQPullConsumer defaultMQPullConsumer;

        public DruidMessageQueueListener(Set<String> set, ConcurrentHashMap<String, Set<MessageQueue>> concurrentHashMap, DefaultMQPullConsumer defaultMQPullConsumer) {
            this.topics = set;
            this.topicQueueMap = concurrentHashMap;
            this.defaultMQPullConsumer = defaultMQPullConsumer;
        }

        public void messageQueueChanged(String str, Set<MessageQueue> set, Set<MessageQueue> set2) {
            if (this.topics.contains(str)) {
                this.topicQueueMap.put(str, set2);
                Iterator it = RocketMQFirehoseFactory.this.messageQueueTreeSetMap.entrySet().iterator();
                while (it.hasNext()) {
                    if (!set2.contains(((Map.Entry) it.next()).getKey())) {
                        it.remove();
                    }
                }
                StringBuilder sb = new StringBuilder();
                for (MessageQueue messageQueue : set2) {
                    sb.append(messageQueue.getBrokerName()).append("#").append(messageQueue.getQueueId()).append(", ");
                }
                if (!RocketMQFirehoseFactory.LOGGER.isDebugEnabled() || sb.length() <= 2) {
                    return;
                }
                RocketMQFirehoseFactory.LOGGER.debug(StringUtils.format("%s@%s is consuming the following message queues: %s", new Object[]{this.defaultMQPullConsumer.getClientIP(), this.defaultMQPullConsumer.getInstanceName(), sb.substring(0, sb.length() - 2)}), new Object[0]);
            }
        }
    }

    /* loaded from: input_file:org/apache/druid/firehose/rocketmq/RocketMQFirehoseFactory$DruidPullMessageService.class */
    final class DruidPullMessageService extends ServiceThread {
        private volatile List<DruidPullRequest> requestsWrite = new ArrayList();
        private volatile List<DruidPullRequest> requestsRead = new ArrayList();
        private final DefaultMQPullConsumer defaultMQPullConsumer;

        public DruidPullMessageService(DefaultMQPullConsumer defaultMQPullConsumer) {
            this.defaultMQPullConsumer = defaultMQPullConsumer;
        }

        public void putRequest(DruidPullRequest druidPullRequest) {
            synchronized (this) {
                this.requestsWrite.add(druidPullRequest);
                if (!this.hasNotified) {
                    this.hasNotified = true;
                    notify();
                }
            }
        }

        private void swapRequests() {
            List<DruidPullRequest> list = this.requestsWrite;
            this.requestsWrite = this.requestsRead;
            this.requestsRead = list;
        }

        public String getServiceName() {
            return getClass().getSimpleName();
        }

        /* JADX WARN: Failed to find 'out' block for switch in B:12:0x0062. Please report as an issue. */
        private void doPull() {
            for (DruidPullRequest druidPullRequest : this.requestsRead) {
                try {
                    try {
                        PullResult pull = !druidPullRequest.isLongPull() ? this.defaultMQPullConsumer.pull(druidPullRequest.getMessageQueue(), druidPullRequest.getTag(), druidPullRequest.getNextBeginOffset(), druidPullRequest.getPullBatchSize()) : this.defaultMQPullConsumer.pullBlockIfNotFound(druidPullRequest.getMessageQueue(), druidPullRequest.getTag(), druidPullRequest.getNextBeginOffset(), druidPullRequest.getPullBatchSize());
                        switch (AnonymousClass2.$SwitchMap$com$alibaba$rocketmq$client$consumer$PullStatus[pull.getPullStatus().ordinal()]) {
                            case 1:
                                ((ConcurrentSkipListSet) RocketMQFirehoseFactory.this.messageQueueTreeSetMap.computeIfAbsent(druidPullRequest.getMessageQueue(), messageQueue -> {
                                    return new ConcurrentSkipListSet(RocketMQFirehoseFactory.MESSAGE_COMPARATOR);
                                })).addAll(pull.getMsgFoundList());
                                break;
                            case 4:
                                RocketMQFirehoseFactory.LOGGER.error("Bad Pull Request: Offset is illegal. Offset used: %d", new Object[]{Long.valueOf(druidPullRequest.getNextBeginOffset())});
                                break;
                        }
                        druidPullRequest.getCountDownLatch().countDown();
                    } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
                        RocketMQFirehoseFactory.LOGGER.error(e, "Failed to pull message from broker.", new Object[0]);
                        druidPullRequest.getCountDownLatch().countDown();
                    }
                } catch (Throwable th) {
                    druidPullRequest.getCountDownLatch().countDown();
                    throw th;
                }
            }
            this.requestsRead.clear();
        }

        public void run() {
            RocketMQFirehoseFactory.LOGGER.info(getServiceName() + " starts.", new Object[0]);
            while (!isStoped()) {
                waitForRunning(0L);
                doPull();
            }
            try {
                Thread.sleep(10L);
            } catch (InterruptedException e) {
                RocketMQFirehoseFactory.LOGGER.error(e, "", new Object[0]);
            }
            synchronized (this) {
                swapRequests();
            }
            doPull();
            RocketMQFirehoseFactory.LOGGER.info(getServiceName() + " terminated.", new Object[0]);
        }

        protected void onWaitEnd() {
            swapRequests();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/druid/firehose/rocketmq/RocketMQFirehoseFactory$DruidPullRequest.class */
    public static final class DruidPullRequest {
        private final MessageQueue messageQueue;
        private final String tag;
        private final long nextBeginOffset;
        private final int pullBatchSize;
        private final boolean longPull;
        private final CountDownLatch countDownLatch;

        public DruidPullRequest(MessageQueue messageQueue, String str, long j, int i, boolean z) {
            this.messageQueue = messageQueue;
            this.tag = null == str ? "*" : str;
            this.nextBeginOffset = j;
            this.pullBatchSize = i;
            this.longPull = z;
            this.countDownLatch = new CountDownLatch(1);
        }

        public MessageQueue getMessageQueue() {
            return this.messageQueue;
        }

        public long getNextBeginOffset() {
            return this.nextBeginOffset;
        }

        public String getTag() {
            return this.tag;
        }

        public int getPullBatchSize() {
            return this.pullBatchSize;
        }

        public boolean isLongPull() {
            return this.longPull;
        }

        public CountDownLatch getCountDownLatch() {
            return this.countDownLatch;
        }
    }

    @JsonCreator
    public RocketMQFirehoseFactory(@JsonProperty("consumerProps") Properties properties, @JsonProperty("consumerGroup") String str, @JsonProperty("feed") List<String> list, @JsonProperty("pullBatchSize") String str2) {
        this.consumerProps = properties;
        this.pullBatchSize = str2;
        for (Map.Entry entry : this.consumerProps.entrySet()) {
            System.setProperty(entry.getKey().toString(), entry.getValue().toString());
        }
        this.consumerGroup = str;
        this.feed = list;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean hasMessagesPending() {
        Iterator<Map.Entry<MessageQueue, ConcurrentSkipListSet<MessageExt>>> it = this.messageQueueTreeSetMap.entrySet().iterator();
        while (it.hasNext()) {
            if (!it.next().getValue().isEmpty()) {
                return true;
            }
        }
        return false;
    }

    public Firehose connect(InputRowParser<ByteBuffer> inputRowParser, File file) throws IOException, ParseException {
        final InputRowParser withParseSpec = inputRowParser.withParseSpec(inputRowParser.getParseSpec().withDimensionsSpec(inputRowParser.getParseSpec().getDimensionsSpec().withDimensionExclusions(Sets.union(inputRowParser.getParseSpec().getDimensionsSpec().getDimensionExclusions(), Sets.newHashSet(new String[]{"feed"})))));
        this.messageQueueTreeSetMap.clear();
        this.windows.clear();
        try {
            final DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer(this.consumerGroup);
            defaultMQPullConsumer.setMessageModel(MessageModel.CLUSTERING);
            final ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
            final DruidPullMessageService druidPullMessageService = new DruidPullMessageService(defaultMQPullConsumer);
            for (String str : this.feed) {
                Validators.checkTopic(str);
                concurrentHashMap.put(str, defaultMQPullConsumer.fetchSubscribeMessageQueues(str));
            }
            defaultMQPullConsumer.setMessageQueueListener(new DruidMessageQueueListener(Sets.newHashSet(this.feed), concurrentHashMap, defaultMQPullConsumer));
            defaultMQPullConsumer.start();
            druidPullMessageService.start();
            return new Firehose() { // from class: org.apache.druid.firehose.rocketmq.RocketMQFirehoseFactory.1
                private Iterator<InputRow> nextIterator = Collections.emptyIterator();

                public boolean hasMore() {
                    if (this.nextIterator.hasNext()) {
                        return true;
                    }
                    boolean z = false;
                    DruidPullRequest druidPullRequest = null;
                    for (Map.Entry entry : concurrentHashMap.entrySet()) {
                        for (MessageQueue messageQueue : (Set) entry.getValue()) {
                            ConcurrentSkipListSet concurrentSkipListSet = (ConcurrentSkipListSet) RocketMQFirehoseFactory.this.messageQueueTreeSetMap.get(messageQueue);
                            if (concurrentSkipListSet == null || concurrentSkipListSet.isEmpty()) {
                                try {
                                    DruidPullRequest druidPullRequest2 = new DruidPullRequest(messageQueue, null, defaultMQPullConsumer.fetchConsumeOffset(messageQueue, false), (null == RocketMQFirehoseFactory.this.pullBatchSize || RocketMQFirehoseFactory.this.pullBatchSize.isEmpty()) ? RocketMQFirehoseFactory.DEFAULT_PULL_BATCH_SIZE : Integer.parseInt(RocketMQFirehoseFactory.this.pullBatchSize), !RocketMQFirehoseFactory.this.hasMessagesPending());
                                    druidPullMessageService.putRequest(druidPullRequest2);
                                    if (null == druidPullRequest) {
                                        druidPullRequest = druidPullRequest2;
                                    }
                                } catch (MQClientException e) {
                                    RocketMQFirehoseFactory.LOGGER.error("Failed to fetch consume offset for queue: %s", new Object[]{entry.getKey()});
                                }
                            } else {
                                z = true;
                            }
                        }
                    }
                    if (!z && null != druidPullRequest) {
                        try {
                            druidPullRequest.getCountDownLatch().await();
                            z = true;
                        } catch (InterruptedException e2) {
                            RocketMQFirehoseFactory.LOGGER.error(e2, "CountDownLatch await got interrupted", new Object[0]);
                        }
                    }
                    return z;
                }

                @Nullable
                public InputRow nextRow() {
                    if (this.nextIterator.hasNext()) {
                        return this.nextIterator.next();
                    }
                    for (Map.Entry entry : RocketMQFirehoseFactory.this.messageQueueTreeSetMap.entrySet()) {
                        if (!((ConcurrentSkipListSet) entry.getValue()).isEmpty()) {
                            MessageExt messageExt = (MessageExt) ((ConcurrentSkipListSet) entry.getValue()).pollFirst();
                            this.nextIterator = withParseSpec.parseBatch(ByteBuffer.wrap(messageExt.getBody())).iterator();
                            ((ConcurrentSkipListSet) RocketMQFirehoseFactory.this.windows.computeIfAbsent(entry.getKey(), messageQueue -> {
                                return new ConcurrentSkipListSet();
                            })).add(Long.valueOf(messageExt.getQueueOffset()));
                            return this.nextIterator.next();
                        }
                    }
                    throw new RuntimeException("Unexpected Fatal Error! There should have been one row available.");
                }

                public Runnable commit() {
                    return new Runnable() { // from class: org.apache.druid.firehose.rocketmq.RocketMQFirehoseFactory.1.1
                        @Override // java.lang.Runnable
                        public void run() {
                            OffsetStore offsetStore = defaultMQPullConsumer.getOffsetStore();
                            HashSet hashSet = new HashSet();
                            for (Map.Entry entry : RocketMQFirehoseFactory.this.windows.entrySet()) {
                                while (!((ConcurrentSkipListSet) entry.getValue()).isEmpty()) {
                                    long readOffset = offsetStore.readOffset((MessageQueue) entry.getKey(), ReadOffsetType.MEMORY_FIRST_THEN_STORE);
                                    if (readOffset + 1 > ((Long) ((ConcurrentSkipListSet) entry.getValue()).first()).longValue()) {
                                        ((ConcurrentSkipListSet) entry.getValue()).pollFirst();
                                    } else if (readOffset + 1 == ((Long) ((ConcurrentSkipListSet) entry.getValue()).first()).longValue()) {
                                        ((ConcurrentSkipListSet) entry.getValue()).pollFirst();
                                        offsetStore.updateOffset((MessageQueue) entry.getKey(), readOffset + 1, true);
                                        hashSet.add(entry.getKey());
                                    }
                                }
                            }
                            offsetStore.persistAll(hashSet);
                        }
                    };
                }

                public void close() {
                    defaultMQPullConsumer.shutdown();
                    druidPullMessageService.shutdown(false);
                }
            };
        } catch (MQClientException e) {
            LOGGER.error(e, "Failed to start DefaultMQPullConsumer", new Object[0]);
            throw new IOException("Failed to start RocketMQ client", e);
        }
    }
}
