package com.datatorrent.stram;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.api.StorageAgent;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.stram.api.Checkpoint;
import com.datatorrent.stram.api.OperatorDeployInfo;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
import com.datatorrent.stram.engine.OperatorContext;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.physical.PTContainer;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.plan.physical.PhysicalPlan;
import com.datatorrent.stram.util.ConfigUtils;
import com.datatorrent.stram.webapp.ContainerInfo;
import com.google.common.collect.Sets;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/StreamingContainerAgent.class */
public class StreamingContainerAgent {
    /** Logger shared by all instances of this class. */
    private static final Logger LOG = LoggerFactory.getLogger(StreamingContainerAgent.class);
    /** Physical-plan container this agent is bound to; assigned once in the constructor. */
    final PTContainer container;
    /** Context supplied at construction; returned by getInitContext() and used as fallback source for the STORAGE_AGENT attribute. */
    final StreamingContainerUmbilicalProtocol.StreamingContainerContext initCtx;
    /** Copied into ContainerInfo.jvmName; never assigned in this class (presumably set by the manager -- TODO confirm). */
    String jvmName;
    /** Starts out as the container's allocated memory (see constructor); reported as ContainerInfo.memoryMBFree. */
    int memoryMBFree;
    /** Reported as ContainerInfo.gcCollectionCount; not updated by any method of this class. */
    long gcCollectionCount;
    /** Reported as ContainerInfo.gcCollectionTime; not updated by any method of this class. */
    long gcCollectionTime;
    /** Manager passed to the constructor; used by this class to obtain the physical plan. */
    final StreamingContainerManager dnmgr;
    /** Not read or written by any method of this class (presumably driven by the manager -- TODO confirm). */
    boolean shutdownRequested = false;
    /** Not read or written by any method of this class; presumably operators awaiting deploy -- TODO confirm. */
    Set<PTOperator> deployOpers = Sets.newHashSet();
    /** Not read or written by any method of this class; presumably ids of operators awaiting undeploy -- TODO confirm. */
    Set<Integer> undeployOpers = Sets.newHashSet();
    /** Not read or written by any method of this class. */
    int deployCnt = 0;
    /** Reported as ContainerInfo.lastHeartbeat; stays 0 unless updated from outside this class. */
    long lastHeartbeatMillis = 0;
    /** System.currentTimeMillis() captured when this agent instance was created. */
    long createdMillis = System.currentTimeMillis();
    /** Requests queued through addOperatorRequest() and handed out through getOperatorRequests(). */
    private final ConcurrentLinkedQueue<StreamingContainerUmbilicalProtocol.StramToNodeRequest> operatorRequests = new ConcurrentLinkedQueue<>();

    /* loaded from: input_file:com/datatorrent/stram/StreamingContainerAgent$ContainerStartRequest.class */
    /**
     * Simple holder that pairs a start request with the {@link PTContainer} it refers to.
     */
    public static class ContainerStartRequest {
        /** Container this request was created for. */
        final PTContainer container;

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * @param pTContainer container the request refers to; stored as-is
         */
        public ContainerStartRequest(PTContainer pTContainer) {
            container = pTContainer;
        }
    }

    /**
     * Creates the agent for one physical container.
     *
     * @param pTContainer container represented by this agent; must not be {@code null}
     *        because its allocated memory is read here
     * @param streamingContainerContext initial context, later available through {@link #getInitContext()}
     * @param streamingContainerManager manager that owns the physical plan
     */
    public StreamingContainerAgent(PTContainer pTContainer, StreamingContainerUmbilicalProtocol.StreamingContainerContext streamingContainerContext, StreamingContainerManager streamingContainerManager) {
        container = pTContainer;
        initCtx = streamingContainerContext;
        dnmgr = streamingContainerManager;
        // Free memory starts out equal to the full allocation of the container.
        memoryMBFree = pTContainer.getAllocatedMemoryMB();
    }

    /**
     * @return the container context that was passed to the constructor
     */
    public StreamingContainerUmbilicalProtocol.StreamingContainerContext getInitContext() {
        return initCtx;
    }

    /**
     * @return the physical-plan container this agent was created for
     */
    public PTContainer getContainer() {
        return container;
    }

    /**
     * Checks whether any operator of the container still waits for deployment.
     *
     * @return {@code true} as soon as one operator is found in state
     *         {@code PENDING_DEPLOY}, {@code false} if none is
     */
    public boolean hasPendingWork() {
        for (PTOperator oper : container.getOperators()) {
            if (PTOperator.State.PENDING_DEPLOY == oper.getState()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Logs the request together with the container's external id and appends it
     * to the queue of pending operator requests.
     *
     * @param stramToNodeRequest request to enqueue
     */
    public void addOperatorRequest(StreamingContainerUmbilicalProtocol.StramToNodeRequest stramToNodeRequest) {
        LOG.info("Adding operator request {} {}", container.getExternalId(), stramToNodeRequest);
        operatorRequests.add(stramToNodeRequest);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * @return the live queue of pending operator requests (the internal instance, not a copy)
     */
    public ConcurrentLinkedQueue<StreamingContainerUmbilicalProtocol.StramToNodeRequest> getOperatorRequests() {
        return operatorRequests;
    }

    /**
     * Builds the deploy descriptors for those of the given operators that are in
     * state {@code PENDING_DEPLOY}; operators in any other state are skipped.
     *
     * <p>Work is done in two passes: first every eligible operator gets its
     * descriptor and all of its outputs, then the inputs are added. The split lets
     * the input pass verify that the source of a container-local stream was
     * described in the first pass.
     *
     * @param collection operators to consider
     * @return one descriptor per eligible operator, in the iteration order of {@code collection}
     * @throws AssertionError if this container has no buffer server address, an input
     *         stream has no source, a remote upstream container has no buffer server
     *         address, or the source of a container-local stream was not part of this call
     * @throws RuntimeException if an attribute map cannot be cloned
     */
    public List<OperatorDeployInfo> getDeployInfoList(Collection<PTOperator> collection) {
        if (this.container.bufferServerAddress == null) {
            throw new AssertionError("No buffer server address assigned");
        }
        // Insertion-ordered so that the result follows the iteration order of the argument.
        Map<OperatorDeployInfo, PTOperator> operatorsByDeployInfo = new LinkedHashMap<>();
        // Outputs described in pass 1; container-local inputs must find their source here.
        Set<PTOperator.PTOutput> publishedOutputs = new HashSet<>();
        PhysicalPlan physicalPlan = this.dnmgr.getPhysicalPlan();

        // Pass 1: operator descriptors and their outputs.
        for (PTOperator oper : collection) {
            if (oper.getState() != PTOperator.State.PENDING_DEPLOY) {
                LOG.debug("Skipping deploy for operator {} state {}", oper, oper.getState());
                continue;
            }
            OperatorDeployInfo ndi = createOperatorDeployInfo(oper);
            operatorsByDeployInfo.put(ndi, oper);
            ndi.inputs = new ArrayList<>(oper.getInputs().size());
            ndi.outputs = new ArrayList<>(oper.getOutputs().size());
            for (PTOperator.PTOutput out : oper.getOutputs()) {
                ndi.outputs.add(createOutputDeployInfo(ndi, oper, out, physicalPlan));
                publishedOutputs.add(out);
            }
        }

        // Pass 2: inputs, now that all outputs of this call are known.
        for (Map.Entry<OperatorDeployInfo, PTOperator> entry : operatorsByDeployInfo.entrySet()) {
            OperatorDeployInfo ndi = entry.getKey();
            PTOperator oper = entry.getValue();
            for (PTOperator.PTInput in : oper.getInputs()) {
                ndi.inputs.add(createInputDeployInfo(ndi, oper, in, publishedOutputs, physicalPlan));
            }
        }
        return new ArrayList<>(operatorsByDeployInfo.keySet());
    }

    /** Describes one output port of {@code oper}: stream identity, port attributes, buffer server and codecs. */
    private static OperatorDeployInfo.OutputDeployInfo createOutputDeployInfo(OperatorDeployInfo ndi, PTOperator oper, PTOperator.PTOutput out, PhysicalPlan physicalPlan) {
        LogicalPlan.StreamMeta streamMeta = out.logicalStream;
        OperatorDeployInfo.OutputDeployInfo portInfo = new OperatorDeployInfo.OutputDeployInfo();
        portInfo.declaredStreamId = streamMeta.getName();
        portInfo.portName = out.portName;
        try {
            // Work on a copy: the flag written below must not end up in the source port's own map.
            portInfo.contextAttributes = streamMeta.getSource().getAttributes().clone();
        } catch (CloneNotSupportedException e) {
            throw new RuntimeException("Cannot clone attributes", e);
        }

        boolean outputUnified = false;
        for (PTOperator.PTInput sink : out.sinks) {
            if (sink.target.isUnifier()) {
                outputUnified = true;
                break;
            }
        }
        portInfo.contextAttributes.put(Context.PortContext.IS_OUTPUT_UNIFIED, Boolean.valueOf(outputUnified));

        if (ndi.type == OperatorDeployInfo.OperatorType.UNIFIER) {
            // A unifier output takes the attributes of the first logical sink port instead.
            // NOTE(review): this replaces the cloned map (including the flag set above) with
            // the sink port's own, uncloned attribute map; kept as in the original.
            Iterator<LogicalPlan.InputPortMeta> sinkPorts = streamMeta.getSinks().iterator();
            if (sinkPorts.hasNext()) {
                portInfo.contextAttributes = sinkPorts.next().getAttributes();
            }
        }

        if (!out.isDownStreamInline()) {
            // Some consumer is reached through the buffer server of the operator's own container.
            PTContainer ownContainer = oper.getContainer();
            portInfo.bufferServerHost = ownContainer.bufferServerAddress.getHostName();
            portInfo.bufferServerPort = ownContainer.bufferServerAddress.getPort();
            portInfo.bufferServerToken = ownContainer.getBufferServerToken();
            // Register one codec per distinct identifier, only for sinks in other containers.
            for (PTOperator.PTInput sink : out.sinks) {
                if (sink.target.getContainer() != out.source.getContainer()) {
                    StreamCodec<?> codec = getStreamCodec(getIdentifyingInputPortMeta(sink));
                    Integer codecId = physicalPlan.getStreamCodecIdentifier(codec);
                    if (!portInfo.streamCodecs.containsKey(codecId)) {
                        portInfo.streamCodecs.put(codecId, codec);
                    }
                }
            }
        }
        return portInfo;
    }

    /**
     * Describes one input port of {@code oper}. May switch {@code ndi.type} to OIO
     * when the stream is thread-local within the same container.
     */
    private static OperatorDeployInfo.InputDeployInfo createInputDeployInfo(OperatorDeployInfo ndi, PTOperator oper, PTOperator.PTInput in, Set<PTOperator.PTOutput> publishedOutputs, PhysicalPlan physicalPlan) {
        LogicalPlan.StreamMeta streamMeta = in.logicalStream;
        if (streamMeta.getSource() == null) {
            throw new AssertionError("source is null: " + in);
        }
        PTOperator.PTOutput sourceOutput = in.source;
        OperatorDeployInfo.InputDeployInfo inputInfo = new OperatorDeployInfo.InputDeployInfo();
        inputInfo.declaredStreamId = streamMeta.getName();
        inputInfo.portName = in.portName;

        LogicalPlan.InputPortMeta inputPortMeta = getInputPortMeta(oper.getOperatorMeta(), streamMeta);
        if (inputPortMeta != null) {
            inputInfo.contextAttributes = inputPortMeta.getAttributes();
        }
        // Evaluated before the type can be changed to OIO further down.
        if (inputInfo.contextAttributes == null && ndi.type == OperatorDeployInfo.OperatorType.UNIFIER) {
            inputInfo.contextAttributes = in.source.logicalStream.getSource().getAttributes();
        }

        inputInfo.sourceNodeId = sourceOutput.source.getId();
        inputInfo.sourcePortName = sourceOutput.portName;
        if (in.partitions != null && in.partitions.mask != 0) {
            inputInfo.partitionMask = in.partitions.mask;
            inputInfo.partitionKeys = in.partitions.partitions;
        }

        if (sourceOutput.source.getContainer() != oper.getContainer()) {
            // Upstream lives in another container: subscribe through its buffer server.
            PTContainer upstreamContainer = sourceOutput.source.getContainer();
            InetSocketAddress upstreamAddress = upstreamContainer.bufferServerAddress;
            if (upstreamAddress == null) {
                throw new AssertionError("upstream address not assigned: " + sourceOutput);
            }
            inputInfo.bufferServerHost = upstreamAddress.getHostName();
            inputInfo.bufferServerPort = upstreamAddress.getPort();
            inputInfo.bufferServerToken = upstreamContainer.getBufferServerToken();
        } else {
            // Same container: the source must have been described in the same call.
            if (!publishedOutputs.contains(sourceOutput)) {
                throw new AssertionError("Source not deployed for container local stream " + sourceOutput + " " + in);
            }
            if (streamMeta.getLocality() == DAG.Locality.THREAD_LOCAL) {
                inputInfo.locality = DAG.Locality.THREAD_LOCAL;
                ndi.type = OperatorDeployInfo.OperatorType.OIO;
            } else {
                inputInfo.locality = DAG.Locality.CONTAINER_LOCAL;
            }
        }

        // The codec is always recorded on the input side, inline streams included.
        StreamCodec<?> codec = getStreamCodec(getIdentifyingInputPortMeta(in));
        inputInfo.streamCodecs.put(physicalPlan.getStreamCodecIdentifier(codec), codec);
        return inputInfo;
    }

    /**
     * Looks up the input port of an operator that is connected to the given stream.
     *
     * @param operatorMeta operator whose input streams are searched
     * @param streamMeta stream to look for; matched by reference identity
     * @return the first input port mapped to {@code streamMeta}, or {@code null} if there is none
     */
    public static LogicalPlan.InputPortMeta getInputPortMeta(LogicalPlan.OperatorMeta operatorMeta, LogicalPlan.StreamMeta streamMeta) {
        for (Map.Entry<LogicalPlan.InputPortMeta, LogicalPlan.StreamMeta> portToStream : operatorMeta.getInputStreams().entrySet()) {
            if (portToStream.getValue() == streamMeta) {
                return portToStream.getKey();
            }
        }
        return null;
    }

    /**
     * Resolves the logical input port that identifies a physical input. For a
     * unifier target the port is looked up on the operator returned by
     * {@link #getIdentifyingOperator(PTOperator)}, otherwise on the target itself.
     *
     * @param pTInput physical input to resolve
     * @return the matching input port, or {@code null} if the resolved operator has
     *         no port on the input's logical stream
     */
    public static LogicalPlan.InputPortMeta getIdentifyingInputPortMeta(PTOperator.PTInput pTInput) {
        PTOperator target = pTInput.target;
        LogicalPlan.StreamMeta stream = pTInput.logicalStream;
        PTOperator identifying = target.isUnifier() ? getIdentifyingOperator(target) : target;
        return getInputPortMeta(identifying.getOperatorMeta(), stream);
    }

    /**
     * Walks downstream from the given operator, always following the first sink
     * of the first output, until an operator that is not a unifier is reached.
     *
     * @param pTOperator starting point; may be {@code null}
     * @return the first non-unifier operator on that path, or {@code null} if the
     *         argument is {@code null} or a unifier on the path has no output or no sink
     */
    public static PTOperator getIdentifyingOperator(PTOperator pTOperator) {
        PTOperator current = pTOperator;
        while (current != null && current.isUnifier()) {
            List<PTOperator.PTOutput> outputs = current.getOutputs();
            if (outputs.isEmpty()) {
                return null;
            }
            List<PTOperator.PTInput> sinks = outputs.get(0).sinks;
            if (sinks.isEmpty()) {
                return null;
            }
            current = sinks.get(0).target;
        }
        return current;
    }

    /**
     * Determines the stream codec of an input port. The {@code STREAM_CODEC}
     * attribute wins; otherwise the codec offered by the port object is used and,
     * when present, written back into the port's attributes.
     *
     * @param inputPortMeta port to inspect; may be {@code null}
     * @return the codec, or {@code null} if the port is {@code null} or defines none
     */
    public static StreamCodec<?> getStreamCodec(LogicalPlan.InputPortMeta inputPortMeta) {
        if (inputPortMeta == null) {
            return null;
        }
        StreamCodec<?> configured = (StreamCodec) inputPortMeta.getValue(Context.PortContext.STREAM_CODEC);
        if (configured != null) {
            return configured;
        }
        StreamCodec<?> portCodec = inputPortMeta.getPortObject().getStreamCodec();
        if (portCodec != null) {
            // Remember the port's codec as an attribute so later lookups find it directly.
            inputPortMeta.getAttributes().put(Context.PortContext.STREAM_CODEC, portCodec);
        }
        return portCodec;
    }

    /**
     * Creates the descriptor for one operator, without inputs and outputs: its kind,
     * recovery checkpoint, name, id and a copy of its attributes.
     *
     * @param pTOperator operator to describe
     * @return the populated descriptor
     * @throws RuntimeException if an attribute map cannot be cloned or the checkpoint
     *         window id cannot be determined
     */
    private OperatorDeployInfo createOperatorDeployInfo(PTOperator pTOperator) {
        final OperatorDeployInfo ndi;
        if (pTOperator.isUnifier()) {
            OperatorDeployInfo.UnifierDeployInfo unifierInfo = new OperatorDeployInfo.UnifierDeployInfo();
            try {
                unifierInfo.operatorAttributes = pTOperator.getUnifiedOperatorMeta().getAttributes().clone();
            } catch (CloneNotSupportedException e) {
                throw new RuntimeException("Cannot clone unifier attributes", e);
            }
            ndi = unifierInfo;
        } else {
            ndi = new OperatorDeployInfo();
            OperatorDeployInfo.OperatorType kind = OperatorDeployInfo.OperatorType.GENERIC;
            if (pTOperator.getOperatorMeta().getOperator() instanceof InputOperator) {
                kind = OperatorDeployInfo.OperatorType.INPUT;
                // An input operator fed by a stream that has a source is deployed as GENERIC.
                for (PTOperator.PTInput input : pTOperator.getInputs()) {
                    if (input.logicalStream != null && input.logicalStream.getSource() != null) {
                        kind = OperatorDeployInfo.OperatorType.GENERIC;
                        break;
                    }
                }
            }
            ndi.type = kind;
        }

        Checkpoint checkpoint = pTOperator.getRecoveryCheckpoint();
        Operator.ProcessingMode mode = (Operator.ProcessingMode) pTOperator.getOperatorMeta().getValue(OperatorContext.PROCESSING_MODE);
        boolean useLatestStored = mode == Operator.ProcessingMode.AT_MOST_ONCE || mode == Operator.ProcessingMode.EXACTLY_ONCE;
        if (useLatestStored) {
            // Operator-level storage agent first, the container context's one as fallback.
            StorageAgent agent = (StorageAgent) pTOperator.getOperatorMeta().getAttributes().get(OperatorContext.STORAGE_AGENT);
            if (agent == null) {
                agent = (StorageAgent) this.initCtx.getValue(OperatorContext.STORAGE_AGENT);
            }
            try {
                // Largest window id known to the agent; stays -1 when there is none.
                long latestWindowId = -1;
                for (long windowId : agent.getWindowIds(pTOperator.getId())) {
                    latestWindowId = Math.max(latestWindowId, windowId);
                }
                if (checkpoint == null || checkpoint.windowId != latestWindowId) {
                    checkpoint = new Checkpoint(latestWindowId, 0, 0);
                }
            } catch (Exception e) {
                throw new RuntimeException("Failed to determine checkpoint window id " + pTOperator, e);
            }
        }
        LOG.debug("{} recovery checkpoint {}", pTOperator, checkpoint);

        ndi.checkpoint = checkpoint;
        ndi.name = pTOperator.getOperatorMeta().getName();
        ndi.id = pTOperator.getId();
        try {
            // Copy before modifying so the logical operator's attributes stay untouched.
            ndi.contextAttributes = pTOperator.getOperatorMeta().getAttributes().clone();
        } catch (CloneNotSupportedException e) {
            throw new RuntimeException("Cannot clone operator attributes", e);
        }
        if (pTOperator.isOperatorStateLess()) {
            ndi.contextAttributes.put(OperatorContext.STATELESS, true);
        }
        return ndi;
    }

    /**
     * Assembles a {@link ContainerInfo} snapshot from the container and the
     * statistics held by this agent. The two log URLs are only filled in when
     * the container has a node HTTP address.
     *
     * @return a newly created info object
     */
    public ContainerInfo getContainerInfo() {
        ContainerInfo info = new ContainerInfo();

        // Values taken from the container itself.
        info.id = container.getExternalId();
        info.host = container.host;
        info.state = container.getState().name();
        info.jvmName = jvmName;
        info.numOperators = container.getOperators().size();
        info.memoryMBAllocated = container.getAllocatedMemoryMB();

        // Statistics held by this agent.
        info.lastHeartbeat = lastHeartbeatMillis;
        info.memoryMBFree = memoryMBFree;
        info.gcCollectionCount = gcCollectionCount;
        info.gcCollectionTime = gcCollectionTime;

        info.startedTime = container.getStartedTime();
        info.finishedTime = container.getFinishedTime();

        if (container.nodeHttpAddress != null) {
            YarnConfiguration conf = new YarnConfiguration();
            String user = System.getenv(ApplicationConstants.Environment.USER.toString());
            info.containerLogsUrl = ConfigUtils.getSchemePrefix(conf) + container.nodeHttpAddress + "/node/containerlogs/" + info.id + "/" + user;
            String applicationId = (String) container.getPlan().getLogicalPlan().getAttributes().get(LogicalPlan.APPLICATION_ID);
            info.rawContainerLogsUrl = ConfigUtils.getRawContainerLogsUrl(conf, container.nodeHttpAddress, applicationId, info.id);
        }
        return info;
    }
}
