package org.apache.inlong.manager.plugin.listener;

import com.fasterxml.jackson.core.type.TypeReference;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobStatus;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.TaskEvent;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.plugin.flink.enums.Constants;
import org.apache.inlong.manager.plugin.util.FlinkUtils;
import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/manager/plugin/listener/StartupStreamListener.class */
public class StartupStreamListener implements SortOperateListener {
    private static final Logger log = LoggerFactory.getLogger(StartupStreamListener.class);

    /* renamed from: event, reason: merged with bridge method [inline-methods] */
    public TaskEvent m21event() {
        return TaskEvent.COMPLETE;
    }

    public boolean accept(WorkflowContext workflowContext) {
        StreamResourceProcessForm processForm = workflowContext.getProcessForm();
        String inlongGroupId = processForm.getInlongGroupId();
        if (!(processForm instanceof StreamResourceProcessForm)) {
            log.info("not add startup stream listener, not StreamResourceProcessForm for groupId [{}]", inlongGroupId);
            return false;
        }
        StreamResourceProcessForm streamResourceProcessForm = processForm;
        String inlongStreamId = streamResourceProcessForm.getStreamInfo().getInlongStreamId();
        if (streamResourceProcessForm.getGroupOperateType() != GroupOperateType.INIT) {
            log.info("not add startup stream listener, as the operate was not INIT for groupId [{}] streamId [{}]", inlongGroupId, inlongStreamId);
            return false;
        }
        log.info("add startup stream listener for groupId [{}] streamId [{}]", inlongGroupId, inlongStreamId);
        return InlongConstants.STANDARD_MODE.equals(streamResourceProcessForm.getGroupInfo().getInlongGroupMode());
    }

    public ListenerResult listen(WorkflowContext workflowContext) throws Exception {
        ProcessForm processForm = workflowContext.getProcessForm();
        InlongStreamInfo streamInfo = ((StreamResourceProcessForm) processForm).getStreamInfo();
        log.info("inlong stream :{} ext info: {}", streamInfo.getInlongStreamId(), streamInfo.getExtList());
        String inlongGroupId = streamInfo.getInlongGroupId();
        String inlongStreamId = streamInfo.getInlongStreamId();
        List sinkList = streamInfo.getSinkList();
        List list = (List) sinkList.stream().map((v0) -> {
            return v0.getSinkType();
        }).collect(Collectors.toList());
        if (CollectionUtils.isEmpty(sinkList) || !SinkType.containSortFlinkSink(list)) {
            log.warn("not any sink configured for group {} and stream {}, skip launching sort job", inlongGroupId, inlongStreamId);
            return ListenerResult.success();
        }
        List<InlongStreamExtInfo> extList = streamInfo.getExtList();
        log.info("stream ext info: {}", extList);
        Map map = (Map) extList.stream().filter(inlongStreamExtInfo -> {
            return StringUtils.isNotEmpty(inlongStreamExtInfo.getKeyName()) && StringUtils.isNotEmpty(inlongStreamExtInfo.getKeyValue());
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKeyName();
        }, (v0) -> {
            return v0.getKeyValue();
        }));
        String str = (String) map.get("sort.properties");
        if (StringUtils.isNotEmpty(str)) {
            map.putAll((Map) JsonUtils.OBJECT_MAPPER.convertValue(JsonUtils.OBJECT_MAPPER.readTree(str), new TypeReference<Map<String, String>>() { // from class: org.apache.inlong.manager.plugin.listener.StartupStreamListener.1
            }));
        }
        String str2 = (String) map.get("dataflow");
        if (StringUtils.isEmpty(str2)) {
            String format = String.format("dataflow is empty for groupId [%s], streamId [%s]", inlongGroupId, streamInfo.getInlongStreamId());
            log.error(format);
            return ListenerResult.fail(format);
        }
        FlinkInfo flinkInfo = new FlinkInfo();
        flinkInfo.setJobName(Constants.SORT_JOB_NAME_GENERATOR.apply(processForm) + streamInfo.getInlongStreamId());
        flinkInfo.setEndpoint((String) map.get("sort.url"));
        flinkInfo.setInlongStreamInfoList(Collections.singletonList(streamInfo));
        FlinkOperation flinkOperation = new FlinkOperation(new FlinkService(flinkInfo.getEndpoint()));
        try {
            flinkOperation.genPath(flinkInfo, str2);
            flinkOperation.start(flinkInfo);
            log.info("job submit success for groupId = {}, streamId = {}, jobId = {}", new Object[]{inlongGroupId, streamInfo.getInlongStreamId(), flinkInfo.getJobId()});
            saveInfo(streamInfo, "sort.job.id", flinkInfo.getJobId(), extList);
            flinkOperation.pollJobStatus(flinkInfo, JobStatus.RUNNING);
            return ListenerResult.success();
        } catch (Exception e) {
            flinkInfo.setException(true);
            flinkInfo.setExceptionMsg(FlinkUtils.getExceptionStackMsg(e));
            flinkOperation.pollJobStatus(flinkInfo, JobStatus.RUNNING);
            String format2 = String.format("startup sort failed for groupId [%s], streamId [%s]", inlongGroupId, streamInfo.getInlongStreamId());
            log.error(format2, e);
            return ListenerResult.fail(format2 + e.getMessage());
        }
    }

    private void saveInfo(InlongStreamInfo inlongStreamInfo, String str, String str2, List<InlongStreamExtInfo> list) {
        InlongStreamExtInfo inlongStreamExtInfo = new InlongStreamExtInfo();
        inlongStreamExtInfo.setInlongGroupId(inlongStreamInfo.getInlongGroupId());
        inlongStreamExtInfo.setInlongStreamId(inlongStreamInfo.getInlongStreamId());
        inlongStreamExtInfo.setKeyName(str);
        inlongStreamExtInfo.setKeyValue(str2);
        list.add(inlongStreamExtInfo);
    }
}
