package org.apache.kylin.rest.controller;

import java.io.IOException;
import java.util.List;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.streaming.StreamingConfig;
import org.apache.kylin.rest.exception.BadRequestException;
import org.apache.kylin.rest.exception.ForbiddenException;
import org.apache.kylin.rest.exception.InternalErrorException;
import org.apache.kylin.rest.exception.NotFoundException;
import org.apache.kylin.rest.request.StreamingRequest;
import org.apache.kylin.rest.service.KafkaConfigService;
import org.apache.kylin.rest.service.StreamingService;
import org.apache.kylin.rest.service.TableService;
import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.apache.kylin.tool.shaded.com.fasterxml.jackson.core.JsonParseException;
import org.apache.kylin.tool.shaded.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.kylin.tool.shaded.com.fasterxml.jackson.databind.JsonMappingException;
import org.apache.kylin.tool.shaded.org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.security.access.AccessDeniedException;
import org.springframework.stereotype.Controller;
import org.springframework.util.MimeTypeUtils;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;

@RequestMapping({ResourceStore.STREAMING_RESOURCE_ROOT})
@Controller
/* loaded from: input_file:org/apache/kylin/rest/controller/StreamingController.class */
public class StreamingController extends BasicController {
    /** SLF4J logger shared by every handler in this controller. */
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) StreamingController.class);

    /**
     * Injected bean "streamingMgmtService". The handlers use it to list, create,
     * update and drop {@link StreamingConfig} entries.
     */
    @Autowired
    @Qualifier("streamingMgmtService")
    private StreamingService streamingService;

    /**
     * Injected bean "kafkaMgmtService". The handlers use it to list, look up,
     * create, update and drop {@link KafkaConfig} entries.
     */
    @Autowired
    @Qualifier("kafkaMgmtService")
    private KafkaConfigService kafkaConfigService;

    /**
     * Injected bean "tableService". Used when saving a streaming config to load
     * the submitted table description into the target project.
     */
    @Autowired
    @Qualifier("tableService")
    private TableService tableService;

    /**
     * Handles {@code GET /getConfig}: returns the streaming configurations
     * produced by {@code streamingService.getStreamingConfigs}.
     *
     * @param str  optional {@code table} request parameter
     * @param str2 optional {@code project} request parameter
     * @param num  optional {@code limit} request parameter
     * @param num2 optional {@code offset} request parameter
     * @return the list returned by the streaming service
     * @throws InternalErrorException if the service call fails with an {@link IOException}
     */
    @RequestMapping(value = {"/getConfig"}, method = {RequestMethod.GET}, produces = {MimeTypeUtils.APPLICATION_JSON_VALUE})
    @ResponseBody
    public List<StreamingConfig> getStreamings(@RequestParam(value = "table", required = false) String str, @RequestParam(value = "project", required = false) String str2, @RequestParam(value = "limit", required = false) Integer num, @RequestParam(value = "offset", required = false) Integer num2) {
        final List<StreamingConfig> configs;
        try {
            configs = this.streamingService.getStreamingConfigs(str, str2, num, num2);
        } catch (IOException ioFailure) {
            // Log with the stack trace, then surface the failure as an internal error.
            final String detail = ioFailure.getLocalizedMessage();
            logger.error("Failed to deal with the request:" + detail, ioFailure);
            throw new InternalErrorException("Failed to deal with the request: " + detail, ioFailure);
        }
        return configs;
    }

    /**
     * Handles {@code GET /getKfkConfig}: returns the Kafka configurations
     * produced by {@code kafkaConfigService.getKafkaConfigs}.
     *
     * @param str  optional {@code kafkaConfigName} request parameter
     * @param str2 optional {@code project} request parameter
     * @param num  optional {@code limit} request parameter
     * @param num2 optional {@code offset} request parameter
     * @return the list returned by the Kafka config service
     * @throws InternalErrorException if the service call fails with an {@link IOException}
     */
    @RequestMapping(value = {"/getKfkConfig"}, method = {RequestMethod.GET}, produces = {MimeTypeUtils.APPLICATION_JSON_VALUE})
    @ResponseBody
    public List<KafkaConfig> getKafkaConfigs(@RequestParam(value = "kafkaConfigName", required = false) String str, @RequestParam(value = "project", required = false) String str2, @RequestParam(value = "limit", required = false) Integer num, @RequestParam(value = "offset", required = false) Integer num2) {
        try {
            final List<KafkaConfig> kafkaConfigs = this.kafkaConfigService.getKafkaConfigs(str, str2, num, num2);
            return kafkaConfigs;
        } catch (IOException lookupFailure) {
            // Same reporting as the other listing endpoint: log, then wrap.
            final String reason = lookupFailure.getLocalizedMessage();
            logger.error("Failed to deal with the request:" + reason, lookupFailure);
            throw new InternalErrorException("Failed to deal with the request: " + reason, lookupFailure);
        }
    }

    /**
     * Handles {@code POST} on the controller root: registers a streaming table
     * together with its {@link StreamingConfig} and {@link KafkaConfig}.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Deserialize the table, streaming and Kafka definitions from the request.
     *       If the streaming or Kafka JSON is invalid, the request is returned as-is
     *       (already flagged unsuccessful, with the parser message).</li>
     *   <li>Load the table into the project.</li>
     *   <li>Name both configs after the table identity and give each a fresh UUID.</li>
     *   <li>Create the streaming config, then the Kafka config. If the Kafka config
     *       cannot be created, the streaming config created just before is dropped
     *       again (exactly once).</li>
     * </ol>
     *
     * <p>A failure during rollback never replaces the failure that triggered it:
     * it is logged and attached to the thrown exception as a suppressed exception.
     *
     * @param streamingRequest request body carrying the project name and the three JSON definitions
     * @return the same request object, flagged successful on success
     * @throws BadRequestException    if the table definition cannot be deserialized, the table
     *                                cannot be loaded, or the resulting config name is empty
     * @throws InternalErrorException if creating either config fails with an {@link IOException}
     */
    @RequestMapping(value = {""}, method = {RequestMethod.POST}, produces = {MimeTypeUtils.APPLICATION_JSON_VALUE})
    @ResponseBody
    public StreamingRequest saveStreamingConfig(@RequestBody StreamingRequest streamingRequest) {
        String project = streamingRequest.getProject();
        TableDesc tableDesc = deserializeTableDesc(streamingRequest);
        if (null == tableDesc) {
            throw new BadRequestException("Failed to add streaming table.");
        }
        StreamingConfig streamingConfig = deserializeSchemalDesc(streamingRequest);
        if (!streamingRequest.isSuccessful()) {
            return streamingRequest;
        }
        KafkaConfig kafkaConfig = deserializeKafkaSchemalDesc(streamingRequest);
        if (!streamingRequest.isSuccessful()) {
            return streamingRequest;
        }

        try {
            this.tableService.loadTableToProject(tableDesc, null, project);
        } catch (IOException e) {
            // Keep the root cause in the log; the client only gets the generic message.
            logger.error("Failed to add streaming table:" + e.getLocalizedMessage(), e);
            throw new BadRequestException("Failed to add streaming table.");
        }

        // Both configs are keyed by the identity of the table they belong to.
        String configName = tableDesc.getIdentity();
        streamingConfig.setName(configName);
        kafkaConfig.setName(configName);

        if (StringUtils.isEmpty(streamingConfig.getName())) {
            logger.info("StreamingConfig should not be empty.");
            throw new BadRequestException("StremingConfig name should not be empty.");
        }

        // Nothing has been persisted yet, so a failure here needs no rollback.
        try {
            streamingConfig.setUuid(RandomUtil.randomUUID().toString());
            this.streamingService.createStreamingConfig(streamingConfig, project);
        } catch (IOException e) {
            logger.error("Failed to save StreamingConfig:" + e.getLocalizedMessage(), e);
            throw new InternalErrorException("Failed to save StreamingConfig: " + e.getLocalizedMessage(), e);
        }

        // The streaming config now exists; any failure from here on must undo it.
        try {
            kafkaConfig.setUuid(RandomUtil.randomUUID().toString());
            this.kafkaConfigService.createKafkaConfig(kafkaConfig, project);
        } catch (IOException e) {
            try {
                this.streamingService.dropStreamingConfig(streamingConfig, project);
            } catch (IOException rollbackFailure) {
                logger.error("Failed to rollback the created streaming config: " + rollbackFailure.getLocalizedMessage(), rollbackFailure);
                InternalErrorException failure = new InternalErrorException("StreamingConfig is created, but failed to create KafkaConfig: " + e.getLocalizedMessage(), e);
                failure.addSuppressed(rollbackFailure);
                throw failure;
            }
            logger.error("Failed to save KafkaConfig:" + e.getLocalizedMessage(), e);
            throw new InternalErrorException("Failed to save KafkaConfig: " + e.getLocalizedMessage(), e);
        } catch (RuntimeException | Error e) {
            // Unchecked failure: undo the streaming config, then rethrow unchanged.
            rollbackStreamingConfig(streamingConfig, project, e);
            throw e;
        }

        streamingRequest.setSuccessful(true);
        return streamingRequest;
    }

    /**
     * Best-effort removal of a streaming config that was persisted before a later step failed.
     * Any problem during the rollback is logged and attached to {@code cause} as a suppressed
     * exception so that the original failure stays the one reported to the caller.
     *
     * @param created the config that was passed to {@code createStreamingConfig}
     * @param project project the config was created in
     * @param cause   the failure that made the rollback necessary
     */
    private void rollbackStreamingConfig(StreamingConfig created, String project, Throwable cause) {
        try {
            StreamingConfig persisted = this.streamingService.getStreamingManager().getStreamingConfig(created.getName());
            // A null lookup means nothing is stored under that name, so there is nothing to undo.
            if (persisted != null) {
                this.streamingService.dropStreamingConfig(persisted, project);
            }
        } catch (IOException | RuntimeException rollbackFailure) {
            logger.error("Action failed and failed to rollback the created streaming config: " + rollbackFailure.getLocalizedMessage(), rollbackFailure);
            cause.addSuppressed(rollbackFailure);
        }
    }

    /**
     * Handles {@code PUT} on the controller root: updates an existing
     * {@link StreamingConfig} and its {@link KafkaConfig}.
     *
     * <p>The two updates run one after the other, each with its own error
     * handling, so a failure of the Kafka update is reported exactly once and
     * with the Kafka-specific message instead of being caught and re-wrapped by
     * the handlers of the streaming update.
     *
     * @param streamingRequest request body carrying the project name and both JSON definitions
     * @return the same request object; flagged successful when both updates succeeded, or
     *         returned early (untouched by the services) when a definition failed to
     *         deserialize or the streaming definition deserialized to {@code null}
     * @throws ForbiddenException      if either service rejects the update with an
     *                                 {@link AccessDeniedException}
     * @throws InternalErrorException  if either update fails for any other reason
     * @throws JsonProcessingException declared for signature compatibility with existing callers
     */
    @RequestMapping(value = {""}, method = {RequestMethod.PUT}, produces = {MimeTypeUtils.APPLICATION_JSON_VALUE})
    @ResponseBody
    public StreamingRequest updateStreamingConfig(@RequestBody StreamingRequest streamingRequest) throws JsonProcessingException {
        StreamingConfig streamingConfig = deserializeSchemalDesc(streamingRequest);
        if (!streamingRequest.isSuccessful()) {
            return streamingRequest;
        }
        KafkaConfig kafkaConfig = deserializeKafkaSchemalDesc(streamingRequest);
        if (!streamingRequest.isSuccessful()) {
            return streamingRequest;
        }
        String project = streamingRequest.getProject();
        if (streamingConfig == null) {
            return streamingRequest;
        }

        try {
            this.streamingService.updateStreamingConfig(streamingConfig, project);
        } catch (AccessDeniedException e) {
            throw new ForbiddenException("You don't have right to update this StreamingConfig.");
        } catch (Exception e) {
            logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), e);
            throw new InternalErrorException("Failed to deal with the request: " + e.getLocalizedMessage(), e);
        }

        // Deliberately outside the try above: exceptions raised by these handlers
        // must reach the caller as-is rather than being wrapped a second time.
        try {
            this.kafkaConfigService.updateKafkaConfig(kafkaConfig, project);
        } catch (AccessDeniedException e) {
            throw new ForbiddenException("You don't have right to update this KafkaConfig.");
        } catch (Exception e) {
            logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), e);
            throw new InternalErrorException("Failed to deal with the request: " + e.getLocalizedMessage(), e);
        }

        streamingRequest.setSuccessful(true);
        return streamingRequest;
    }

    /**
     * Handles {@code DELETE /{project}/{configName}}: drops the streaming config
     * with the given name and the Kafka config looked up under the same name.
     *
     * <p>The path variables are bound by explicit name so the mapping does not
     * depend on the Java parameter names matching the URI template.
     *
     * @param project    value of the {@code project} path segment
     * @param configName value of the {@code configName} path segment
     * @throws NotFoundException      if no streaming config exists under {@code configName}
     * @throws InternalErrorException if dropping either config fails
     * @throws IOException            if the Kafka config lookup fails
     */
    @RequestMapping(value = {"/{project}/{configName}"}, method = {RequestMethod.DELETE}, produces = {MimeTypeUtils.APPLICATION_JSON_VALUE})
    @ResponseBody
    public void deleteConfig(@PathVariable("project") String project, @PathVariable("configName") String configName) throws IOException {
        StreamingConfig streamingConfig = this.streamingService.getStreamingManager().getStreamingConfig(configName);
        // Answer 404 before doing any further lookups for a config that does not exist.
        if (null == streamingConfig) {
            throw new NotFoundException("StreamingConfig with name " + configName + " not found..");
        }
        KafkaConfig kafkaConfig = this.kafkaConfigService.getKafkaConfig(configName, project);
        try {
            this.streamingService.dropStreamingConfig(streamingConfig, project);
            this.kafkaConfigService.dropKafkaConfig(kafkaConfig, project);
        } catch (Exception e) {
            logger.error(e.getLocalizedMessage(), e);
            throw new InternalErrorException("Failed to delete StreamingConfig.  Caused by: " + e.getMessage(), e);
        }
    }

    /**
     * Reads the request's table JSON into a {@link TableDesc} and records the
     * outcome on the request via {@code updateRequest}.
     *
     * <p>On success the table name is passed through
     * {@code HadoopUtil.parseHiveTableName}; element 0 of the result becomes the
     * database and element 1 the table name.
     *
     * @param streamingRequest request whose table data is parsed and whose
     *                         success flag and message are updated
     * @return the parsed descriptor, or {@code null} when the JSON is malformed,
     *         cannot be mapped, or deserializes to {@code null}
     * @throws InternalErrorException on any other {@link IOException} while reading
     */
    private TableDesc deserializeTableDesc(StreamingRequest streamingRequest) {
        TableDesc parsed;
        try {
            logger.debug("Saving TableDesc " + streamingRequest.getTableData());
            parsed = (TableDesc) JsonUtil.readValue(streamingRequest.getTableData(), TableDesc.class);
            updateRequest(streamingRequest, true, null);
        } catch (JsonParseException syntaxError) {
            logger.error("The TableDesc definition is invalid.", syntaxError);
            updateRequest(streamingRequest, false, syntaxError.getMessage());
            return null;
        } catch (JsonMappingException mappingError) {
            logger.error("The data TableDesc definition is invalid.", mappingError);
            updateRequest(streamingRequest, false, mappingError.getMessage());
            return null;
        } catch (IOException ioError) {
            logger.error("Failed to deal with the request.", ioError);
            throw new InternalErrorException("Failed to deal with the request:" + ioError.getMessage(), ioError);
        }

        if (parsed == null) {
            return null;
        }

        String[] dbAndTable = HadoopUtil.parseHiveTableName(parsed.getName());
        parsed.setName(dbAndTable[1]);
        parsed.setDatabase(dbAndTable[0]);
        // Return value intentionally unused; presumably called for a side effect
        // inside TableDesc - TODO confirm.
        parsed.getIdentity();
        return parsed;
    }

    /**
     * Reads the request's streaming-config JSON into a {@link StreamingConfig}
     * and records the outcome on the request via {@code updateRequest}.
     *
     * @param streamingRequest request whose streaming config JSON is parsed and
     *                         whose success flag and message are updated
     * @return the parsed config, or {@code null} when the JSON is malformed or
     *         cannot be mapped (the request is then flagged unsuccessful and
     *         carries the parser message)
     * @throws InternalErrorException on any other {@link IOException} while reading
     */
    private StreamingConfig deserializeSchemalDesc(StreamingRequest streamingRequest) {
        try {
            logger.debug("Saving StreamingConfig " + streamingRequest.getStreamingConfig());
            StreamingConfig parsed = (StreamingConfig) JsonUtil.readValue(streamingRequest.getStreamingConfig(), StreamingConfig.class);
            updateRequest(streamingRequest, true, null);
            return parsed;
        } catch (JsonParseException syntaxError) {
            logger.error("The StreamingConfig definition is invalid.", syntaxError);
            updateRequest(streamingRequest, false, syntaxError.getMessage());
        } catch (JsonMappingException mappingError) {
            logger.error("The data StreamingConfig definition is invalid.", mappingError);
            updateRequest(streamingRequest, false, mappingError.getMessage());
        } catch (IOException ioError) {
            logger.error("Failed to deal with the request.", ioError);
            throw new InternalErrorException("Failed to deal with the request:" + ioError.getMessage(), ioError);
        }
        // Reached only after a parse or mapping failure was recorded on the request.
        return null;
    }

    /**
     * Reads the request's Kafka-config JSON into a {@link KafkaConfig} and
     * records the outcome on the request via {@code updateRequest}.
     *
     * @param streamingRequest request whose Kafka config JSON is parsed and
     *                         whose success flag and message are updated
     * @return the parsed config, or {@code null} when the JSON is malformed or
     *         cannot be mapped (the request is then flagged unsuccessful and
     *         carries the parser message)
     * @throws InternalErrorException on any other {@link IOException} while reading
     */
    private KafkaConfig deserializeKafkaSchemalDesc(StreamingRequest streamingRequest) {
        try {
            logger.debug("Saving KafkaConfig " + streamingRequest.getKafkaConfig());
            KafkaConfig parsed = (KafkaConfig) JsonUtil.readValue(streamingRequest.getKafkaConfig(), KafkaConfig.class);
            updateRequest(streamingRequest, true, null);
            return parsed;
        } catch (JsonParseException syntaxError) {
            logger.error("The KafkaConfig definition is invalid.", syntaxError);
            updateRequest(streamingRequest, false, syntaxError.getMessage());
        } catch (JsonMappingException mappingError) {
            logger.error("The data KafkaConfig definition is invalid.", mappingError);
            updateRequest(streamingRequest, false, mappingError.getMessage());
        } catch (IOException ioError) {
            logger.error("Failed to deal with the request.", ioError);
            throw new InternalErrorException("Failed to deal with the request:" + ioError.getMessage(), ioError);
        }
        // Reached only after a parse or mapping failure was recorded on the request.
        return null;
    }

    /**
     * Stamps the outcome of a processing step onto the request.
     *
     * @param streamingRequest request to update
     * @param successful       value stored as the request's success flag
     * @param message          value stored as the request's message; callers pass
     *                         {@code null} on success
     */
    private void updateRequest(StreamingRequest streamingRequest, boolean successful, String message) {
        streamingRequest.setSuccessful(successful);
        streamingRequest.setMessage(message);
    }
}
