package org.apache.kylin.stream.source.kafka;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.shaded.com.google.common.base.Function;
import org.apache.kylin.shaded.com.google.common.collect.FluentIterable;
import org.apache.kylin.shaded.com.google.common.collect.Lists;
import org.apache.kylin.shaded.com.google.common.collect.MapDifference;
import org.apache.kylin.shaded.com.google.common.collect.Maps;
import org.apache.kylin.shaded.com.google.common.collect.Sets;
import org.apache.kylin.stream.core.consumer.ConsumerStartMode;
import org.apache.kylin.stream.core.consumer.ConsumerStartProtocol;
import org.apache.kylin.stream.core.consumer.IStreamingConnector;
import org.apache.kylin.stream.core.exception.StreamingException;
import org.apache.kylin.stream.core.source.ISourcePosition;
import org.apache.kylin.stream.core.source.ISourcePositionHandler;
import org.apache.kylin.stream.core.source.IStreamingMessageParser;
import org.apache.kylin.stream.core.source.IStreamingSource;
import org.apache.kylin.stream.core.source.MessageParserInfo;
import org.apache.kylin.stream.core.source.Partition;
import org.apache.kylin.stream.core.source.StreamingSourceConfig;
import org.apache.kylin.stream.core.source.StreamingSourceConfigManager;
import org.apache.kylin.stream.core.source.StreamingSourceFactory;
import org.apache.kylin.stream.core.source.StreamingTableSourceInfo;
import org.apache.kylin.stream.core.storage.StreamingSegmentManager;
import org.apache.kylin.stream.source.kafka.KafkaPosition;
import org.apache.kylin.stream.source.kafka.consumer.KafkaConnector;
import org.apache.log4j.Priority;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/stream/source/kafka/KafkaSource.class */
/**
 * Kafka-backed implementation of {@link IStreamingSource}.
 *
 * <p>The Kafka topic, bootstrap servers and (optionally) the message parser class are read
 * from the properties of the {@link StreamingSourceConfig} registered for a cube's root fact
 * table. Every method that needs broker metadata opens a short-lived {@link KafkaConsumer}
 * and closes it before returning.
 */
public class KafkaSource implements IStreamingSource {
    public static final String PROP_TOPIC = "topic";
    public static final String PROP_BOOTSTRAP_SERVERS = "bootstrap.servers";
    public static final String PROP_MESSAGE_PARSER = "message.parser";

    private static final Logger logger = LoggerFactory.getLogger(KafkaSource.class);

    /** Parser used when the source properties do not name one under {@link #PROP_MESSAGE_PARSER}. */
    private static final String DEFAULT_MESSAGE_PARSER_CLASS = "org.apache.kylin.stream.source.kafka.TimedJsonStreamParser";

    // Consumer timeouts. These used to be derived from an unrelated logging-level constant;
    // the numeric values are unchanged.
    private static final int SESSION_TIMEOUT_MS = 20000;
    private static final int REQUEST_TIMEOUT_MS = 30000;

    /** How long {@link #getMessageTemplate} waits for a first record. */
    private static final long TEMPLATE_POLL_TIMEOUT_MS = 500L;

    /** Maps Kafka partition metadata to the stream-core {@link Partition} (partition id only). */
    private static final Function<PartitionInfo, Partition> TO_PARTITION = new Function<PartitionInfo, Partition>() {
        @Override
        public Partition apply(PartitionInfo partitionInfo) {
            return new Partition(partitionInfo.partition());
        }
    };

    /** Maps Kafka partition metadata to the {@link TopicPartition} handle used for assign/seek. */
    private static final Function<PartitionInfo, TopicPartition> TO_TOPIC_PARTITION = new Function<PartitionInfo, TopicPartition>() {
        @Override
        public TopicPartition apply(PartitionInfo partitionInfo) {
            return new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
        }
    };

    /**
     * Looks up the partitions of the Kafka topic configured for a cube.
     *
     * @param str name of the cube whose root fact table identifies the streaming source config
     * @return source info listing one {@link Partition} per topic partition reported by the broker
     */
    @Override // org.apache.kylin.stream.core.source.IStreamingSource
    public StreamingTableSourceInfo load(String str) {
        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
        CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(str);
        StreamingSourceConfig sourceConfig = StreamingSourceConfigManager.getInstance(kylinConfig).getConfig(cube.getRootFactTable());
        String topicName = getTopicName(sourceConfig.getProperties());
        Map<String, Object> kafkaConf = getKafkaConf(sourceConfig.getProperties(), cube.getConfig());
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(kafkaConf)) {
            // Lists.transform only returns a lazy view that builds a new Partition on every
            // access; copy it so the result holds stable objects.
            List<Partition> partitions = Lists.newArrayList(Lists.transform(consumer.partitionsFor(topicName), TO_PARTITION));
            return new StreamingTableSourceInfo(partitions);
        }
    }

    /**
     * Fetches one sample message from the beginning of the configured topic.
     *
     * @param streamingSourceConfig source config providing the topic and bootstrap servers
     * @return the first polled record's value decoded as UTF-8, or {@code null} when no record
     *         arrived within the poll timeout, the record has no value, or any error occurred
     *         (errors are logged, not rethrown)
     */
    @Override // org.apache.kylin.stream.core.source.IStreamingSource
    public String getMessageTemplate(StreamingSourceConfig streamingSourceConfig) {
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(getKafkaConf(streamingSourceConfig.getProperties()))) {
            String topicName = getTopicName(streamingSourceConfig.getProperties());
            Set<TopicPartition> topicPartitions = topicPartitionsOf(consumer, topicName);
            consumer.assign(topicPartitions);
            consumer.seekToBeginning(topicPartitions);
            ConsumerRecords<byte[], byte[]> records = consumer.poll(TEMPLATE_POLL_TIMEOUT_MS);
            if (records == null) {
                return null;
            }
            Iterator<ConsumerRecord<byte[], byte[]>> recordIterator = records.iterator();
            if (!recordIterator.hasNext()) {
                return null;
            }
            byte[] payload = recordIterator.next().value();
            return payload == null ? null : new String(payload, StandardCharsets.UTF_8);
        } catch (Exception e) {
            logger.error("error when fetch one record from kafka, stream:" + streamingSourceConfig.getName(), e);
            return null;
        }
    }

    /**
     * Builds a {@link KafkaConnector} for a cube and positions it.
     *
     * <p>When a start protocol is given, its start mode (and start position, if non-empty) is
     * applied; otherwise, when a segment manager is given, the start position is derived from
     * the local and remote checkpoints.
     *
     * @param str name of the cube
     * @param list partitions assigned to this connector
     * @param consumerStartProtocol explicit start instructions, or {@code null}
     * @param streamingSegmentManager segment manager used for checkpoints, or {@code null}
     * @return the configured connector
     * @throws StreamingException wrapping any failure during creation
     */
    @Override // org.apache.kylin.stream.core.source.IStreamingSource
    public IStreamingConnector createStreamingConnector(String str, List<Partition> list, ConsumerStartProtocol consumerStartProtocol, StreamingSegmentManager streamingSegmentManager) {
        logger.info("Create StreamingConnector for Cube {}, assignedPartitions {}, startProtocol {}", str, list, consumerStartProtocol);
        try {
            KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
            CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(str);
            IStreamingSource streamingSource = StreamingSourceFactory.getStreamingSource(cube);
            StreamingSourceConfig sourceConfig = StreamingSourceConfigManager.getInstance(kylinConfig).getConfig(cube.getRootFactTable());

            Map<String, Object> kafkaConf = getKafkaConf(sourceConfig.getProperties(), cube.getConfig());
            String topicName = getTopicName(sourceConfig.getProperties());
            // Parser classes are instantiated reflectively through a (CubeDesc, MessageParserInfo)
            // constructor. The type is left raw because the parameterization the connector
            // expects is not visible from this file.
            IStreamingMessageParser parser = (IStreamingMessageParser) getStreamingMessageParserClass(sourceConfig.getProperties())
                    .getConstructor(CubeDesc.class, MessageParserInfo.class)
                    .newInstance(cube.getDescriptor(), sourceConfig.getParserInfo());
            KafkaConnector connector = new KafkaConnector(kafkaConf, topicName, parser, this);

            if (consumerStartProtocol != null) {
                String startPosition = consumerStartProtocol.getStartPosition();
                if (startPosition == null || startPosition.isEmpty()) {
                    connector.setStartPartition(list, consumerStartProtocol.getStartMode(), null);
                } else {
                    KafkaPosition position = (KafkaPosition) streamingSource.getSourcePositionHandler().parsePosition(startPosition);
                    connector.setStartPartition(list, consumerStartProtocol.getStartMode(), position.getPartitionOffsets());
                    // The segment manager is treated as optional below, so guard it here too
                    // instead of failing with a NullPointerException.
                    if (streamingSegmentManager != null) {
                        streamingSegmentManager.restoreConsumerStates(position);
                    }
                }
                if (streamingSegmentManager != null) {
                    streamingSegmentManager.checkpoint();
                }
            } else if (streamingSegmentManager != null) {
                setupConnectorFromCheckpoint(connector, list, streamingSource, streamingSegmentManager);
            }
            return connector;
        } catch (Exception e) {
            throw new StreamingException("streaming connector create fail, cube:" + str, e);
        }
    }

    /** @return a new {@link KafkaPositionHandler} */
    @Override // org.apache.kylin.stream.core.source.IStreamingSource
    public ISourcePositionHandler getSourcePositionHandler() {
        return new KafkaPositionHandler();
    }

    /**
     * Positions the connector from the local checkpoint (segment manager) and the remote
     * checkpoint (latest ready cube segment).
     *
     * <ul>
     *   <li>both empty: start from LATEST if the cube has no segments and is configured to
     *       consume from latest offsets, otherwise from EARLIEST;</li>
     *   <li>only remote present: start at the remote position;</li>
     *   <li>only local present: start at the local position after {@code advance()};</li>
     *   <li>both present: start at the per-partition maximum of the two.</li>
     * </ul>
     */
    private void setupConnectorFromCheckpoint(KafkaConnector connector, List<Partition> assignedPartitions, IStreamingSource streamingSource, StreamingSegmentManager segmentManager) {
        CubeInstance cube = segmentManager.getCubeInstance();
        CubeSegment latestReadySegment = cube.getLatestReadySegment();
        String localCheckpoint = segmentManager.getCheckPointSourcePosition();
        String remoteCheckpoint = latestReadySegment == null ? null : latestReadySegment.getStreamSourceCheckpoint();
        logger.info("localConsumeStats from local checkpoint {}, remoteConsumeStats from remote checkpoint {} ", localCheckpoint, remoteCheckpoint);

        KafkaPosition localPosition = null;
        if (localCheckpoint != null) {
            localPosition = (KafkaPosition) streamingSource.getSourcePositionHandler().parsePosition(localCheckpoint);
        }
        KafkaPosition remotePosition = null;
        if (remoteCheckpoint != null) {
            remotePosition = (KafkaPosition) streamingSource.getSourcePositionHandler().parsePosition(remoteCheckpoint);
        }
        boolean localEmpty = isEmptyPosition(localPosition);
        boolean remoteEmpty = isEmptyPosition(remotePosition);

        if (localEmpty && remoteEmpty) {
            if (cube.getSegments().isEmpty() && cube.getConfig().isStreamingConsumeFromLatestOffsets()) {
                logger.info("start kafka connector from latest");
                connector.setStartPartition(assignedPartitions, ConsumerStartMode.LATEST, null);
            } else {
                logger.info("start kafka connector from earliest");
                connector.setStartPartition(assignedPartitions, ConsumerStartMode.EARLIEST, null);
            }
            return;
        }

        KafkaPosition startPosition;
        if (localEmpty) {
            startPosition = remotePosition;
        } else if (remoteEmpty) {
            startPosition = (KafkaPosition) localPosition.advance();
        } else {
            startPosition = mergePositions(localPosition, remotePosition);
        }
        logger.info("start kafka connector from specified position:{}", startPosition);
        connector.setStartPartition(assignedPartitions, ConsumerStartMode.SPECIFIC_POSITION, startPosition.getPartitionOffsets());
    }

    /** Combines two positions, keeping the larger offset for partitions present in both. */
    private static KafkaPosition mergePositions(KafkaPosition localPosition, KafkaPosition remotePosition) {
        MapDifference<Integer, Long> diff = Maps.difference(localPosition.getPartitionOffsets(), remotePosition.getPartitionOffsets());
        Map<Integer, Long> mergedOffsets = new HashMap<>();
        mergedOffsets.putAll(diff.entriesInCommon());
        mergedOffsets.putAll(diff.entriesOnlyOnLeft());
        mergedOffsets.putAll(diff.entriesOnlyOnRight());
        for (Map.Entry<Integer, MapDifference.ValueDifference<Long>> differing : diff.entriesDiffering().entrySet()) {
            MapDifference.ValueDifference<Long> offsets = differing.getValue();
            mergedOffsets.put(differing.getKey(), Math.max(offsets.leftValue(), offsets.rightValue()));
        }
        return new KafkaPosition(mergedOffsets);
    }

    /** @return {@code true} when the position is {@code null} or carries no partition offsets */
    private boolean isEmptyPosition(KafkaPosition kafkaPosition) {
        return kafkaPosition == null || kafkaPosition.getPartitionOffsets().isEmpty();
    }

    /** Resolves every partition of {@code topicName} through the given consumer. */
    private static Set<TopicPartition> topicPartitionsOf(KafkaConsumer<byte[], byte[]> consumer, String topicName) {
        return Sets.newHashSet(FluentIterable.from(consumer.partitionsFor(topicName)).transform(TO_TOPIC_PARTITION));
    }

    /**
     * Builds the consumer configuration for a source and applies the Kylin-level overrides on top.
     *
     * @param map source properties (must contain {@link #PROP_BOOTSTRAP_SERVERS})
     * @param kylinConfig config supplying {@code getKafkaConfigOverride()}; its entries win over the defaults
     * @return a new mutable configuration map
     */
    public static Map<String, Object> getKafkaConf(Map<String, String> map, KylinConfig kylinConfig) {
        Map<String, String> overrides = kylinConfig.getKafkaConfigOverride();
        Map<String, Object> conf = getKafkaConf(map);
        conf.putAll(overrides);
        return conf;
    }

    /**
     * Builds the default consumer configuration for a source: bootstrap servers from the
     * properties, auto-commit disabled, fixed session/request timeouts and byte-array
     * key/value deserializers.
     *
     * @param map source properties (must contain {@link #PROP_BOOTSTRAP_SERVERS})
     * @return a new mutable configuration map
     */
    public static Map<String, Object> getKafkaConf(Map<String, String> map) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(PROP_BOOTSTRAP_SERVERS, getBootstrapServers(map));
        conf.put("enable.auto.commit", "false");
        conf.put("session.timeout.ms", String.valueOf(SESSION_TIMEOUT_MS));
        conf.put("request.timeout.ms", String.valueOf(REQUEST_TIMEOUT_MS));
        conf.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        conf.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return conf;
    }

    /** @return the value stored under {@link #PROP_BOOTSTRAP_SERVERS}, or {@code null} if absent */
    public static String getBootstrapServers(Map<String, String> map) {
        return map.get(PROP_BOOTSTRAP_SERVERS);
    }

    /** @return the value stored under {@link #PROP_TOPIC}, or {@code null} if absent */
    public static String getTopicName(Map<String, String> map) {
        return map.get(PROP_TOPIC);
    }

    /**
     * Loads the message parser class named under {@link #PROP_MESSAGE_PARSER}, falling back to
     * the default parser when the property is absent.
     *
     * @param map source properties
     * @return the parser class
     * @throws ClassNotFoundException if the resolved class name cannot be loaded
     */
    public static Class<?> getStreamingMessageParserClass(Map<String, String> map) throws ClassNotFoundException {
        String configuredParser = map.get(PROP_MESSAGE_PARSER);
        String className = configuredParser != null ? getParserClassName(configuredParser) : DEFAULT_MESSAGE_PARSER_CLASS;
        return Class.forName(className);
    }

    /**
     * Resolves a configured parser name to a class name; currently the name is used verbatim.
     *
     * @param str configured parser name
     * @return {@code str} unchanged
     */
    public static String getParserClassName(String str) {
        return str;
    }

    /** Queries the broker for the end offset of every partition of the cube's topic. */
    private ISourcePosition getLatestPosition(String cubeName) {
        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
        CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(cubeName);
        StreamingSourceConfig sourceConfig = StreamingSourceConfigManager.getInstance(kylinConfig).getConfig(cube.getRootFactTable());
        String topicName = getTopicName(sourceConfig.getProperties());
        KafkaPosition latestPosition = new KafkaPosition();
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(getKafkaConf(sourceConfig.getProperties(), cube.getConfig()))) {
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitionsOf(consumer, topicName));
            for (Map.Entry<TopicPartition, Long> endOffset : endOffsets.entrySet()) {
                latestPosition.update(new KafkaPosition.KafkaPartitionPosition(endOffset.getKey().partition(), endOffset.getValue()));
            }
        }
        return latestPosition;
    }

    /**
     * Computes, per partition, how far the given position is behind the topic's end offset.
     *
     * @param str name of the cube
     * @param iSourcePosition consumed position; its partition positions are cast to
     *        {@code KafkaPosition.KafkaPartitionPosition}
     * @return partition id to (end offset - consumed offset); partitions the broker did not
     *         report an end offset for are omitted
     */
    @Override // org.apache.kylin.stream.core.source.IStreamingSource
    public Map<Integer, Long> calConsumeLag(String str, ISourcePosition iSourcePosition) {
        // Fetch the broker-side positions once instead of once per partition.
        Map<Integer, ISourcePosition.IPartitionPosition> latestByPartition = getLatestPosition(str).getPartitionPositions();
        Map<Integer, Long> lagByPartition = new HashMap<>();
        for (ISourcePosition.IPartitionPosition consumed : iSourcePosition.getPartitionPositions().values()) {
            long consumedOffset = ((KafkaPosition.KafkaPartitionPosition) consumed).offset;
            ISourcePosition.IPartitionPosition latest = latestByPartition.get(consumed.getPartition());
            if (latest != null) {
                long latestOffset = ((KafkaPosition.KafkaPartitionPosition) latest).offset;
                lagByPartition.put(consumed.getPartition(), latestOffset - consumedOffset);
            }
        }
        return lagByPartition;
    }
}
