package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorException;
import org.apache.seatunnel.shade.com.google.common.collect.Maps;
import org.apache.seatunnel.shade.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.class */
/**
 * Source reader that pulls messages from RocketMQ message queues and emits them through a
 * {@link Collector} as {@link SeaTunnelRow}s.
 *
 * <p>Each assigned split (one {@link MessageQueue}) is served by its own
 * {@link RocketMqConsumerThread}, started lazily on the shared executor. The reader never talks
 * to the RocketMQ consumer directly: it enqueues a task on the worker's task queue and, for
 * polling, blocks until that task has finished.
 *
 * <p>Offsets stored in a split's start offset, in snapshots and in committed checkpoints all
 * denote the <em>next</em> offset to read, i.e. "last consumed offset + 1".
 */
public class RocketMqSourceReader implements SourceReader<SeaTunnelRow, RocketMqSourceSplit> {
    private static final Logger log = LoggerFactory.getLogger(RocketMqSourceReader.class);

    /** Milliseconds {@link #pollNext(Collector)} sleeps while no split has been assigned yet. */
    private static final long THREAD_WAIT_TIME = 500L;

    private final SourceReader.Context context;
    private final ConsumerMetadata metadata;
    private final DeserializationSchema<SeaTunnelRow> deserializationSchema;

    /** Splits currently owned by this reader; filled from {@link #pendingPartitionsQueue}. */
    private final Set<RocketMqSourceSplit> sourceSplits = new HashSet<>();

    /** One consumer worker per message queue. */
    private final Map<MessageQueue, RocketMqConsumerThread> consumerThreads = new ConcurrentHashMap<>();

    /** Checkpoint id -> (message queue -> offset captured by {@link #snapshotState(long)}). */
    private final Map<Long, Map<MessageQueue, Long>> checkpointOffsets = new ConcurrentHashMap<>();

    private final ExecutorService executorService =
            Executors.newCachedThreadPool(runnable -> new Thread(runnable, "RocketMq Source Data Consumer"));

    /** Hand-over queue between {@link #addSplits(List)} and {@link #pollNext(Collector)}. */
    private final LinkedBlockingQueue<RocketMqSourceSplit> pendingPartitionsQueue = new LinkedBlockingQueue<>();

    /** Becomes {@code true} once {@link #addSplits(List)} has enqueued its splits. */
    private volatile boolean running = false;

    /**
     * Creates a reader and switches the RocketMQ client logger to SLF4J via a system property.
     *
     * @param consumerMetadata consumer configuration handed to every consumer worker
     * @param deserializationSchema converts message bodies into rows
     * @param context reader context used for boundedness and end-of-input signalling
     */
    public RocketMqSourceReader(ConsumerMetadata consumerMetadata, DeserializationSchema<SeaTunnelRow> deserializationSchema, SourceReader.Context context) {
        this.metadata = consumerMetadata;
        this.context = context;
        this.deserializationSchema = deserializationSchema;
        System.setProperty(ClientLogger.CLIENT_LOG_USESLF4J, "true");
    }

    /** No-op: consumer workers are started lazily in {@link #pollNext(Collector)}. */
    public void open() throws Exception {
    }

    /** Stops the worker pool by interrupting all consumer threads. */
    public void close() throws IOException {
        this.executorService.shutdownNow();
    }

    /**
     * Performs one poll cycle: adopts newly assigned splits, makes sure each split has a consumer
     * worker, then polls every split once and forwards the messages to {@code collector}.
     *
     * <p>If no split has been added yet the method sleeps for {@link #THREAD_WAIT_TIME} ms and
     * returns. In bounded mode the end of input is signalled after the cycle.
     *
     * @param collector receives the deserialized rows
     * @throws Exception if sleeping is interrupted; a failure inside a consumer task surfaces as
     *     a {@link java.util.concurrent.CompletionException}
     */
    public void pollNext(Collector<SeaTunnelRow> collector) throws Exception {
        if (!this.running) {
            Thread.sleep(THREAD_WAIT_TIME);
            return;
        }
        this.pendingPartitionsQueue.drainTo(this.sourceSplits);

        for (RocketMqSourceSplit split : this.sourceSplits) {
            this.consumerThreads.computeIfAbsent(split.getMessageQueue(), queue -> startConsumerThread());
        }
        for (RocketMqSourceSplit split : this.sourceSplits) {
            pollSplit(split, collector);
        }

        if (Boundedness.BOUNDED.equals(this.context.getBoundedness())) {
            this.context.signalNoMoreElement();
        }
    }

    /** Creates a consumer worker and submits it to the executor. */
    private RocketMqConsumerThread startConsumerThread() {
        RocketMqConsumerThread worker = new RocketMqConsumerThread(this.metadata);
        this.executorService.submit(worker);
        return worker;
    }

    /** Enqueues one poll task for {@code split} on its worker and waits for it to finish. */
    private void pollSplit(RocketMqSourceSplit split, Collector<SeaTunnelRow> collector) {
        MessageQueue queue = split.getMessageQueue();
        CompletableFuture<Void> done = new CompletableFuture<>();
        try {
            this.consumerThreads.get(queue).getTasks().put(consumer -> {
                try {
                    Set<MessageQueue> assigned = Sets.newHashSet(queue);
                    consumer.assign(assigned);
                    if (split.getStartOffset() >= 0) {
                        consumer.seek(queue, split.getStartOffset());
                    }
                    List<MessageExt> polled = consumer.poll(this.metadata.getBaseConfig().getPollTimeoutMillis());
                    if (polled.isEmpty()) {
                        log.warn("Rocketmq consumer can not pull data, split {}, start offset {}, end offset {}", queue, split.getStartOffset(), split.getEndOffset());
                    }
                    Map<MessageQueue, List<MessageExt>> byQueue = polled.stream().collect(Collectors.groupingBy(
                            message -> new MessageQueue(message.getTopic(), message.getBrokerName(), message.getQueueId())));
                    for (MessageQueue assignedQueue : assigned) {
                        List<MessageExt> messages = byQueue.get(assignedQueue);
                        if (messages != null) {
                            emitMessages(messages, split, collector);
                        }
                    }
                    done.complete(null);
                } catch (Throwable t) {
                    // Catch Throwable, not just Exception: an Error that escaped here would
                    // leave the future incomplete and block the reader in join() forever.
                    done.completeExceptionally(t);
                }
            });
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RocketMqConnectorException(RocketMqConnectorErrorCode.CONSUME_DATA_FAILED, e);
        }
        done.join();
    }

    /**
     * Deserializes {@code messages} into {@code collector} and advances the split's offsets.
     *
     * <p>In bounded mode emission stops after the first message whose queue offset has reached
     * the split's end offset.
     */
    private void emitMessages(List<MessageExt> messages, RocketMqSourceSplit split, Collector<SeaTunnelRow> collector) throws IOException {
        boolean bounded = Boundedness.BOUNDED.equals(this.context.getBoundedness());
        for (MessageExt message : messages) {
            this.deserializationSchema.deserialize(message.getBody(), collector);
            if (bounded && message.getQueueOffset() >= split.getEndOffset()) {
                break;
            }
        }
        if (messages.isEmpty()) {
            return;
        }
        long lastOffset = messages.get(messages.size() - 1).getQueueOffset();
        // Store the NEXT offset to read. Storing lastOffset itself makes the following
        // seek() land on the already-consumed message and deliver it again on every cycle.
        split.setStartOffset(lastOffset + 1);
        if (lastOffset >= split.getEndOffset()) {
            split.setEndOffset(lastOffset);
        }
    }

    /**
     * Copies the current splits and remembers each queue's start offset under the checkpoint id
     * so it can be committed in {@link #notifyCheckpointComplete(long)}.
     *
     * @param j checkpoint id
     * @return copies of all splits owned by this reader
     */
    public List<RocketMqSourceSplit> snapshotState(long j) throws Exception {
        List<RocketMqSourceSplit> snapshot = this.sourceSplits.stream()
                .map(RocketMqSourceSplit::copy)
                .collect(Collectors.toList());
        Map<MessageQueue, Long> offsets = this.checkpointOffsets.computeIfAbsent(Long.valueOf(j), id -> Maps.newConcurrentMap());
        for (RocketMqSourceSplit split : snapshot) {
            offsets.put(split.getMessageQueue(), split.getStartOffset());
        }
        return snapshot;
    }

    /**
     * Queues newly assigned splits for adoption by the next {@link #pollNext(Collector)} call.
     *
     * @param list splits assigned to this reader
     * @throws RocketMqConnectorException if the thread is interrupted while enqueuing
     */
    public void addSplits(List<RocketMqSourceSplit> list) {
        for (RocketMqSourceSplit split : list) {
            try {
                this.pendingPartitionsQueue.put(split);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RocketMqConnectorException(RocketMqConnectorErrorCode.ADD_SPLIT_CHECKPOINT_FAILED, e);
            }
        }
        // Publish the flag only after the splits are visible in the queue. Setting it first lets
        // a concurrent pollNext() run with no splits and, in bounded mode, signal end of input
        // before anything was read.
        this.running = true;
    }

    /** No-op. */
    public void handleNoMoreSplits() {
    }

    /**
     * Commits the offsets captured for checkpoint {@code j}, if offset committing is enabled.
     *
     * <p>The commit itself runs on each queue's consumer worker; this method only enqueues the
     * work. Queues without a worker are skipped.
     *
     * @param j id of the completed checkpoint
     */
    public void notifyCheckpointComplete(long j) throws Exception {
        Long checkpointId = j;
        Map<MessageQueue, Long> offsets = this.checkpointOffsets.remove(checkpointId);
        if (offsets == null) {
            log.warn("checkpoint {} do not exist or have already been committed.", checkpointId);
            return;
        }
        if (!this.metadata.isEnabledCommitCheckpoint()) {
            return;
        }
        // The per-checkpoint map is a concurrent map, so keys and values are never null.
        for (Map.Entry<MessageQueue, Long> entry : offsets.entrySet()) {
            MessageQueue queue = entry.getKey();
            long offset = entry.getValue();
            RocketMqConsumerThread worker = this.consumerThreads.get(queue);
            if (worker == null) {
                continue;
            }
            try {
                worker.getTasks().put(consumer -> {
                    consumer.getOffsetStore().updateOffset(queue, offset, false);
                    consumer.getOffsetStore().persist(queue);
                });
            } catch (InterruptedException e) {
                log.error("commit offset failed", e);
                // Keep the interrupt visible to the caller and stop: every further put()
                // would fail immediately with the flag set.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
