package org.apache.heron.spi.utils;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.proto.system.PhysicalPlans;
import org.apache.heron.proto.tmanager.TopologyManager;
import org.apache.heron.spi.statemgr.SchedulerStateManagerAdaptor;
import org.apache.heron.spi.utils.NetworkUtils;

/* loaded from: input_file:org/apache/heron/spi/utils/TManagerUtils.class */
/**
 * Static helpers for sending control commands to a topology's TManager over HTTP.
 *
 * <p>Every request is an HTTP GET against
 * {@code http://<host>:<controllerPort>/<command>?topologyid=<id>[&<arg>...]}, where host,
 * controller port and topology id are read from the TManager location returned by the
 * state manager.
 */
public final class TManagerUtils {
    private static final Logger LOG = Logger.getLogger(TManagerUtils.class.getName());

    /**
     * Topology control commands. {@link #transitionTopologyState} uses the lower-cased
     * constant name as the HTTP endpoint.
     */
    public enum TManagerCommand {
        ACTIVATE,
        DEACTIVATE,
        RUNTIME_CONFIG_UPDATE
    }

    /** Utility class; not instantiable. */
    private TManagerUtils() {
    }

    /**
     * Sends a command to the TManager of a topology without extra query arguments.
     *
     * @param str the command, used as the path segment of the request URL
     * @param str2 the topology name used to look up the TManager location
     * @param schedulerStateManagerAdaptor state manager queried for the TManager location
     * @param tunnelConfig tunnel configuration handed to {@code NetworkUtils} when opening the connection
     * @throws TManagerException if the location cannot be fetched, the URL is invalid,
     *     no connection can be obtained, or the response is not HTTP 200
     */
    @VisibleForTesting
    public static void sendToTManager(String str, String str2, SchedulerStateManagerAdaptor schedulerStateManagerAdaptor, NetworkUtils.TunnelConfig tunnelConfig) throws TManagerException {
        sendToTManagerWithArguments(str, str2, new ArrayList<String>(), schedulerStateManagerAdaptor, tunnelConfig);
    }

    /**
     * Sends a command to the TManager of a topology, appending extra query arguments.
     *
     * @param str the command, used as the path segment of the request URL
     * @param str2 the topology name used to look up the TManager location
     * @param list extra query arguments, each appended verbatim after an {@code &}
     *     (no URL encoding is applied here)
     * @param schedulerStateManagerAdaptor state manager queried for the TManager location
     * @param tunnelConfig tunnel configuration handed to {@code NetworkUtils} when opening the connection
     * @throws TManagerException if the location cannot be fetched, the URL is invalid,
     *     no connection can be obtained, or the response is not HTTP 200
     */
    @VisibleForTesting
    public static void sendToTManagerWithArguments(String str, String str2, List<String> list, SchedulerStateManagerAdaptor schedulerStateManagerAdaptor, NetworkUtils.TunnelConfig tunnelConfig) throws TManagerException {
        LOG.fine("Fetching TManager location for topology: " + str2);
        TopologyManager.TManagerLocation tManagerLocation = schedulerStateManagerAdaptor.getTManagerLocation(str2);
        if (tManagerLocation == null) {
            throw new TManagerException("Failed to fetch TManager location for topology: " + str2);
        }
        LOG.fine("Fetched TManager location for topology: " + str2);

        // Locale.ROOT keeps the port rendered with ASCII digits regardless of the JVM's default locale.
        StringBuilder urlBuilder = new StringBuilder(String.format(Locale.ROOT, "http://%s:%d/%s?topologyid=%s",
                tManagerLocation.getHost(), tManagerLocation.getControllerPort(), str, tManagerLocation.getTopologyId()));
        for (String argument : list) {
            urlBuilder.append('&').append(argument);
        }
        String endpoint = urlBuilder.toString();

        try {
            URL url = new URL(endpoint);
            LOG.fine("HTTP URL for TManager: " + url);
            sendGetRequest(url, str, tunnelConfig);
        } catch (MalformedURLException e) {
            throw new TManagerException("Invalid URL for TManager endpoint: " + endpoint, e);
        }
    }

    /**
     * Issues the GET request and verifies that the TManager answered with HTTP 200.
     *
     * @param url the fully built TManager endpoint
     * @param str the command name, used only in log and error messages
     * @param tunnelConfig tunnel configuration handed to {@code NetworkUtils} when opening the connection
     * @throws TManagerException if no connection can be obtained, reading the response fails,
     *     or the response code is not 200
     */
    private static void sendGetRequest(URL url, String str, NetworkUtils.TunnelConfig tunnelConfig) throws TManagerException {
        HttpURLConnection connection = NetworkUtils.getProxiedHttpConnectionIfNeeded(url, tunnelConfig);
        if (connection == null) {
            throw new TManagerException(String.format("Failed to get a HTTP connection to TManager: %s", url));
        }
        LOG.fine("Successfully opened HTTP connection to TManager");

        // The send happens inside the try so the connection is released even if it fails unexpectedly.
        try {
            NetworkUtils.sendHttpGetRequest(connection);
            LOG.fine("Sent the HTTP payload to TManager");

            int responseCode = connection.getResponseCode();
            if (responseCode != HttpURLConnection.HTTP_OK) {
                throw new TManagerException(String.format("Non OK HTTP response %d from TManager for command %s", responseCode, str));
            }
            LOG.fine("Successfully got a HTTP response from TManager using command: " + str);
        } catch (IOException e) {
            throw new TManagerException(String.format("Failed to receive HTTP response from TManager using command: `%s`", str), e);
        } finally {
            connection.disconnect();
        }
    }

    /**
     * Reads the topology state recorded in the physical plan held by the state manager.
     *
     * @param str the topology name
     * @param schedulerStateManagerAdaptor state manager queried for the physical plan
     * @return the state of the topology embedded in the physical plan
     * @throws TManagerException if no physical plan is available for the topology
     */
    private static TopologyAPI.TopologyState getRuntimeTopologyState(String str, SchedulerStateManagerAdaptor schedulerStateManagerAdaptor) throws TManagerException {
        PhysicalPlans.PhysicalPlan physicalPlan = schedulerStateManagerAdaptor.getPhysicalPlan(str);
        if (physicalPlan == null) {
            throw new TManagerException(String.format("Failed to get physical plan for topology '%s'", str));
        }
        return physicalPlan.getTopology().getState();
    }

    /**
     * Moves a topology from one state to another by sending the given command to its TManager.
     *
     * <p>If the topology is already in the target state, a warning is logged and nothing is sent.
     *
     * @param str the topology name
     * @param tManagerCommand the command to send; its lower-cased name is the endpoint
     * @param schedulerStateManagerAdaptor state manager used to read the current state and the TManager location
     * @param topologyState the state the topology must currently be in for the command to be sent
     * @param topologyState2 the target state; if already reached, the call is a no-op
     * @param tunnelConfig tunnel configuration handed to {@code NetworkUtils} when opening the connection
     * @throws TManagerException if the current state is unavailable, the topology is in neither
     *     the start nor the target state, or the request to the TManager fails
     */
    public static void transitionTopologyState(String str, TManagerCommand tManagerCommand, SchedulerStateManagerAdaptor schedulerStateManagerAdaptor, TopologyAPI.TopologyState topologyState, TopologyAPI.TopologyState topologyState2, NetworkUtils.TunnelConfig tunnelConfig) throws TManagerException {
        TopologyAPI.TopologyState currentState = getRuntimeTopologyState(str, schedulerStateManagerAdaptor);
        if (currentState == null) {
            throw new TManagerException(String.format("Topology '%s' is not initialized yet", str));
        }
        if (currentState == topologyState2) {
            LOG.warning(String.format("Topology %s command received but topology '%s' already in state %s", tManagerCommand, str, currentState));
            return;
        }
        if (currentState != topologyState) {
            throw new TManagerException(String.format("Topology '%s' is not in state '%s'", str, topologyState));
        }
        // Locale.ROOT avoids locale-specific case mappings (e.g. Turkish dotless i) corrupting the endpoint.
        sendToTManager(tManagerCommand.name().toLowerCase(Locale.ROOT), str, schedulerStateManagerAdaptor, tunnelConfig);
        LOG.log(Level.INFO, "Topology command {0} completed successfully.", tManagerCommand);
    }

    /**
     * Sends runtime configuration entries to the {@code runtime_config/update} endpoint.
     *
     * @param str the topology name
     * @param tManagerCommand the command being executed; used only in the completion log message
     * @param schedulerStateManagerAdaptor state manager queried for the TManager location
     * @param strArr configuration entries, each sent as a {@code runtime-config=<entry>} query argument
     * @param tunnelConfig tunnel configuration handed to {@code NetworkUtils} when opening the connection
     * @throws TManagerException if the request to the TManager fails
     */
    public static void sendRuntimeConfig(String str, TManagerCommand tManagerCommand, SchedulerStateManagerAdaptor schedulerStateManagerAdaptor, String[] strArr, NetworkUtils.TunnelConfig tunnelConfig) throws TManagerException {
        List<String> arguments = new ArrayList<>(strArr.length);
        for (String config : strArr) {
            arguments.add("runtime-config=" + config);
        }
        sendToTManagerWithArguments("runtime_config/update", str, arguments, schedulerStateManagerAdaptor, tunnelConfig);
        LOG.log(Level.INFO, "Topology command {0} completed successfully.", tManagerCommand);
    }
}
