package org.apache.kylin.rest.service;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.exception.ServerErrorCode;
import org.apache.kylin.common.msg.MsgPicker;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.guava30.shaded.common.collect.Maps;
import org.apache.kylin.guava30.shaded.common.collect.Sets;
import org.apache.kylin.loader.ParserClassLoaderState;
import org.apache.kylin.metadata.jar.JarInfoManager;
import org.apache.kylin.metadata.jar.JarTypeEnum;
import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
import org.apache.kylin.metadata.streaming.DataParserInfo;
import org.apache.kylin.metadata.streaming.DataParserManager;
import org.apache.kylin.metadata.streaming.KafkaConfig;
import org.apache.kylin.rest.util.AclEvaluate;
import org.apache.kylin.sample.KafkaSourceHandler;
import org.apache.kylin.sample.StreamingSourceHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component("kafkaService")
/* loaded from: input_file:org/apache/kylin/rest/service/KafkaService.class */
public class KafkaService extends BasicService {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(KafkaService.class);

    @Autowired
    private AclEvaluate aclEvaluate;
    private final StreamingSourceHandler sourceHandler = new KafkaSourceHandler();

    public Map<String, List<String>> getTopics(KafkaConfig kafkaConfig, String str, String str2) {
        if (StringUtils.isEmpty(kafkaConfig.getKafkaBootstrapServers())) {
            throw new KylinException(ServerErrorCode.INVALID_BROKER_DEFINITION, MsgPicker.getMsg().getInvalidBrokerDefinition());
        }
        this.aclEvaluate.checkProjectWritePermission(str);
        checkBrokerStatus(kafkaConfig);
        return this.sourceHandler.getTopics(kafkaConfig, str2);
    }

    public void checkBrokerStatus(KafkaConfig kafkaConfig) {
        List brokenBrokers = this.sourceHandler.getBrokenBrokers(kafkaConfig);
        if (CollectionUtils.isEmpty(brokenBrokers)) {
            return;
        }
        HashMap newHashMap = Maps.newHashMap();
        newHashMap.put("failed_servers", brokenBrokers);
        throw new KylinException(ServerErrorCode.BROKER_TIMEOUT_MESSAGE, MsgPicker.getMsg().getBrokerTimeoutMessage()).withData(newHashMap);
    }

    public List<ByteBuffer> getMessages(KafkaConfig kafkaConfig, String str) {
        this.aclEvaluate.checkProjectWritePermission(str);
        if (StringUtils.isEmpty(kafkaConfig.getKafkaBootstrapServers())) {
            throw new KylinException(ServerErrorCode.INVALID_BROKER_DEFINITION, MsgPicker.getMsg().getInvalidBrokerDefinition());
        }
        try {
            return this.sourceHandler.getMessages(kafkaConfig);
        } catch (TimeoutException e) {
            throw new KylinException(ServerErrorCode.STREAMING_TIMEOUT_MESSAGE, MsgPicker.getMsg().getStreamingTimeoutMessage(), e);
        }
    }

    public Map<String, Object> decodeMessage(List<ByteBuffer> list) {
        List list2 = (List) list.stream().map(byteBuffer -> {
            return StandardCharsets.UTF_8.decode(byteBuffer).toString();
        }).filter((v0) -> {
            return StringUtils.isNotBlank(v0);
        }).collect(Collectors.toList());
        HashMap newHashMap = Maps.newHashMap();
        newHashMap.put("message_type", "custom");
        newHashMap.put("message", list2);
        return newHashMap;
    }

    public Map<String, Object> parserMessage(String str, KafkaConfig kafkaConfig, String str2) {
        if (StringUtils.isBlank(str2)) {
            throw new KylinException(ServerErrorCode.INVALID_STREAMING_MESSAGE, MsgPicker.getMsg().getEmptyStreamingMessage());
        }
        kafkaConfig.setProject(str);
        initDefaultParser(str);
        return this.sourceHandler.parserMessage(kafkaConfig, str2);
    }

    public List<String> getParsers(String str) {
        this.aclEvaluate.checkProjectWritePermission(str);
        initDefaultParser(str);
        return (List) ((DataParserManager) getManager(DataParserManager.class, str)).listDataParserInfo().stream().map((v0) -> {
            return v0.getClassName();
        }).sorted(Comparator.naturalOrder()).collect(Collectors.toList());
    }

    public String removeParser(String str, String str2) {
        this.aclEvaluate.checkProjectWritePermission(str);
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        AtomicReference atomicReference = new AtomicReference();
        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            DataParserManager dataParserManager = (DataParserManager) getManager(DataParserManager.class, str);
            DataParserInfo removeParser = dataParserManager.removeParser(str2);
            atomicBoolean.set(dataParserManager.jarHasParser(removeParser.getJarName()));
            if (atomicBoolean.get()) {
                return null;
            }
            atomicReference.set(((JarInfoManager) getManager(JarInfoManager.class, str)).removeJarInfo(JarTypeEnum.STREAMING_CUSTOM_PARSER, removeParser.getJarName()).getJarPath());
            return null;
        }, str);
        if (!atomicBoolean.get()) {
            ParserClassLoaderState.getInstance(str).unregisterJar(Sets.newHashSet(new String[]{(String) atomicReference.get()}));
            HadoopUtil.deletePath(HadoopUtil.getCurrentConfiguration(), new Path((String) atomicReference.get()));
            log.info("remove jar {} success", atomicReference);
        }
        return str2;
    }
}
