package org.apache.gobblin.service.modules.restli;

import com.google.common.base.Optional;
import com.linkedin.data.transform.DataProcessingException;
import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.EmptyRecord;
import com.linkedin.restli.common.HttpStatus;
import com.linkedin.restli.common.PatchRequest;
import com.linkedin.restli.server.CreateResponse;
import com.linkedin.restli.server.UpdateResponse;
import com.linkedin.restli.server.util.PatchApplier;
import java.io.IOException;
import java.util.Collection;
import java.util.Properties;
import java.util.UUID;
import org.apache.gobblin.runtime.api.FlowSpecSearchObject;
import org.apache.gobblin.service.FlowConfig;
import org.apache.gobblin.service.FlowConfigLoggedException;
import org.apache.gobblin.service.FlowConfigResourceLocalHandler;
import org.apache.gobblin.service.FlowConfigsResourceHandler;
import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
import org.apache.gobblin.service.modules.utils.HelixUtils;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.class */
/**
 * {@link FlowConfigsResourceHandler} that either handles flow-config requests on this node via a
 * {@link FlowConfigResourceLocalHandler}, or forwards them as a Helix user-defined message to the
 * {@link InstanceType#CONTROLLER} instance.
 *
 * <p>Routing rule used by create/update/delete: a request is handled entirely by the local handler
 * when the job scheduler reports itself active or when no {@link HelixManager} is configured.
 * Otherwise the request is forwarded; if {@code flowCatalogLocalCommit} is set, the local handler is
 * additionally invoked (with its boolean flag set to {@code false}) before forwarding.
 */
public class GobblinServiceFlowConfigResourceHandler implements FlowConfigsResourceHandler {
    private static final Logger log = LoggerFactory.getLogger(GobblinServiceFlowConfigResourceHandler.class);

    /** Property key that callers are not allowed to set on a new flow config. */
    private static final String FLOW_EXECUTION_ID_KEY = "flow.executionId";

    /** Helix message sub-types used when forwarding a request. */
    private static final String FLOWSPEC_ADD = "FLOWSPEC_ADD";
    private static final String FLOWSPEC_UPDATE = "FLOWSPEC_UPDATE";
    private static final String FLOWSPEC_REMOVE = "FLOWSPEC_REMOVE";

    private final String serviceName;
    private final boolean flowCatalogLocalCommit;
    private final FlowConfigResourceLocalHandler localHandler;
    private final Optional<HelixManager> helixManager;
    private final GobblinServiceJobScheduler jobScheduler;

    /**
     * @param serviceName name of this service instance; used as a prefix in forwarding log lines
     * @param flowCatalogLocalCommit whether to also invoke the local handler before forwarding
     * @param localHandler handler that performs the operation on this node
     * @param helixManager Helix manager used for forwarding; absent means every request is handled locally
     * @param jobScheduler scheduler whose {@code isActive()} state decides local handling vs. forwarding
     */
    public GobblinServiceFlowConfigResourceHandler(String serviceName, boolean flowCatalogLocalCommit,
            FlowConfigResourceLocalHandler localHandler, Optional<HelixManager> helixManager,
            GobblinServiceJobScheduler jobScheduler) {
        this.flowCatalogLocalCommit = flowCatalogLocalCommit;
        this.serviceName = serviceName;
        this.localHandler = localHandler;
        this.helixManager = helixManager;
        this.jobScheduler = jobScheduler;
    }

    /** Returns the flow config for {@code flowId}, as reported by the local handler. */
    public FlowConfig getFlowConfig(FlowId flowId) throws FlowConfigLoggedException {
        return this.localHandler.getFlowConfig(flowId);
    }

    /** Returns the flow configs matching {@code flowSpecSearchObject}, as reported by the local handler. */
    public Collection<FlowConfig> getFlowConfig(FlowSpecSearchObject flowSpecSearchObject) throws FlowConfigLoggedException {
        return this.localHandler.getFlowConfig(flowSpecSearchObject);
    }

    /** Returns all flow configs known to the local handler. */
    public Collection<FlowConfig> getAllFlowConfigs() {
        return this.localHandler.getAllFlowConfigs();
    }

    /**
     * Creates a flow config, locally or by forwarding it to the Helix controller.
     *
     * @param flowConfig the flow config to create; must not contain the {@code flow.executionId} property
     * @return the local handler's response when one was produced, otherwise a synthesized 201 response
     * @throws FlowConfigLoggedException with 400 if {@code flow.executionId} is set by the caller, or
     *         with 500 if Helix is configured but not connected or an {@link IOException} occurs
     */
    public CreateResponse createFlowConfig(FlowConfig flowConfig) throws FlowConfigLoggedException {
        String flowName = flowConfig.getId().getFlowName();
        String flowGroup = flowConfig.getId().getFlowGroup();
        if (flowConfig.getProperties().containsKey(FLOW_EXECUTION_ID_KEY)) {
            throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST,
                    String.format("%s cannot be set by the user", FLOW_EXECUTION_ID_KEY), (Throwable) null);
        }
        checkHelixConnection(FLOWSPEC_ADD, flowName, flowGroup);
        try {
            if (shouldHandleLocally()) {
                return this.localHandler.createFlowConfig(flowConfig);
            }
            CreateResponse localResponse = null;
            if (this.flowCatalogLocalCommit) {
                // Pass false, consistent with the update and delete paths below, because the request
                // is forwarded to the controller right after this local call.
                // NOTE(review): the flag presumably controls whether local listeners are triggered;
                // confirm against FlowConfigResourceLocalHandler.
                localResponse = this.localHandler.createFlowConfig(flowConfig, false);
            }
            boolean explainRequest = flowConfig.hasExplain() && flowConfig.isExplain().booleanValue();
            if (!explainRequest) {
                // "Explain" requests are not forwarded to the controller.
                forwardMessage(FLOWSPEC_ADD, FlowConfigUtils.serializeFlowConfig(flowConfig), flowName, flowGroup);
            }
            if (localResponse != null) {
                return localResponse;
            }
            return new CreateResponse(new ComplexResourceKey<>(flowConfig.getId(), new EmptyRecord()),
                    HttpStatus.S_201_CREATED);
        } catch (IOException e) {
            throw new FlowConfigLoggedException(HttpStatus.S_500_INTERNAL_SERVER_ERROR,
                    "Cannot create flowConfig [flowName=" + flowName + " flowGroup=" + flowGroup + "]", e);
        }
    }

    /**
     * Updates a flow config, locally or by forwarding it to the Helix controller.
     *
     * @param flowId identifier of the flow being updated
     * @param flowConfig the new flow config; its id must match {@code flowId}
     * @return the local handler's response when handled locally, otherwise a 200 response
     * @throws FlowConfigLoggedException with 400 if the flow name or group differs from {@code flowId},
     *         or with 500 if Helix is configured but not connected or an {@link IOException} occurs
     */
    public UpdateResponse updateFlowConfig(FlowId flowId, FlowConfig flowConfig) throws FlowConfigLoggedException {
        String flowName = flowId.getFlowName();
        String flowGroup = flowId.getFlowGroup();
        if (!flowGroup.equals(flowConfig.getId().getFlowGroup()) || !flowName.equals(flowConfig.getId().getFlowName())) {
            throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST,
                    "flowName and flowGroup cannot be changed in update", (Throwable) null);
        }
        checkHelixConnection(FLOWSPEC_UPDATE, flowName, flowGroup);
        try {
            if (shouldHandleLocally()) {
                return this.localHandler.updateFlowConfig(flowId, flowConfig);
            }
            if (this.flowCatalogLocalCommit) {
                this.localHandler.updateFlowConfig(flowId, flowConfig, false);
            }
            forwardMessage(FLOWSPEC_UPDATE, FlowConfigUtils.serializeFlowConfig(flowConfig), flowName, flowGroup);
            log.info("Forwarding update flowConfig [flowName={} flowGroup={}]", flowName, flowGroup);
            return new UpdateResponse(HttpStatus.S_200_OK);
        } catch (IOException e) {
            throw new FlowConfigLoggedException(HttpStatus.S_500_INTERNAL_SERVER_ERROR,
                    "Cannot update flowConfig [flowName=" + flowName + " flowGroup=" + flowGroup + "]", e);
        }
    }

    /**
     * Applies {@code patchRequest} to the currently stored flow config and then performs a full update.
     *
     * @throws FlowConfigLoggedException with 400 if the patch cannot be applied, plus anything thrown by
     *         {@link #getFlowConfig(FlowId)} or {@link #updateFlowConfig(FlowId, FlowConfig)}
     */
    public UpdateResponse partialUpdateFlowConfig(FlowId flowId, PatchRequest<FlowConfig> patchRequest) {
        FlowConfig flowConfig = getFlowConfig(flowId);
        try {
            PatchApplier.applyPatch(flowConfig, patchRequest);
            return updateFlowConfig(flowId, flowConfig);
        } catch (DataProcessingException e) {
            throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST, "Failed to apply partial update", e);
        }
    }

    /**
     * Deletes a flow config, locally or by forwarding the request to the Helix controller.
     *
     * @param flowId identifier of the flow to delete
     * @param properties passed through unchanged to the local handler
     * @return the local handler's response when handled locally, otherwise a 200 response
     * @throws FlowConfigLoggedException with 500 if Helix is configured but not connected or an
     *         {@link IOException} occurs
     */
    public UpdateResponse deleteFlowConfig(FlowId flowId, Properties properties) throws FlowConfigLoggedException {
        String flowName = flowId.getFlowName();
        String flowGroup = flowId.getFlowGroup();
        checkHelixConnection(FLOWSPEC_REMOVE, flowName, flowGroup);
        try {
            if (shouldHandleLocally()) {
                return this.localHandler.deleteFlowConfig(flowId, properties);
            }
            if (this.flowCatalogLocalCommit) {
                this.localHandler.deleteFlowConfig(flowId, properties, false);
            }
            forwardMessage(FLOWSPEC_REMOVE, FlowConfigUtils.serializeFlowId(flowId), flowName, flowGroup);
            return new UpdateResponse(HttpStatus.S_200_OK);
        } catch (IOException e) {
            throw new FlowConfigLoggedException(HttpStatus.S_500_INTERNAL_SERVER_ERROR,
                    "Cannot delete flowConfig [flowName=" + flowName + " flowGroup=" + flowGroup + "]", e);
        }
    }

    /** Returns true when the request must be handled on this node instead of being forwarded. */
    private boolean shouldHandleLocally() {
        return this.jobScheduler.isActive() || !this.helixManager.isPresent();
    }

    /**
     * Rejects the operation when a Helix manager is configured but not connected.
     *
     * @throws FlowConfigLoggedException with 500 in that case
     */
    private void checkHelixConnection(String operation, String flowName, String flowGroup) throws FlowConfigLoggedException {
        if (this.helixManager.isPresent() && !this.helixManager.get().isConnected()) {
            log.warn("System not yet initialized. Skipping operation " + operation);
            throw new FlowConfigLoggedException(HttpStatus.S_500_INTERNAL_SERVER_ERROR,
                    "System not yet initialized. Skipping " + operation + " flowConfig [flowName=" + flowName
                            + " flowGroup=" + flowGroup + "]");
        }
    }

    /**
     * Sends {@code payload} to the Helix controller as a user-defined message with a random message id.
     * Callers must only invoke this when {@code helixManager} is present.
     */
    private void forwardMessage(String messageSubType, String payload, String flowName, String flowGroup) {
        HelixUtils.sendUserDefinedMessage(messageSubType, payload, UUID.randomUUID().toString(),
                InstanceType.CONTROLLER, this.helixManager.get(), log);
        log.info("{} Forwarding {} flowConfig [flowName={} flowGroup={}]",
                this.serviceName, messageSubType, flowName, flowGroup);
    }

    /** Returns the service name supplied at construction. */
    public String getServiceName() {
        return this.serviceName;
    }
}
