package org.apache.seatunnel.connectors.seatunnel.kafka.source;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.class */
/**
 * Source reader that pulls records from Kafka for the splits assigned to it and hands the
 * deserialized rows to a {@link Collector}.
 *
 * <p>Each topic partition gets its own {@link KafkaConsumerThread}, submitted to an internal
 * cached thread pool. Work (polling, committing offsets) is handed to that thread by putting a
 * task on its task queue.
 */
public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSourceSplit> {
    private static final Logger log = LoggerFactory.getLogger(KafkaSourceReader.class);

    /** Milliseconds {@link #pollNext} sleeps while no split has been assigned yet. */
    private static final long THREAD_WAIT_TIME = 500;

    /** Milliseconds a single Kafka poll may block. */
    private static final long POLL_TIMEOUT = 10000;

    private final SourceReader.Context context;
    private final ConsumerMetadata metadata;
    private final DeserializationSchema<SeaTunnelRow> deserializationSchema;

    /** Set once {@link #addSplits} has enqueued splits; until then {@link #pollNext} only sleeps. */
    private volatile boolean running = false;

    /** Splits currently owned by this reader; filled from {@link #pendingPartitionsQueue}. */
    private final Set<KafkaSourceSplit> sourceSplits = new HashSet<>();

    /** One consumer thread per topic partition, created lazily in {@link #pollNext}. */
    private final Map<TopicPartition, KafkaConsumerThread> consumerThreadMap = new ConcurrentHashMap<>();

    /** Offsets captured by {@link #snapshotState}, keyed by checkpoint id, awaiting commit. */
    private final Map<Long, Map<TopicPartition, Long>> checkpointOffsetMap = new ConcurrentHashMap<>();

    private final ExecutorService executorService =
            Executors.newCachedThreadPool(runnable -> new Thread(runnable, "Kafka Source Data Consumer"));

    /** Splits handed over by {@link #addSplits} that {@link #pollNext} has not picked up yet. */
    private final LinkedBlockingQueue<KafkaSourceSplit> pendingPartitionsQueue = new LinkedBlockingQueue<>();

    /**
     * Creates a reader.
     *
     * @param consumerMetadata consumer settings passed to every {@link KafkaConsumerThread}
     * @param deserializationSchema converts a record value into rows pushed to the collector
     * @param context reader context used to query boundedness and signal end of input
     */
    public KafkaSourceReader(ConsumerMetadata consumerMetadata, DeserializationSchema<SeaTunnelRow> deserializationSchema, SourceReader.Context context) {
        this.metadata = consumerMetadata;
        this.context = context;
        this.deserializationSchema = deserializationSchema;
    }

    /** No-op; consumer threads are started lazily by {@link #pollNext}. */
    public void open() {
    }

    /**
     * Stops the internal thread pool via {@link ExecutorService#shutdownNow()}.
     *
     * @throws IOException declared for interface compatibility; not thrown by this implementation
     */
    public void close() throws IOException {
        // executorService is a final field with an initializer, so it can never be null here.
        this.executorService.shutdownNow();
    }

    /**
     * Performs one poll per owned split and emits the resulting rows.
     *
     * <p>If no split has been added yet, sleeps {@value #THREAD_WAIT_TIME} ms and returns.
     * Otherwise pending splits are adopted, a consumer thread is started for every partition that
     * does not have one, and each split is polled once, one after another. In bounded mode the
     * context is told that no more elements will follow.
     *
     * @param collector receives the deserialized rows
     * @throws Exception if the calling thread is interrupted, or if a poll task fails (the failure
     *     surfaces through {@link CompletableFuture#join()})
     */
    public void pollNext(Collector<SeaTunnelRow> collector) throws Exception {
        if (!this.running) {
            Thread.sleep(THREAD_WAIT_TIME);
            return;
        }
        this.pendingPartitionsQueue.drainTo(this.sourceSplits);

        // First make sure every partition has a consumer thread, then poll the splits in turn.
        for (KafkaSourceSplit split : this.sourceSplits) {
            this.consumerThreadMap.computeIfAbsent(split.getTopicPartition(), topicPartition -> {
                KafkaConsumerThread consumerThread = new KafkaConsumerThread(this.metadata);
                this.executorService.submit(consumerThread);
                return consumerThread;
            });
        }
        for (KafkaSourceSplit split : this.sourceSplits) {
            consumeSplit(split, collector);
        }

        // NOTE(review): in bounded mode this is signalled after a single poll per split, whether or
        // not every split has reached its end offset - confirm this is intended.
        if (Boundedness.BOUNDED.equals(this.context.getBoundedness())) {
            this.context.signalNoMoreElement();
        }
    }

    /**
     * Queues one poll task on the split's consumer thread and blocks until the task has finished.
     *
     * @param split split to poll; its offsets are updated by the task
     * @param collector receives the deserialized rows
     * @throws KafkaConnectorException if the calling thread is interrupted while queueing the task
     */
    private void consumeSplit(KafkaSourceSplit split, Collector<SeaTunnelRow> collector) {
        CompletableFuture<Void> done = new CompletableFuture<>();
        try {
            this.consumerThreadMap.get(split.getTopicPartition()).getTasks().put(kafkaConsumer -> {
                try {
                    TopicPartition topicPartition = split.getTopicPartition();
                    kafkaConsumer.assign(Sets.newHashSet(topicPartition));
                    if (split.getStartOffset() >= 0) {
                        kafkaConsumer.seek(topicPartition, split.getStartOffset());
                    }
                    ConsumerRecords<?, ?> polled = kafkaConsumer.poll(Duration.ofMillis(POLL_TIMEOUT));
                    emitRecords(polled.records(topicPartition), split, collector);
                    done.complete(null);
                } catch (Exception e) {
                    done.completeExceptionally(e);
                }
            });
            // Throws CompletionException if the task completed exceptionally.
            done.join();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new KafkaConnectorException(KafkaConnectorErrorCode.CONSUME_DATA_FAILED, e);
        }
    }

    /**
     * Deserializes the records of one partition into the collector and advances the split offsets.
     *
     * <p>In bounded mode emission stops after the first record whose offset is at or beyond the
     * split's end offset. The split's start offset is then moved past the last record of the
     * batch, and the end offset is set to that last offset when it has reached or passed the
     * current end offset.
     *
     * @param records records polled for the split's partition, in the order returned by the poll
     * @param split split whose offsets are updated
     * @param collector receives the deserialized rows
     * @throws Exception if deserialization fails
     */
    private void emitRecords(List<? extends ConsumerRecord<?, ?>> records, KafkaSourceSplit split, Collector<SeaTunnelRow> collector) throws Exception {
        boolean bounded = Boundedness.BOUNDED.equals(this.context.getBoundedness());
        for (ConsumerRecord<?, ?> record : records) {
            // The record value is handed to the schema as a raw byte array.
            this.deserializationSchema.deserialize((byte[]) record.value(), collector);
            if (bounded && record.offset() >= split.getEndOffset()) {
                break;
            }
        }
        long lastOffset = -1;
        if (!records.isEmpty()) {
            lastOffset = records.get(records.size() - 1).offset();
            split.setStartOffset(lastOffset + 1);
        }
        if (lastOffset >= split.getEndOffset()) {
            split.setEndOffset(lastOffset);
        }
    }

    /**
     * Records the offsets to commit for the given checkpoint and returns copies of the owned splits.
     *
     * @param checkpointId id of the checkpoint being taken
     * @return a copy of every split currently owned by this reader
     */
    public List<KafkaSourceSplit> snapshotState(long checkpointId) {
        // NOTE(review): the offset remembered for commit is the split's END offset, while pollNext
        // advances the START offset to the next unread record - confirm this is the intended
        // commit position.
        this.checkpointOffsetMap.put(checkpointId, this.sourceSplits.stream()
                .collect(Collectors.toMap(KafkaSourceSplit::getTopicPartition, KafkaSourceSplit::getEndOffset)));
        return this.sourceSplits.stream()
                .map(KafkaSourceSplit::copy)
                .collect(Collectors.toList());
    }

    /**
     * Enqueues newly assigned splits and marks the reader as running.
     *
     * @param list splits to add
     * @throws KafkaConnectorException if the calling thread is interrupted while enqueueing
     */
    public void addSplits(List<KafkaSourceSplit> list) {
        for (KafkaSourceSplit split : list) {
            try {
                this.pendingPartitionsQueue.put(split);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new KafkaConnectorException(KafkaConnectorErrorCode.ADD_SPLIT_CHECKPOINT_FAILED, e);
            }
        }
        // Publish the flag only after the splits are queued: otherwise a concurrent pollNext could
        // observe running == true with nothing to read and, in bounded mode, signal end of input.
        this.running = true;
    }

    /** Logs that no further splits will be assigned to this reader. */
    public void handleNoMoreSplits() {
        log.info("receive no more splits message, this reader will not add new split.");
    }

    /**
     * Queues a commit of the offsets recorded for a completed checkpoint.
     *
     * <p>For every partition recorded by {@link #snapshotState}, a task is put on the partition's
     * consumer thread; the task commits synchronously only if commit-on-checkpoint is enabled in
     * the consumer metadata. Unknown checkpoint ids are logged and ignored.
     *
     * @param checkpointId id of the completed checkpoint
     */
    public void notifyCheckpointComplete(long checkpointId) {
        // Single atomic remove instead of containsKey + remove.
        Map<TopicPartition, Long> offsets = this.checkpointOffsetMap.remove(checkpointId);
        if (offsets == null) {
            log.warn("checkpoint {} do not exist or have already been committed.", checkpointId);
            return;
        }
        offsets.forEach((topicPartition, offset) -> {
            KafkaConsumerThread consumerThread = this.consumerThreadMap.get(topicPartition);
            if (consumerThread == null) {
                log.warn("no consumer thread for partition {}, skip committing offset for checkpoint {}.", topicPartition, checkpointId);
                return;
            }
            try {
                consumerThread.getTasks().put(kafkaConsumer -> {
                    if (this.metadata.isCommitOnCheckpoint()) {
                        Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
                        toCommit.put(topicPartition, new OffsetAndMetadata(offset));
                        kafkaConsumer.commitSync(toCommit);
                    }
                });
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.error("commit offset to kafka failed", e);
            }
        });
    }
}
