package org.apache.inlong.manager.plugin.listener;

import com.fasterxml.jackson.core.type.TypeReference;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobStatus;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.TaskEvent;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.plugin.flink.enums.Constants;
import org.apache.inlong.manager.plugin.util.FlinkUtils;
import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/manager/plugin/listener/StartupSortListener.class */
public class StartupSortListener implements SortOperateListener {
    private static final Logger log = LoggerFactory.getLogger(StartupSortListener.class);

    /* renamed from: event, reason: merged with bridge method [inline-methods] */
    public TaskEvent m19event() {
        return TaskEvent.COMPLETE;
    }

    public boolean accept(WorkflowContext workflowContext) {
        GroupResourceProcessForm processForm = workflowContext.getProcessForm();
        String inlongGroupId = processForm.getInlongGroupId();
        if (!(processForm instanceof GroupResourceProcessForm)) {
            log.info("not add startup group listener, not GroupResourceProcessForm for groupId [{}]", inlongGroupId);
            return false;
        }
        GroupResourceProcessForm groupResourceProcessForm = processForm;
        if (groupResourceProcessForm.getGroupOperateType() != GroupOperateType.INIT) {
            log.info("not add startup group listener, as the operate was not INIT for groupId [{}]", inlongGroupId);
            return false;
        }
        log.info("add startup group listener for groupId [{}]", inlongGroupId);
        return InlongConstants.DATASYNC_MODE.equals(groupResourceProcessForm.getGroupInfo().getInlongGroupMode());
    }

    public ListenerResult listen(WorkflowContext workflowContext) throws Exception {
        ProcessForm processForm = workflowContext.getProcessForm();
        String inlongGroupId = processForm.getInlongGroupId();
        if (!(processForm instanceof GroupResourceProcessForm)) {
            String format = String.format("process form was not GroupResource for groupId [%s]", inlongGroupId);
            log.error(format);
            return ListenerResult.fail(format);
        }
        List<InlongStreamInfo> streamInfos = ((GroupResourceProcessForm) processForm).getStreamInfos();
        if (((Integer) streamInfos.stream().map(inlongStreamInfo -> {
            return Integer.valueOf(inlongStreamInfo.getSinkList() == null ? 0 : inlongStreamInfo.getSinkList().size());
        }).reduce(0, (v0, v1) -> {
            return Integer.sum(v0, v1);
        })).intValue() == 0) {
            log.warn("not any sink configured for group {}, skip launching sort job", inlongGroupId);
            return ListenerResult.success();
        }
        for (InlongStreamInfo inlongStreamInfo2 : streamInfos) {
            List sinkList = inlongStreamInfo2.getSinkList();
            List list = (List) sinkList.stream().map((v0) -> {
                return v0.getSinkType();
            }).collect(Collectors.toList());
            if (CollectionUtils.isEmpty(sinkList) || !SinkType.containSortFlinkSink(list)) {
                return ListenerResult.success();
            }
            List<InlongStreamExtInfo> extList = inlongStreamInfo2.getExtList();
            log.info("stream ext info: {}", extList);
            Map map = (Map) extList.stream().filter(inlongStreamExtInfo -> {
                return StringUtils.isNotEmpty(inlongStreamExtInfo.getKeyName()) && StringUtils.isNotEmpty(inlongStreamExtInfo.getKeyValue());
            }).collect(Collectors.toMap((v0) -> {
                return v0.getKeyName();
            }, (v0) -> {
                return v0.getKeyValue();
            }));
            String str = (String) map.get("sort.properties");
            if (StringUtils.isNotEmpty(str)) {
                map.putAll((Map) JsonUtils.OBJECT_MAPPER.convertValue(JsonUtils.OBJECT_MAPPER.readTree(str), new TypeReference<Map<String, String>>() { // from class: org.apache.inlong.manager.plugin.listener.StartupSortListener.1
                }));
            }
            String str2 = (String) map.get("dataflow");
            if (StringUtils.isEmpty(str2)) {
                String format2 = String.format("dataflow is empty for groupId [%s], streamId [%s]", inlongGroupId, inlongStreamInfo2.getInlongStreamId());
                log.error(format2);
                return ListenerResult.fail(format2);
            }
            FlinkInfo flinkInfo = new FlinkInfo();
            flinkInfo.setJobName(Constants.SORT_JOB_NAME_GENERATOR.apply(processForm) + inlongStreamInfo2.getInlongStreamId());
            flinkInfo.setEndpoint((String) map.get("sort.url"));
            flinkInfo.setInlongStreamInfoList(Collections.singletonList(inlongStreamInfo2));
            FlinkOperation flinkOperation = new FlinkOperation(new FlinkService(flinkInfo.getEndpoint()));
            try {
                flinkOperation.genPath(flinkInfo, str2);
                flinkOperation.start(flinkInfo);
                log.info("job submit success for groupId = {}, streamId = {}, jobId = {}", new Object[]{inlongGroupId, inlongStreamInfo2.getInlongStreamId(), flinkInfo.getJobId()});
                saveInfo(inlongStreamInfo2, "sort.job.id", flinkInfo.getJobId(), extList);
                flinkOperation.pollJobStatus(flinkInfo, JobStatus.RUNNING);
            } catch (Exception e) {
                flinkInfo.setException(true);
                flinkInfo.setExceptionMsg(FlinkUtils.getExceptionStackMsg(e));
                flinkOperation.pollJobStatus(flinkInfo, JobStatus.RUNNING);
                String format3 = String.format("startup sort failed for groupId [%s], streamId [%s]", inlongGroupId, inlongStreamInfo2.getInlongStreamId());
                log.error(format3, e);
                return ListenerResult.fail(format3 + e.getMessage());
            }
        }
        return ListenerResult.success();
    }

    private void saveInfo(InlongStreamInfo inlongStreamInfo, String str, String str2, List<InlongStreamExtInfo> list) {
        InlongStreamExtInfo inlongStreamExtInfo = new InlongStreamExtInfo();
        inlongStreamExtInfo.setInlongGroupId(inlongStreamInfo.getInlongGroupId());
        inlongStreamExtInfo.setInlongStreamId(inlongStreamInfo.getInlongStreamId());
        inlongStreamExtInfo.setKeyName(str);
        inlongStreamExtInfo.setKeyValue(str2);
        list.add(inlongStreamExtInfo);
    }
}
