package org.apache.gobblin.service.modules.core;

import java.beans.ConstructorProperties;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.service.FlowConfig;
import org.apache.gobblin.service.FlowConfigResourceLocalHandler;
import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.modules.restli.FlowConfigUtils;
import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigResourceHandler;
import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
import org.apache.helix.NotificationContext;
import org.apache.helix.messaging.handling.HelixTaskResult;
import org.apache.helix.messaging.handling.MessageHandler;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
import org.apache.helix.model.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/service/modules/core/ControllerUserDefinedMessageHandlerFactory.class */
class ControllerUserDefinedMessageHandlerFactory implements MessageHandlerFactory {
    private boolean flowCatalogLocalCommit;
    private GobblinServiceJobScheduler jobScheduler;
    private GobblinServiceFlowConfigResourceHandler resourceHandler;
    private String serviceName;

    /* loaded from: input_file:org/apache/gobblin/service/modules/core/ControllerUserDefinedMessageHandlerFactory$ControllerUserDefinedMessageHandler.class */
    private static class ControllerUserDefinedMessageHandler extends MessageHandler {
        private static final Logger log = LoggerFactory.getLogger(ControllerUserDefinedMessageHandler.class);
        private boolean flowCatalogLocalCommit;
        private GobblinServiceJobScheduler jobScheduler;
        private GobblinServiceFlowConfigResourceHandler resourceHandler;
        private String serviceName;

        public ControllerUserDefinedMessageHandler(Message message, NotificationContext notificationContext, String str, boolean z, GobblinServiceJobScheduler gobblinServiceJobScheduler, GobblinServiceFlowConfigResourceHandler gobblinServiceFlowConfigResourceHandler) {
            super(message, notificationContext);
            this.serviceName = str;
            this.flowCatalogLocalCommit = z;
            this.jobScheduler = gobblinServiceJobScheduler;
            this.resourceHandler = gobblinServiceFlowConfigResourceHandler;
        }

        private void handleAdd(String str) throws IOException {
            FlowConfig deserializeFlowConfig = FlowConfigUtils.deserializeFlowConfig(str);
            if (!this.flowCatalogLocalCommit) {
                this.resourceHandler.createFlowConfig(deserializeFlowConfig);
                return;
            }
            Spec createFlowSpecForConfig = FlowConfigResourceLocalHandler.createFlowSpecForConfig(deserializeFlowConfig);
            log.info("Only handle add {} scheduling because flow catalog is committed locally on standby.", createFlowSpecForConfig);
            this.jobScheduler.onAddSpec(createFlowSpecForConfig);
        }

        private void handleUpdate(String str) throws IOException {
            FlowConfig deserializeFlowConfig = FlowConfigUtils.deserializeFlowConfig(str);
            if (!this.flowCatalogLocalCommit) {
                this.resourceHandler.updateFlowConfig(deserializeFlowConfig.getId(), deserializeFlowConfig);
                return;
            }
            Spec createFlowSpecForConfig = FlowConfigResourceLocalHandler.createFlowSpecForConfig(deserializeFlowConfig);
            log.info("Only handle update {} scheduling because flow catalog is committed locally on standby.", createFlowSpecForConfig);
            this.jobScheduler.onUpdateSpec(createFlowSpecForConfig);
        }

        private void handleDelete(String str) throws IOException {
            try {
                FlowId deserializeFlowId = FlowConfigUtils.deserializeFlowId(str);
                if (this.flowCatalogLocalCommit) {
                    URI createFlowSpecUri = FlowSpec.Utils.createFlowSpecUri(deserializeFlowId);
                    log.info("Only handle update {} scheduling because flow catalog is committed locally on standby.", createFlowSpecUri);
                    this.jobScheduler.onDeleteSpec(createFlowSpecUri, "");
                } else {
                    this.resourceHandler.deleteFlowConfig(deserializeFlowId, new Properties());
                }
            } catch (URISyntaxException e) {
                throw new IOException(e);
            }
        }

        public HelixTaskResult handleMessage() throws InterruptedException {
            if (this.jobScheduler.isActive()) {
                String attribute = this._message.getAttribute(Message.Attributes.INNER_MESSAGE);
                log.info("{} ControllerUserDefinedMessage received : {}, type {}", new Object[]{this.serviceName, attribute, this._message.getMsgSubType()});
                try {
                    if (this._message.getMsgSubType().equals("FLOWSPEC_ADD")) {
                        handleAdd(attribute);
                    } else if (this._message.getMsgSubType().equals("FLOWSPEC_REMOVE")) {
                        handleDelete(attribute);
                    } else if (this._message.getMsgSubType().equals("FLOWSPEC_UPDATE")) {
                        handleUpdate(attribute);
                    }
                } catch (IOException e) {
                    log.error("Cannot process Helix message.", e);
                    HelixTaskResult helixTaskResult = new HelixTaskResult();
                    helixTaskResult.setSuccess(false);
                    return helixTaskResult;
                }
            } else {
                log.error("ControllerUserDefinedMessage received but ignored due to not in active mode: {}, type {}", this._message.getAttribute(Message.Attributes.INNER_MESSAGE), this._message.getMsgSubType());
            }
            HelixTaskResult helixTaskResult2 = new HelixTaskResult();
            helixTaskResult2.setSuccess(true);
            return helixTaskResult2;
        }

        public void onError(Exception exc, MessageHandler.ErrorCode errorCode, MessageHandler.ErrorType errorType) {
            log.error(String.format("Failed to handle message with exception %s, error code %s, error type %s", exc, errorCode, errorType));
        }
    }

    public MessageHandler createHandler(Message message, NotificationContext notificationContext) {
        return new ControllerUserDefinedMessageHandler(message, notificationContext, this.serviceName, this.flowCatalogLocalCommit, this.jobScheduler, this.resourceHandler);
    }

    public String getMessageType() {
        return Message.MessageType.USER_DEFINE_MSG.toString();
    }

    public List<String> getMessageTypes() {
        return Collections.singletonList(getMessageType());
    }

    public void reset() {
    }

    @ConstructorProperties({"flowCatalogLocalCommit", "jobScheduler", "resourceHandler", "serviceName"})
    public ControllerUserDefinedMessageHandlerFactory(boolean z, GobblinServiceJobScheduler gobblinServiceJobScheduler, GobblinServiceFlowConfigResourceHandler gobblinServiceFlowConfigResourceHandler, String str) {
        this.flowCatalogLocalCommit = z;
        this.jobScheduler = gobblinServiceJobScheduler;
        this.resourceHandler = gobblinServiceFlowConfigResourceHandler;
        this.serviceName = str;
    }
}
