package com.datatorrent.stram;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Stats;
import com.datatorrent.api.StatsListener;
import com.datatorrent.api.StorageAgent;
import com.datatorrent.bufferserver.auth.AuthManager;
import com.datatorrent.bufferserver.util.Codec;
import com.datatorrent.common.experimental.AppData;
import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.common.util.FSStorageAgent;
import com.datatorrent.common.util.NumberAggregate;
import com.datatorrent.common.util.Pair;
import com.datatorrent.stram.Journal;
import com.datatorrent.stram.StreamingContainerAgent;
import com.datatorrent.stram.api.AppDataSource;
import com.datatorrent.stram.api.Checkpoint;
import com.datatorrent.stram.api.ContainerContext;
import com.datatorrent.stram.api.OperatorDeployInfo;
import com.datatorrent.stram.api.StramEvent;
import com.datatorrent.stram.api.StramToNodeChangeLoggersRequest;
import com.datatorrent.stram.api.StramToNodeGetPropertyRequest;
import com.datatorrent.stram.api.StramToNodeSetPropertyRequest;
import com.datatorrent.stram.api.StramToNodeStartRecordingRequest;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
import com.datatorrent.stram.engine.Node;
import com.datatorrent.stram.engine.OperatorResponse;
import com.datatorrent.stram.engine.StreamingContainer;
import com.datatorrent.stram.engine.WindowGenerator;
import com.datatorrent.stram.plan.logical.LogicalOperatorStatus;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
import com.datatorrent.stram.plan.logical.Operators;
import com.datatorrent.stram.plan.logical.requests.LogicalPlanRequest;
import com.datatorrent.stram.plan.physical.OperatorStatus;
import com.datatorrent.stram.plan.physical.PTContainer;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.plan.physical.PhysicalPlan;
import com.datatorrent.stram.plan.physical.PlanModifier;
import com.datatorrent.stram.util.ConfigUtils;
import com.datatorrent.stram.util.FSJsonLineFile;
import com.datatorrent.stram.util.MovingAverage;
import com.datatorrent.stram.util.SharedPubSubWebSocketClient;
import com.datatorrent.stram.util.WebServicesClient;
import com.datatorrent.stram.webapp.ContainerInfo;
import com.datatorrent.stram.webapp.LogicalOperatorInfo;
import com.datatorrent.stram.webapp.OperatorAggregationInfo;
import com.datatorrent.stram.webapp.OperatorInfo;
import com.datatorrent.stram.webapp.PortInfo;
import com.datatorrent.stram.webapp.StramWebServices;
import com.datatorrent.stram.webapp.StreamInfo;
import com.datatorrent.stram.webapp.asm.MethodSignatureVisitor;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.lang.management.ManagementFactory;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.bus.config.BusConfiguration;
import org.apache.apex.engine.api.plugin.DAGExecutionEvent;
import org.apache.apex.engine.plugin.ApexPluginDispatcher;
import org.apache.apex.engine.plugin.NoOpApexPluginDispatcher;
import org.apache.apex.engine.util.CascadeStorageAgent;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/StreamingContainerManager.class */
public class StreamingContainerManager implements PhysicalPlan.PlanContext {
    public static final String GATEWAY_LOGIN_URL_PATH = "/ws/v2/login";
    public static final String BUILTIN_APPDATA_URL = "builtin";
    public static final String CONTAINERS_INFO_FILENAME_FORMAT = "containers_%d.json";
    public static final String OPERATORS_INFO_FILENAME_FORMAT = "operators_%d.json";
    public static final String APP_META_FILENAME = "meta.json";
    public static final String APP_META_KEY_ATTRIBUTES = "attributes";
    public static final String APP_META_KEY_METRICS = "metrics";
    public static final String EMBEDDABLE_QUERY_NAME_SUFFIX = ".query";
    public static final int METRIC_QUEUE_SIZE = 1000;
    private final FinalVars vars;
    private final PhysicalPlan plan;
    private final Clock clock;
    private SharedPubSubWebSocketClient wsClient;
    private FSStatsRecorder statsRecorder;
    private FSEventRecorder eventRecorder;
    protected final Map<String, String> containerStopRequests;
    protected final ConcurrentLinkedQueue<StreamingContainerAgent.ContainerStartRequest> containerStartRequests;
    protected boolean forcedShutdown;
    private final ConcurrentLinkedQueue<Runnable> eventQueue;
    private final AtomicBoolean eventQueueProcessing;
    private final HashSet<PTContainer> pendingAllocation;
    protected String shutdownDiagnosticsMessage;
    private long lastResourceRequest;
    private final Map<String, StreamingContainerAgent> containers;
    private final List<Pair<PTOperator, Long>> purgeCheckpoints;
    private Map<LogicalPlan.OperatorMeta, Set<LogicalPlan.OperatorMeta>> checkpointGroups;
    private final Map<Long, Set<PTOperator>> shutdownOperators;
    private CriticalPathInfo criticalPathInfo;
    private final ConcurrentMap<PTOperator, PTOperator> reportStats;
    private final AtomicBoolean deployChangeInProgress;
    private int deployChangeCnt;
    private MBassador<StramEvent> eventBus;
    private final Journal journal;
    private RecoveryHandler recoveryHandler;
    private final ConcurrentSkipListMap<Long, Map<Integer, EndWindowStats>> endWindowStatsOperatorMap;
    private final ConcurrentMap<PTOperator, PTOperator> slowestUpstreamOp;
    private long committedWindowId;
    private long lastCommittedWindowId;
    private final Map<Pair<Integer, String>, Long> operatorPortLastEndWindowTimestamps;
    private final Map<Integer, Long> operatorLastEndWindowTimestamps;
    private long lastStatsTimestamp;
    private long currentEndWindowStatsWindowId;
    private long completeEndWindowStatsWindowId;
    private final ConcurrentHashMap<String, MovingAverage.MovingAverageLong> rpcLatencies;
    private final AtomicLong nodeToStramRequestIds;
    private int allocatedMemoryMB;
    private List<AppDataSource> appDataSources;
    private final Cache<Long, Object> commandResponse;
    private transient ExecutorService poolExecutor;
    private FileContext fileContext;
    private final Map<String, Queue<Pair<Long, Map<String, Object>>>> logicalMetrics;
    private final Map<String, Map<String, Object>> latestLogicalMetrics;
    private final Map<String, Object> latestLogicalCounters;
    public transient ApexPluginDispatcher apexPluginDispatcher;
    private final LinkedHashMap<String, ContainerInfo> completedContainers;
    private FSJsonLineFile containerFile;
    private FSJsonLineFile operatorFile;
    private final long startTime;
    private final Object appDataSourcesLock;
    private boolean startedFromCheckpoint;
    private static final Logger LOG = LoggerFactory.getLogger(StreamingContainerManager.class);
    public static final Journal.Recoverable SET_OPERATOR_PROPERTY = new SetOperatorProperty();
    public static final Journal.Recoverable SET_PHYSICAL_OPERATOR_PROPERTY = new SetPhysicalOperatorProperty();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.datatorrent.stram.StreamingContainerManager$5, reason: invalid class name */
    /* loaded from: input_file:com/datatorrent/stram/StreamingContainerManager$5.class */
    public static /* synthetic */ class AnonymousClass5 {
        static final /* synthetic */ int[] $SwitchMap$com$datatorrent$stram$api$StreamingContainerUmbilicalProtocol$OperatorHeartbeat$DeployState;
        static final /* synthetic */ int[] $SwitchMap$com$datatorrent$stram$plan$physical$PTOperator$State = new int[PTOperator.State.values().length];

        static {
            try {
                $SwitchMap$com$datatorrent$stram$plan$physical$PTOperator$State[PTOperator.State.ACTIVE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$datatorrent$stram$plan$physical$PTOperator$State[PTOperator.State.PENDING_UNDEPLOY.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$datatorrent$stram$plan$physical$PTOperator$State[PTOperator.State.PENDING_DEPLOY.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            $SwitchMap$com$datatorrent$stram$api$StreamingContainerUmbilicalProtocol$OperatorHeartbeat$DeployState = new int[StreamingContainerUmbilicalProtocol.OperatorHeartbeat.DeployState.values().length];
            try {
                $SwitchMap$com$datatorrent$stram$api$StreamingContainerUmbilicalProtocol$OperatorHeartbeat$DeployState[StreamingContainerUmbilicalProtocol.OperatorHeartbeat.DeployState.SHUTDOWN.ordinal()] = 1;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$com$datatorrent$stram$api$StreamingContainerUmbilicalProtocol$OperatorHeartbeat$DeployState[StreamingContainerUmbilicalProtocol.OperatorHeartbeat.DeployState.FAILED.ordinal()] = 2;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$com$datatorrent$stram$api$StreamingContainerUmbilicalProtocol$OperatorHeartbeat$DeployState[StreamingContainerUmbilicalProtocol.OperatorHeartbeat.DeployState.ACTIVE.ordinal()] = 3;
            } catch (NoSuchFieldError e6) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/datatorrent/stram/StreamingContainerManager$CheckpointState.class */
    public static class CheckpointState implements Serializable {
        private static final long serialVersionUID = 3827310557521807024L;
        private FinalVars finals;
        private PhysicalPlan physicalPlan;

        CheckpointState() {
        }

        public void setApplicationId(LogicalPlan logicalPlan, Configuration configuration) {
            LogicalPlan logicalPlan2 = this.physicalPlan.getLogicalPlan();
            String str = (String) logicalPlan.getValue(LogicalPlan.APPLICATION_ID);
            String str2 = (String) logicalPlan2.getValue(LogicalPlan.APPLICATION_ID);
            if (str2 == null) {
                throw new AssertionError("Missing original application id");
            }
            logicalPlan2.setAttribute(LogicalPlan.APPLICATION_ID, str);
            logicalPlan2.setAttribute(LogicalPlan.APPLICATION_PATH, logicalPlan.assertAppPath());
            logicalPlan2.setAttribute(Context.DAGContext.LIBRARY_JARS, logicalPlan.getValue(Context.DAGContext.LIBRARY_JARS));
            logicalPlan2.setAttribute(LogicalPlan.ARCHIVES, logicalPlan.getValue(LogicalPlan.ARCHIVES));
            this.finals = new FinalVars(this.finals, logicalPlan2);
            logicalPlan2.setAttribute(Context.OperatorContext.STORAGE_AGENT, StreamingContainerManager.updateStorageAgent((StorageAgent) logicalPlan2.getValue(Context.OperatorContext.STORAGE_AGENT), str2, str, configuration));
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/StreamingContainerManager$ContainerResource.class */
    public static class ContainerResource {
        public final String containerId;
        public final String host;
        public final int memoryMB;
        public final int vCores;
        public final int priority;
        public final String nodeHttpAddress;

        public ContainerResource(int i, String str, String str2, int i2, int i3, String str3) {
            this.containerId = str;
            this.host = str2;
            this.memoryMB = i2;
            this.vCores = i3;
            this.priority = i;
            this.nodeHttpAddress = str3;
        }

        public String toString() {
            return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("containerId", this.containerId).append("host", this.host).append("memoryMB", this.memoryMB).toString();
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/StreamingContainerManager$CriticalPathInfo.class */
    public static class CriticalPathInfo {
        long latency;
        final LinkedList<Integer> path;

        public CriticalPathInfo() {
            this.path = new LinkedList<>();
        }

        private CriticalPathInfo(long j, LinkedList<Integer> linkedList) {
            this.latency = j;
            this.path = linkedList;
        }

        protected Object clone() throws CloneNotSupportedException {
            return new CriticalPathInfo(this.latency, (LinkedList) this.path.clone());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/datatorrent/stram/StreamingContainerManager$EndWindowStats.class */
    public static class EndWindowStats {
        long emitTimestamp = -1;
        HashMap<String, Long> dequeueTimestamps = new HashMap<>();
        Object counters;
        Map<String, Object> metrics;

        EndWindowStats() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/datatorrent/stram/StreamingContainerManager$FinalVars.class */
    public static class FinalVars implements Serializable {
        private static final long serialVersionUID = 3827310557521807024L;
        private final long windowStartMillis;
        private final int heartbeatTimeoutMillis;
        private final String appPath;
        private final int maxWindowsBehindForStats;
        private final boolean enableStatsRecording;
        private final int rpcLatencyCompensationSamples;

        private FinalVars(LogicalPlan logicalPlan, long j) {
            Attribute.AttributeMap attributes = logicalPlan.getAttributes();
            this.windowStartMillis = j - (j % 1000);
            if (attributes.get(LogicalPlan.APPLICATION_PATH) == null) {
                throw new IllegalArgumentException("Not set: " + LogicalPlan.APPLICATION_PATH);
            }
            this.appPath = (String) attributes.get(LogicalPlan.APPLICATION_PATH);
            if (attributes.get(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS) == null) {
                attributes.put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 500);
            }
            if (attributes.get(LogicalPlan.CHECKPOINT_WINDOW_COUNT) == null) {
                attributes.put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, Integer.valueOf(30000 / ((Integer) attributes.get(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS)).intValue()));
            }
            this.heartbeatTimeoutMillis = ((Integer) logicalPlan.getValue(LogicalPlan.HEARTBEAT_TIMEOUT_MILLIS)).intValue();
            this.maxWindowsBehindForStats = ((Integer) logicalPlan.getValue(LogicalPlan.STATS_MAX_ALLOWABLE_WINDOWS_LAG)).intValue();
            this.enableStatsRecording = ((Boolean) logicalPlan.getValue(LogicalPlan.ENABLE_STATS_RECORDING)).booleanValue();
            this.rpcLatencyCompensationSamples = ((Integer) logicalPlan.getValue(LogicalPlan.RPC_LATENCY_COMPENSATION_SAMPLES)).intValue();
        }

        private FinalVars(FinalVars finalVars, LogicalPlan logicalPlan) {
            this.windowStartMillis = finalVars.windowStartMillis;
            this.heartbeatTimeoutMillis = finalVars.heartbeatTimeoutMillis;
            this.maxWindowsBehindForStats = finalVars.maxWindowsBehindForStats;
            this.enableStatsRecording = finalVars.enableStatsRecording;
            this.appPath = (String) logicalPlan.getValue(LogicalPlan.APPLICATION_PATH);
            this.rpcLatencyCompensationSamples = finalVars.rpcLatencyCompensationSamples;
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/StreamingContainerManager$LogicalPlanChangeRunnable.class */
    private class LogicalPlanChangeRunnable implements Callable<Object> {
        final List<LogicalPlanRequest> requests;

        private LogicalPlanChangeRunnable(List<LogicalPlanRequest> list) {
            this.requests = list;
        }

        @Override // java.util.concurrent.Callable
        public Object call() throws Exception {
            StreamingContainerManager.LOG.info("Begin plan changes: {}", this.requests);
            LogicalPlan logicalPlan = StreamingContainerManager.this.plan.getLogicalPlan();
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            LogicalPlan.write(logicalPlan, byteArrayOutputStream);
            byteArrayOutputStream.flush();
            LogicalPlan read = LogicalPlan.read(new ByteArrayInputStream(byteArrayOutputStream.toByteArray()));
            PlanModifier planModifier = new PlanModifier(read);
            for (LogicalPlanRequest logicalPlanRequest : this.requests) {
                StreamingContainerManager.LOG.debug("Dry run plan change: {}", logicalPlanRequest);
                logicalPlanRequest.execute(planModifier);
            }
            read.validate();
            PlanModifier planModifier2 = new PlanModifier(StreamingContainerManager.this.plan);
            for (LogicalPlanRequest logicalPlanRequest2 : this.requests) {
                logicalPlanRequest2.execute(planModifier2);
                StreamingContainerManager.this.recordEventAsync(new StramEvent.ChangeLogicalPlanEvent(logicalPlanRequest2));
            }
            planModifier2.applyChanges(StreamingContainerManager.this);
            StreamingContainerManager.this.apexPluginDispatcher.dispatch(new ApexPluginDispatcher.DAGChangeEvent(StreamingContainerManager.this.plan.getLogicalPlan()));
            StreamingContainerManager.LOG.info("Plan changes applied: {}", this.requests);
            return null;
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/StreamingContainerManager$RecordingRequestFilter.class */
    private static class RecordingRequestFilter implements Predicate<StreamingContainerUmbilicalProtocol.StramToNodeRequest> {
        static final Set<StreamingContainerUmbilicalProtocol.StramToNodeRequest.RequestType> MATCH_TYPES = Sets.newHashSet(new StreamingContainerUmbilicalProtocol.StramToNodeRequest.RequestType[]{StreamingContainerUmbilicalProtocol.StramToNodeRequest.RequestType.START_RECORDING, StreamingContainerUmbilicalProtocol.StramToNodeRequest.RequestType.STOP_RECORDING, StreamingContainerUmbilicalProtocol.StramToNodeRequest.RequestType.SYNC_RECORDING});

        private RecordingRequestFilter() {
        }

        public boolean apply(@Nullable StreamingContainerUmbilicalProtocol.StramToNodeRequest stramToNodeRequest) {
            return stramToNodeRequest != null && MATCH_TYPES.contains(stramToNodeRequest.getRequestType());
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/StreamingContainerManager$RecoveryHandler.class */
    public interface RecoveryHandler {
        void save(Object obj) throws IOException;

        Object restore() throws IOException;

        DataOutputStream rotateLog() throws IOException;

        DataInputStream getLog() throws IOException;
    }

    /* loaded from: input_file:com/datatorrent/stram/StreamingContainerManager$RequestHandler.class */
    private class RequestHandler implements Callable<Object> {
        public long requestId;
        public long waitTime;

        private RequestHandler() {
            this.waitTime = StramWebServices.WAIT_TIME;
        }

        @Override // java.util.concurrent.Callable
        public Object call() throws Exception {
            Object ifPresent;
            long currentTimeMillis = System.currentTimeMillis() + this.waitTime;
            while (true) {
                ifPresent = StreamingContainerManager.this.commandResponse.getIfPresent(Long.valueOf(this.requestId));
                if (ifPresent != null || currentTimeMillis <= System.currentTimeMillis()) {
                    break;
                }
                Thread.sleep(100L);
                StreamingContainerManager.LOG.debug("Polling for a response to request with Id {}", Long.valueOf(this.requestId));
            }
            if (ifPresent == null) {
                return null;
            }
            StreamingContainerManager.this.commandResponse.invalidate(Long.valueOf(this.requestId));
            return ifPresent;
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/StreamingContainerManager$SetOperatorProperty.class */
    private static class SetOperatorProperty implements Journal.Recoverable {
        private final String operatorName;
        private final String propertyName;
        private final String propertyValue;

        private SetOperatorProperty() {
            this(null, null, null);
        }

        private SetOperatorProperty(String str, String str2, String str3) {
            this.operatorName = str;
            this.propertyName = str2;
            this.propertyValue = str3;
        }

        @Override // com.datatorrent.stram.Journal.Recoverable
        public void read(Object obj, Input input) throws KryoException {
            StreamingContainerManager streamingContainerManager = (StreamingContainerManager) obj;
            String readString = input.readString();
            String readString2 = input.readString();
            String readString3 = input.readString();
            LogicalPlan.OperatorMeta m92getOperatorMeta = streamingContainerManager.plan.getLogicalPlan().m92getOperatorMeta(readString);
            if (m92getOperatorMeta == null) {
                throw new IllegalArgumentException("Unknown operator " + readString);
            }
            streamingContainerManager.setOperatorProperty(m92getOperatorMeta, readString2, readString3);
        }

        @Override // com.datatorrent.stram.Journal.Recoverable
        public void write(Output output) throws KryoException {
            output.writeString(this.operatorName);
            output.writeString(this.propertyName);
            output.writeString(this.propertyValue);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/datatorrent/stram/StreamingContainerManager$SetOperatorPropertyRequestFilter.class */
    public class SetOperatorPropertyRequestFilter implements Predicate<StreamingContainerUmbilicalProtocol.StramToNodeRequest> {
        final String propertyKey;

        SetOperatorPropertyRequestFilter(String str) {
            this.propertyKey = str;
        }

        public boolean apply(@Nullable StreamingContainerUmbilicalProtocol.StramToNodeRequest stramToNodeRequest) {
            if (stramToNodeRequest != null && (stramToNodeRequest instanceof StramToNodeSetPropertyRequest)) {
                return ((StramToNodeSetPropertyRequest) stramToNodeRequest).getPropertyKey().equals(this.propertyKey);
            }
            return false;
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/StreamingContainerManager$SetPhysicalOperatorProperty.class */
    private static class SetPhysicalOperatorProperty implements Journal.Recoverable {
        private final int operatorId;
        private final String propertyName;
        private final String propertyValue;

        private SetPhysicalOperatorProperty() {
            this(-1, null, null);
        }

        private SetPhysicalOperatorProperty(int i, String str, String str2) {
            this.operatorId = i;
            this.propertyName = str;
            this.propertyValue = str2;
        }

        @Override // com.datatorrent.stram.Journal.Recoverable
        public void read(Object obj, Input input) throws KryoException {
            StreamingContainerManager streamingContainerManager = (StreamingContainerManager) obj;
            int readInt = input.readInt();
            String readString = input.readString();
            String readString2 = input.readString();
            PTOperator pTOperator = streamingContainerManager.plan.getAllOperators().get(Integer.valueOf(readInt));
            if (pTOperator == null) {
                throw new IllegalArgumentException("Unknown physical operator " + readInt);
            }
            streamingContainerManager.setPhysicalOperatorProperty(pTOperator, readString, readString2);
        }

        @Override // com.datatorrent.stram.Journal.Recoverable
        public void write(Output output) throws KryoException {
            output.writeInt(this.operatorId);
            output.writeString(this.propertyName);
            output.writeString(this.propertyValue);
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/StreamingContainerManager$UpdateCheckpointsContext.class */
    public static class UpdateCheckpointsContext {
        public final MutableLong committedWindowId;
        public final Set<PTOperator> visited;
        public final Set<PTOperator> blocked;
        public final long currentTms;
        public final boolean recovery;
        public final Map<LogicalPlan.OperatorMeta, Set<LogicalPlan.OperatorMeta>> checkpointGroups;

        public UpdateCheckpointsContext(Clock clock) {
            this(clock, false, Collections.emptyMap());
        }

        public UpdateCheckpointsContext(Clock clock, boolean z, Map<LogicalPlan.OperatorMeta, Set<LogicalPlan.OperatorMeta>> map) {
            this.committedWindowId = new MutableLong(Long.MAX_VALUE);
            this.visited = new LinkedHashSet();
            this.blocked = new LinkedHashSet();
            this.currentTms = clock.getTime();
            this.recovery = z;
            this.checkpointGroups = map;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/datatorrent/stram/StreamingContainerManager$UpdateOperatorLatencyContext.class */
    public static class UpdateOperatorLatencyContext {
        Map<String, MovingAverage.MovingAverageLong> rpcLatencies;
        Map<Long, Map<Integer, EndWindowStats>> endWindowStatsOperatorMap;

        UpdateOperatorLatencyContext() {
        }

        UpdateOperatorLatencyContext(Map<String, MovingAverage.MovingAverageLong> map, Map<Long, Map<Integer, EndWindowStats>> map2) {
            this.rpcLatencies = map;
            this.endWindowStatsOperatorMap = map2;
        }

        long getRPCLatency(PTOperator pTOperator) {
            MovingAverage.MovingAverageLong movingAverageLong = this.rpcLatencies.get(pTOperator.getContainer().getExternalId());
            if (movingAverageLong == null) {
                return 0L;
            }
            return movingAverageLong.getAvg();
        }

        boolean endWindowStatsExists(long j) {
            return this.endWindowStatsOperatorMap.containsKey(Long.valueOf(j));
        }

        long getEndWindowEmitTimestamp(long j, PTOperator pTOperator) {
            EndWindowStats endWindowStats;
            Map<Integer, EndWindowStats> map = this.endWindowStatsOperatorMap.get(Long.valueOf(j));
            if (map == null || (endWindowStats = map.get(Integer.valueOf(pTOperator.getId()))) == null) {
                return -1L;
            }
            return endWindowStats.emitTimestamp;
        }
    }

    public StreamingContainerManager(LogicalPlan logicalPlan, Clock clock) {
        this(logicalPlan, false, clock);
    }

    public StreamingContainerManager(LogicalPlan logicalPlan) {
        this(logicalPlan, false, new SystemClock());
    }

    public StreamingContainerManager(LogicalPlan logicalPlan, boolean z, Clock clock) {
        this.containerStopRequests = new ConcurrentHashMap();
        this.containerStartRequests = new ConcurrentLinkedQueue<>();
        this.forcedShutdown = false;
        this.eventQueue = new ConcurrentLinkedQueue<>();
        this.eventQueueProcessing = new AtomicBoolean();
        this.pendingAllocation = Sets.newLinkedHashSet();
        this.shutdownDiagnosticsMessage = "";
        this.lastResourceRequest = 0L;
        this.containers = new ConcurrentHashMap();
        this.purgeCheckpoints = new ArrayList();
        this.shutdownOperators = new HashMap();
        this.reportStats = new ConcurrentHashMap();
        this.deployChangeInProgress = new AtomicBoolean();
        this.endWindowStatsOperatorMap = new ConcurrentSkipListMap<>();
        this.slowestUpstreamOp = new ConcurrentHashMap();
        this.lastCommittedWindowId = Checkpoint.INITIAL_CHECKPOINT.getWindowId();
        this.operatorPortLastEndWindowTimestamps = Maps.newConcurrentMap();
        this.operatorLastEndWindowTimestamps = Maps.newConcurrentMap();
        this.lastStatsTimestamp = System.currentTimeMillis();
        this.rpcLatencies = new ConcurrentHashMap<>();
        this.nodeToStramRequestIds = new AtomicLong(1L);
        this.allocatedMemoryMB = 0;
        this.appDataSources = null;
        this.commandResponse = CacheBuilder.newBuilder().expireAfterWrite(1L, TimeUnit.MINUTES).build();
        this.logicalMetrics = Maps.newConcurrentMap();
        this.latestLogicalMetrics = Maps.newHashMap();
        this.latestLogicalCounters = Maps.newHashMap();
        this.apexPluginDispatcher = new NoOpApexPluginDispatcher();
        this.completedContainers = new LinkedHashMap<String, ContainerInfo>() { // from class: com.datatorrent.stram.StreamingContainerManager.1
            private static final long serialVersionUID = 201405281500L;

            @Override // java.util.LinkedHashMap
            protected boolean removeEldestEntry(Map.Entry<String, ContainerInfo> entry) {
                long currentTimeMillis = System.currentTimeMillis() - 108000;
                Iterator<Map.Entry<String, ContainerInfo>> it = entrySet().iterator();
                while (it.hasNext()) {
                    if (it.next().getValue().finishedTime < currentTimeMillis) {
                        it.remove();
                    }
                }
                return false;
            }
        };
        this.startTime = System.currentTimeMillis();
        this.appDataSourcesLock = new Object();
        this.startedFromCheckpoint = false;
        this.clock = clock;
        this.vars = new FinalVars(logicalPlan, clock.getTime());
        this.poolExecutor = Executors.newFixedThreadPool(4);
        if (z) {
            this.eventBus = new MBassador<>(BusConfiguration.Default(1, 1, 1));
        }
        this.plan = new PhysicalPlan(logicalPlan, this);
        this.journal = new Journal(this);
        init(z);
    }

    private StreamingContainerManager(CheckpointState checkpointState, boolean z) {
        this.containerStopRequests = new ConcurrentHashMap();
        this.containerStartRequests = new ConcurrentLinkedQueue<>();
        this.forcedShutdown = false;
        this.eventQueue = new ConcurrentLinkedQueue<>();
        this.eventQueueProcessing = new AtomicBoolean();
        this.pendingAllocation = Sets.newLinkedHashSet();
        this.shutdownDiagnosticsMessage = "";
        this.lastResourceRequest = 0L;
        this.containers = new ConcurrentHashMap();
        this.purgeCheckpoints = new ArrayList();
        this.shutdownOperators = new HashMap();
        this.reportStats = new ConcurrentHashMap();
        this.deployChangeInProgress = new AtomicBoolean();
        this.endWindowStatsOperatorMap = new ConcurrentSkipListMap<>();
        this.slowestUpstreamOp = new ConcurrentHashMap();
        this.lastCommittedWindowId = Checkpoint.INITIAL_CHECKPOINT.getWindowId();
        this.operatorPortLastEndWindowTimestamps = Maps.newConcurrentMap();
        this.operatorLastEndWindowTimestamps = Maps.newConcurrentMap();
        this.lastStatsTimestamp = System.currentTimeMillis();
        this.rpcLatencies = new ConcurrentHashMap<>();
        this.nodeToStramRequestIds = new AtomicLong(1L);
        this.allocatedMemoryMB = 0;
        this.appDataSources = null;
        this.commandResponse = CacheBuilder.newBuilder().expireAfterWrite(1L, TimeUnit.MINUTES).build();
        this.logicalMetrics = Maps.newConcurrentMap();
        this.latestLogicalMetrics = Maps.newHashMap();
        this.latestLogicalCounters = Maps.newHashMap();
        this.apexPluginDispatcher = new NoOpApexPluginDispatcher();
        this.completedContainers = new LinkedHashMap<String, ContainerInfo>() { // from class: com.datatorrent.stram.StreamingContainerManager.1
            private static final long serialVersionUID = 201405281500L;

            @Override // java.util.LinkedHashMap
            protected boolean removeEldestEntry(Map.Entry<String, ContainerInfo> entry) {
                long currentTimeMillis = System.currentTimeMillis() - 108000;
                Iterator<Map.Entry<String, ContainerInfo>> it = entrySet().iterator();
                while (it.hasNext()) {
                    if (it.next().getValue().finishedTime < currentTimeMillis) {
                        it.remove();
                    }
                }
                return false;
            }
        };
        this.startTime = System.currentTimeMillis();
        this.appDataSourcesLock = new Object();
        this.startedFromCheckpoint = false;
        this.vars = checkpointState.finals;
        this.clock = new SystemClock();
        this.poolExecutor = Executors.newFixedThreadPool(4);
        this.plan = checkpointState.physicalPlan;
        this.eventBus = new MBassador<>(BusConfiguration.Default(1, 1, 1));
        this.journal = new Journal(this);
        init(z);
    }

    private void init(boolean z) {
        setupWsClient();
        setupRecording(z);
        setupStringCodecs();
        try {
            URI uri = new Path(this.vars.appPath).toUri();
            YarnConfiguration yarnConfiguration = new YarnConfiguration();
            this.fileContext = uri.getScheme() == null ? FileContext.getFileContext(yarnConfiguration) : FileContext.getFileContext(uri, yarnConfiguration);
            saveMetaInfo();
            this.containerFile = new FSJsonLineFile(this.fileContext, new Path(this.vars.appPath, String.format(CONTAINERS_INFO_FILENAME_FORMAT, this.plan.getLogicalPlan().getValue(LogicalPlan.APPLICATION_ATTEMPT_ID))), FsPermission.getDefault());
            this.containerFile.append(getAppMasterContainerInfo());
            this.operatorFile = new FSJsonLineFile(this.fileContext, new Path(this.vars.appPath, String.format(OPERATORS_INFO_FILENAME_FORMAT, this.plan.getLogicalPlan().getValue(LogicalPlan.APPLICATION_ATTEMPT_ID))), FsPermission.getDefault());
        } catch (IOException e) {
            throw Throwables.propagate(e);
        }
    }

    public Journal getJournal() {
        return this.journal;
    }

    public final ContainerInfo getAppMasterContainerInfo() {
        ContainerInfo containerInfo = new ContainerInfo();
        containerInfo.id = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.toString());
        String str = System.getenv(ApplicationConstants.Environment.NM_HOST.toString());
        String str2 = System.getenv(ApplicationConstants.Environment.NM_PORT.toString());
        String str3 = System.getenv(ApplicationConstants.Environment.NM_HTTP_PORT.toString());
        containerInfo.state = "ACTIVE";
        containerInfo.jvmName = ManagementFactory.getRuntimeMXBean().getName();
        containerInfo.numOperators = 0;
        YarnConfiguration yarnConfiguration = new YarnConfiguration();
        if (str != null) {
            if (str2 != null) {
                containerInfo.host = str + ":" + str2;
            }
            if (str3 != null) {
                String str4 = str + ":" + str3;
                if (this.allocatedMemoryMB == 0) {
                    String str5 = ConfigUtils.getSchemePrefix(yarnConfiguration) + str4 + "/ws/v1/node/containers/" + containerInfo.id;
                    try {
                        int i = new JSONObject((String) new WebServicesClient().process(str5, String.class, new WebServicesClient.GetWebServicesHandler())).getJSONObject("container").getInt("totalMemoryNeededMB");
                        if (i > 0) {
                            this.allocatedMemoryMB = i;
                        } else {
                            LOG.warn("Could not determine the memory allocated for the streaming application master.  Node manager is reporting {} MB from {}", Integer.valueOf(i), str5);
                        }
                    } catch (Exception e) {
                        LOG.warn("Could not determine the memory allocated for the streaming application master", e);
                    }
                }
                containerInfo.containerLogsUrl = ConfigUtils.getSchemePrefix(yarnConfiguration) + str4 + "/node/containerlogs/" + containerInfo.id + "/" + System.getenv(ApplicationConstants.Environment.USER.toString());
                containerInfo.rawContainerLogsUrl = ConfigUtils.getRawContainerLogsUrl(yarnConfiguration, str4, (String) this.plan.getLogicalPlan().getAttributes().get(LogicalPlan.APPLICATION_ID), containerInfo.id);
            }
        }
        containerInfo.memoryMBAllocated = this.allocatedMemoryMB;
        containerInfo.memoryMBFree = (int) (Runtime.getRuntime().freeMemory() / 1048576);
        containerInfo.lastHeartbeat = -1L;
        containerInfo.startedTime = this.startTime;
        containerInfo.finishedTime = -1L;
        return containerInfo;
    }

    public void updateRPCLatency(String str, long j) {
        if (this.vars.rpcLatencyCompensationSamples > 0) {
            MovingAverage.MovingAverageLong movingAverageLong = this.rpcLatencies.get(str);
            if (movingAverageLong == null) {
                MovingAverage.MovingAverageLong movingAverageLong2 = new MovingAverage.MovingAverageLong(this.vars.rpcLatencyCompensationSamples);
                movingAverageLong = this.rpcLatencies.putIfAbsent(str, movingAverageLong2);
                if (movingAverageLong == null) {
                    movingAverageLong = movingAverageLong2;
                }
            }
            movingAverageLong.add(j);
        }
    }

    private void setupRecording(boolean z) {
        if (this.vars.enableStatsRecording) {
            this.statsRecorder = new FSStatsRecorder();
            this.statsRecorder.setBasePath(this.vars.appPath + "/" + LogicalPlan.SUBDIR_STATS);
            this.statsRecorder.setup();
        }
        if (z) {
            this.eventRecorder = new FSEventRecorder((String) this.plan.getLogicalPlan().getValue(LogicalPlan.APPLICATION_ID));
            this.eventRecorder.setBasePath(this.vars.appPath + "/" + LogicalPlan.SUBDIR_EVENTS);
            this.eventRecorder.setWebSocketClient(this.wsClient);
            this.eventRecorder.setup();
            this.eventBus.subscribe(this.eventRecorder);
        }
    }

    private void setupStringCodecs() {
        StringCodecs.loadConverters((Map) this.plan.getLogicalPlan().getAttributes().get(Context.DAGContext.STRING_CODECS));
    }

    private void setupWsClient() {
        String str = (String) this.plan.getLogicalPlan().getValue(LogicalPlan.GATEWAY_CONNECT_ADDRESS);
        boolean booleanValue = ((Boolean) this.plan.getLogicalPlan().getValue(LogicalPlan.GATEWAY_USE_SSL)).booleanValue();
        String str2 = (String) this.plan.getLogicalPlan().getValue(LogicalPlan.GATEWAY_USER_NAME);
        String str3 = (String) this.plan.getLogicalPlan().getValue(LogicalPlan.GATEWAY_PASSWORD);
        int intValue = ((Integer) this.plan.getLogicalPlan().getValue(LogicalPlan.PUBSUB_CONNECT_TIMEOUT_MILLIS)).intValue();
        if (str != null) {
            try {
                this.wsClient = new SharedPubSubWebSocketClient((booleanValue ? "wss://" : "ws://") + str + "/pubsub", intValue);
                if (str2 != null && str3 != null) {
                    this.wsClient.setLoginUrl((booleanValue ? "https://" : "http://") + str + GATEWAY_LOGIN_URL_PATH);
                    this.wsClient.setUserName(str2);
                    this.wsClient.setPassword(str3);
                }
                this.wsClient.setup();
            } catch (Exception e) {
                LOG.warn("Cannot establish websocket connection to {}", str, e);
            }
        }
    }

    public void teardown() {
        if (this.eventBus != null) {
            this.eventBus.shutdown();
        }
        if (this.eventRecorder != null) {
            this.eventRecorder.teardown();
        }
        if (this.statsRecorder != null) {
            this.statsRecorder.teardown();
        }
        IOUtils.closeQuietly(this.containerFile);
        IOUtils.closeQuietly(this.operatorFile);
        if (this.poolExecutor != null) {
            this.poolExecutor.shutdown();
        }
    }

    public void subscribeToEvents(Object obj) {
        if (this.eventBus != null) {
            this.eventBus.subscribe(obj);
        }
    }

    public PhysicalPlan getPhysicalPlan() {
        return this.plan;
    }

    public long getCommittedWindowId() {
        return this.committedWindowId;
    }

    public boolean isGatewayConnected() {
        return this.wsClient != null && this.wsClient.isConnectionOpen();
    }

    public SharedPubSubWebSocketClient getWsClient() {
        return this.wsClient;
    }

    private String convertAppDataUrl(String str) {
        if (BUILTIN_APPDATA_URL.equals(str)) {
            return str;
        }
        LOG.warn("App Data URL {} cannot be converted for the client.", str);
        return str;
    }

    public List<AppDataSource> getAppDataSources() {
        AppData.EmbeddableQueryInfoProvider embeddableQueryInfoProvider;
        synchronized (this.appDataSourcesLock) {
            if (this.appDataSources == null) {
                this.appDataSources = new ArrayList();
                for (LogicalPlan.OperatorMeta operatorMeta : this.plan.getLogicalPlan().getAllOperators()) {
                    Map<LogicalPlan.InputPortMeta, LogicalPlan.StreamMeta> inputStreams = operatorMeta.getInputStreams();
                    Map<LogicalPlan.OutputPortMeta, LogicalPlan.StreamMeta> outputStreams = operatorMeta.getOutputStreams();
                    String str = null;
                    String str2 = null;
                    String str3 = null;
                    boolean z = false;
                    if ((operatorMeta.getOperator() instanceof AppData.Store) && (embeddableQueryInfoProvider = operatorMeta.getOperator().getEmbeddableQueryInfoProvider()) != null) {
                        z = true;
                        str = operatorMeta.getName() + EMBEDDABLE_QUERY_NAME_SUFFIX;
                        str2 = embeddableQueryInfoProvider.getAppDataURL();
                        str3 = embeddableQueryInfoProvider.getTopic();
                    }
                    LOG.debug("Looking at operator {} {}", operatorMeta.getName(), Long.valueOf(Thread.currentThread().getId()));
                    Iterator<Map.Entry<LogicalPlan.InputPortMeta, LogicalPlan.StreamMeta>> it = inputStreams.entrySet().iterator();
                    while (true) {
                        if (it.hasNext()) {
                            Map.Entry<LogicalPlan.InputPortMeta, LogicalPlan.StreamMeta> next = it.next();
                            if (next.getKey().isAppDataQueryPort()) {
                                if (str2 != null) {
                                    LOG.warn("Multiple query ports found in operator {}. Ignoring the App Data Source.", operatorMeta.getName());
                                    break;
                                }
                                LogicalPlan.OperatorMeta m100getOperatorMeta = next.getValue().m102getSource().m100getOperatorMeta();
                                if (m100getOperatorMeta.getOperator() instanceof AppData.ConnectionInfoProvider) {
                                    if (z) {
                                        LOG.warn("An embeddable query connector and the {} query operator were discovered. The query operator will be ignored and the embeddable query connector will be used instead.", operatorMeta.getName());
                                    } else {
                                        AppData.ConnectionInfoProvider operator = m100getOperatorMeta.getOperator();
                                        str = m100getOperatorMeta.getName();
                                        str2 = operator.getAppDataURL();
                                        str3 = operator.getTopic();
                                    }
                                }
                            }
                        } else {
                            for (Map.Entry<LogicalPlan.OutputPortMeta, LogicalPlan.StreamMeta> entry : outputStreams.entrySet()) {
                                LogicalPlan.OutputPortMeta key = entry.getKey();
                                LOG.debug("Looking at port {} {}", key.getPortName(), Long.valueOf(Thread.currentThread().getId()));
                                if (key.isAppDataResultPort()) {
                                    AppDataSource appDataSource = new AppDataSource();
                                    appDataSource.setType(AppDataSource.Type.DAG);
                                    appDataSource.setOperatorName(operatorMeta.getName());
                                    appDataSource.setPortName(key.getPortName());
                                    if (str == null) {
                                        LOG.warn("There is no query operator for the App Data Source {}.{}. Ignoring the App Data Source.", operatorMeta.getName(), key.getPortName());
                                    } else {
                                        appDataSource.setQueryOperatorName(str);
                                        appDataSource.setQueryTopic(str3);
                                        appDataSource.setQueryUrl(convertAppDataUrl(str2));
                                        Collection<LogicalPlan.InputPortMeta> sinks = entry.getValue().getSinks();
                                        if (sinks.isEmpty()) {
                                            LOG.warn("There is no result operator for the App Data Source {}.{}. Ignoring the App Data Source.", operatorMeta.getName(), key.getPortName());
                                        } else if (sinks.size() > 1) {
                                            LOG.warn("There are multiple result operators for the App Data Source {}.{}. Ignoring the App Data Source.", operatorMeta.getName(), key.getPortName());
                                        } else {
                                            LogicalPlan.OperatorMeta m97getOperatorMeta = sinks.iterator().next().m97getOperatorMeta();
                                            if (m97getOperatorMeta.getOperator() instanceof AppData.ConnectionInfoProvider) {
                                                AppData.ConnectionInfoProvider operator2 = m97getOperatorMeta.getOperator();
                                                appDataSource.setResultOperatorName(m97getOperatorMeta.getName());
                                                appDataSource.setResultTopic(operator2.getTopic());
                                                appDataSource.setResultUrl(convertAppDataUrl(operator2.getAppDataURL()));
                                                AppData.AppendQueryIdToTopic annotation = operator2.getClass().getAnnotation(AppData.AppendQueryIdToTopic.class);
                                                if (annotation != null && annotation.value()) {
                                                    appDataSource.setResultAppendQIDTopic(true);
                                                }
                                                LOG.debug("Adding appDataSource {} {}", appDataSource.getName(), Long.valueOf(Thread.currentThread().getId()));
                                                this.appDataSources.add(appDataSource);
                                            } else {
                                                LOG.warn("Result operator for the App Data Source {}.{} does not implement the right interface. Ignoring the App Data Source.", operatorMeta.getName(), key.getPortName());
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
        return this.appDataSources;
    }

    public Map<String, Map<String, Object>> getLatestLogicalMetrics() {
        return this.latestLogicalMetrics;
    }

    public void monitorHeartbeat(boolean z) {
        long time = this.clock.getTime();
        if (!this.pendingAllocation.isEmpty()) {
            if (this.lastResourceRequest + ((Integer) this.plan.getLogicalPlan().getValue(LogicalPlan.RESOURCE_ALLOCATION_TIMEOUT_MILLIS)).intValue() < time) {
                String format = String.format("Shutdown due to resource allocation timeout (%s ms) waiting for %s containers", Long.valueOf(time - this.lastResourceRequest), Integer.valueOf(this.pendingAllocation.size()));
                LOG.warn(format);
                Iterator<PTContainer> it = this.pendingAllocation.iterator();
                while (it.hasNext()) {
                    PTContainer next = it.next();
                    LOG.warn("Waiting for resource: {}m priority: {} {}", new Object[]{Integer.valueOf(next.getRequiredMemoryMB()), Integer.valueOf(next.getResourceRequestPriority()), next});
                }
                shutdownAllContainers(StreamingContainerUmbilicalProtocol.ShutdownType.ABORT, format);
                this.forcedShutdown = true;
            } else {
                Iterator<PTContainer> it2 = this.pendingAllocation.iterator();
                while (it2.hasNext()) {
                    PTContainer next2 = it2.next();
                    LOG.debug("Waiting for resource: {}m {}", Integer.valueOf(next2.getRequiredMemoryMB()), next2);
                }
            }
        }
        for (StreamingContainerAgent streamingContainerAgent : this.containers.values()) {
            PTContainer pTContainer = streamingContainerAgent.container;
            if (!this.pendingAllocation.contains(pTContainer) && pTContainer.getExternalId() != null) {
                if (streamingContainerAgent.lastHeartbeatMillis == 0) {
                    if (time - streamingContainerAgent.createdMillis > 2 * this.vars.heartbeatTimeoutMillis) {
                        LOG.warn("Container {}@{} startup timeout ({} ms).", new Object[]{pTContainer.getExternalId(), pTContainer.host, Long.valueOf(time - streamingContainerAgent.createdMillis)});
                        this.containerStopRequests.put(pTContainer.getExternalId(), pTContainer.getExternalId());
                    }
                } else if (time - streamingContainerAgent.lastHeartbeatMillis > this.vars.heartbeatTimeoutMillis && !isApplicationIdle()) {
                    if (streamingContainerAgent.lastHeartbeatMillis != -1) {
                        String format2 = String.format("Container %s@%s heartbeat timeout  (%d%n ms).", pTContainer.getExternalId(), pTContainer.host, Long.valueOf(time - streamingContainerAgent.lastHeartbeatMillis));
                        LOG.warn(format2);
                        StramEvent.ContainerErrorEvent containerErrorEvent = new StramEvent.ContainerErrorEvent(pTContainer.getExternalId(), format2, null);
                        containerErrorEvent.setReason(format2);
                        recordEventAsync(containerErrorEvent);
                        streamingContainerAgent.lastHeartbeatMillis = -1L;
                    }
                    this.containerStopRequests.put(pTContainer.getExternalId(), pTContainer.getExternalId());
                }
            }
        }
        processEvents();
        this.committedWindowId = updateCheckpoints(z);
        if (this.lastCommittedWindowId != this.committedWindowId) {
            this.apexPluginDispatcher.dispatch(new DAGExecutionEvent.CommitExecutionEvent(this.committedWindowId));
            this.lastCommittedWindowId = this.committedWindowId;
        }
        calculateEndWindowStats();
        if (this.vars.enableStatsRecording) {
            recordStats(time);
        }
    }

    private void recordStats(long j) {
        try {
            this.statsRecorder.recordContainers(this.containers, j);
            this.statsRecorder.recordOperators(getOperatorInfoList(), j);
        } catch (Exception e) {
            LOG.warn("Exception caught when recording stats", e);
        }
    }

    private void calculateEndWindowStats() {
        Map<Integer, PTOperator> allOperators = this.plan.getAllOperators();
        UpdateOperatorLatencyContext updateOperatorLatencyContext = new UpdateOperatorLatencyContext(this.rpcLatencies, this.endWindowStatsOperatorMap);
        Iterator<PTOperator> it = allOperators.values().iterator();
        while (it.hasNext()) {
            updateOperatorLatency(it.next(), updateOperatorLatencyContext);
        }
        if (this.endWindowStatsOperatorMap.isEmpty()) {
            return;
        }
        if (this.endWindowStatsOperatorMap.size() > this.vars.maxWindowsBehindForStats) {
            LOG.warn("Some operators are behind for more than {} windows! Trimming the end window stats map", Integer.valueOf(this.vars.maxWindowsBehindForStats));
            while (this.endWindowStatsOperatorMap.size() > this.vars.maxWindowsBehindForStats) {
                LOG.debug("Removing incomplete end window stats for window id {}. Collected operator set: {}. Complete set: {}", new Object[]{this.endWindowStatsOperatorMap.firstKey(), this.endWindowStatsOperatorMap.get(this.endWindowStatsOperatorMap.firstKey()).keySet(), allOperators.keySet()});
                this.endWindowStatsOperatorMap.remove(this.endWindowStatsOperatorMap.firstKey());
            }
        }
        int size = allOperators.size();
        Long firstKey = this.endWindowStatsOperatorMap.firstKey();
        while (true) {
            Long l = firstKey;
            if (l == null) {
                return;
            }
            Map<Integer, EndWindowStats> map = this.endWindowStatsOperatorMap.get(l);
            Set<Integer> keySet = map.keySet();
            aggregateMetrics(l.longValue(), map);
            this.criticalPathInfo = findCriticalPath();
            if (!allOperators.keySet().containsAll(keySet)) {
                LOG.debug("Stats for non-existent operators detected. Disregarding end window stats for window {}", l);
                this.endWindowStatsOperatorMap.remove(l);
            } else if (map.size() >= size) {
                this.endWindowStatsOperatorMap.remove(l);
                this.currentEndWindowStatsWindowId = l.longValue();
            } else {
                if (l.longValue() >= this.completeEndWindowStatsWindowId) {
                    return;
                }
                LOG.debug("Disregarding stale end window stats for window {}", l);
                this.endWindowStatsOperatorMap.remove(l);
            }
            firstKey = this.endWindowStatsOperatorMap.higherKey(l);
        }
    }

    private void aggregateMetrics(long j, Map<Integer, EndWindowStats> map) {
        Map<String, Object> aggregate;
        Collection<LogicalPlan.OperatorMeta> allOperators = getLogicalPlan().getAllOperators();
        for (LogicalPlan.OperatorMeta operatorMeta : allOperators) {
            Context.CountersAggregator countersAggregator = (Context.CountersAggregator) operatorMeta.getValue(Context.OperatorContext.COUNTERS_AGGREGATOR);
            if (countersAggregator != null) {
                Collection<PTOperator> allOperators2 = this.plan.getAllOperators(operatorMeta);
                ArrayList newArrayList = Lists.newArrayList();
                Iterator<PTOperator> it = allOperators2.iterator();
                while (it.hasNext()) {
                    EndWindowStats endWindowStats = map.get(Integer.valueOf(it.next().getId()));
                    if (endWindowStats != null && endWindowStats.counters != null) {
                        newArrayList.add(endWindowStats.counters);
                    }
                }
                if (newArrayList.size() > 0) {
                    this.latestLogicalCounters.put(operatorMeta.getName(), countersAggregator.aggregate(newArrayList));
                }
            }
        }
        for (LogicalPlan.OperatorMeta operatorMeta2 : allOperators) {
            AutoMetric.Aggregator aggregator = operatorMeta2.getMetricAggregatorMeta() != null ? operatorMeta2.getMetricAggregatorMeta().getAggregator() : null;
            if (aggregator != null) {
                Collection<PTOperator> allOperators3 = this.plan.getAllOperators(operatorMeta2);
                ArrayList newArrayList2 = Lists.newArrayList();
                for (PTOperator pTOperator : allOperators3) {
                    EndWindowStats endWindowStats2 = map.get(Integer.valueOf(pTOperator.getId()));
                    if (endWindowStats2 != null && endWindowStats2.metrics != null) {
                        newArrayList2.add(new PhysicalMetricsContextImpl(pTOperator.getId(), endWindowStats2.metrics));
                    }
                }
                if (!newArrayList2.isEmpty() && (aggregate = aggregator.aggregate(j, newArrayList2)) != null && aggregate.size() > 0) {
                    Queue<Pair<Long, Map<String, Object>>> queue = this.logicalMetrics.get(operatorMeta2.getName());
                    if (queue == null) {
                        queue = new LinkedBlockingQueue<Pair<Long, Map<String, Object>>>(METRIC_QUEUE_SIZE) { // from class: com.datatorrent.stram.StreamingContainerManager.2
                            private static final long serialVersionUID = 1;

                            @Override // java.util.AbstractQueue, java.util.AbstractCollection, java.util.Collection, java.util.Queue, java.util.concurrent.BlockingQueue
                            public boolean add(Pair<Long, Map<String, Object>> pair) {
                                if (remainingCapacity() <= 1) {
                                    remove();
                                }
                                return super.add((AnonymousClass2) pair);
                            }
                        };
                        this.logicalMetrics.put(operatorMeta2.getName(), queue);
                    }
                    LOG.debug("Adding to logical metrics for {}", operatorMeta2.getName());
                    queue.add(new Pair<>(Long.valueOf(j), aggregate));
                    if (this.latestLogicalMetrics.put(operatorMeta2.getName(), aggregate) == null) {
                        try {
                            saveMetaInfo();
                        } catch (IOException e) {
                            LOG.error("Cannot save application meta info to DFS. App data sources will not be available.", e);
                        }
                    }
                }
            }
        }
    }

    private void saveMetaInfo() throws IOException {
        Path path = new Path(this.vars.appPath, "meta.json." + System.nanoTime());
        try {
            FSDataOutputStream create = this.fileContext.create(path, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), new Options.CreateOpts[]{Options.CreateOpts.CreateParent.createParent()});
            Throwable th = null;
            try {
                JSONObject jSONObject = new JSONObject();
                JSONObject jSONObject2 = new JSONObject();
                for (Map.Entry entry : this.plan.getLogicalPlan().getAttributes().entrySet()) {
                    jSONObject2.put(((Attribute) entry.getKey()).getSimpleName(), entry.getValue());
                }
                JSONObject jSONObject3 = new JSONObject();
                for (Map.Entry<String, Map<String, Object>> entry2 : this.latestLogicalMetrics.entrySet()) {
                    jSONObject3.put(entry2.getKey(), new JSONArray(entry2.getValue().keySet()));
                }
                jSONObject.put(APP_META_KEY_ATTRIBUTES, jSONObject2);
                jSONObject.put(APP_META_KEY_METRICS, jSONObject3);
                create.write(jSONObject.toString().getBytes());
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        create.close();
                    }
                }
                this.fileContext.rename(path, new Path(this.vars.appPath, APP_META_FILENAME), new Options.Rename[]{Options.Rename.OVERWRITE});
            } finally {
            }
        } catch (JSONException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    public Queue<Pair<Long, Map<String, Object>>> getWindowMetrics(String str) {
        return this.logicalMetrics.get(str);
    }

    private CriticalPathInfo findCriticalPath() {
        CriticalPathInfo criticalPathInfo = null;
        List<PTOperator> leafOperators = this.plan.getLeafOperators();
        HashMap hashMap = new HashMap();
        Iterator<PTOperator> it = leafOperators.iterator();
        while (it.hasNext()) {
            CriticalPathInfo findCriticalPathHelper = findCriticalPathHelper(it.next(), hashMap);
            if (criticalPathInfo == null || criticalPathInfo.latency < findCriticalPathHelper.latency) {
                criticalPathInfo = findCriticalPathHelper;
            }
        }
        return criticalPathInfo;
    }

    private CriticalPathInfo findCriticalPathHelper(PTOperator pTOperator, Map<PTOperator, CriticalPathInfo> map) {
        CriticalPathInfo criticalPathInfo;
        CriticalPathInfo criticalPathInfo2 = map.get(pTOperator);
        if (criticalPathInfo2 != null) {
            return criticalPathInfo2;
        }
        PTOperator pTOperator2 = this.slowestUpstreamOp.get(pTOperator);
        if (pTOperator2 != null) {
            try {
                criticalPathInfo = (CriticalPathInfo) findCriticalPathHelper(pTOperator2, map).clone();
            } catch (CloneNotSupportedException e) {
                throw new RuntimeException();
            }
        } else {
            criticalPathInfo = new CriticalPathInfo();
        }
        criticalPathInfo.latency += pTOperator.stats.getLatencyMA();
        criticalPathInfo.path.addLast(Integer.valueOf(pTOperator.getId()));
        map.put(pTOperator, criticalPathInfo);
        return criticalPathInfo;
    }

    public int processEvents() {
        for (PTOperator pTOperator : this.reportStats.keySet()) {
            List<Stats.OperatorStats> poll = pTOperator.stats.listenerStats.poll();
            if (poll != null) {
                while (true) {
                    List<Stats.OperatorStats> poll2 = pTOperator.stats.listenerStats.poll();
                    if (poll2 == null) {
                        break;
                    }
                    poll.addAll(poll2);
                }
            }
            pTOperator.stats.lastWindowedStats = poll;
            pTOperator.stats.operatorResponses = null;
            if (!pTOperator.stats.responses.isEmpty()) {
                pTOperator.stats.operatorResponses = new ArrayList();
                while (true) {
                    StatsListener.OperatorResponse poll3 = pTOperator.stats.responses.poll();
                    if (poll3 == null) {
                        break;
                    }
                    pTOperator.stats.operatorResponses.add(poll3);
                }
            }
            if (pTOperator.stats.lastWindowedStats != null && pTOperator.statsListeners != null) {
                this.plan.onStatusUpdate(pTOperator);
            }
            this.reportStats.remove(pTOperator);
        }
        if (!this.shutdownOperators.isEmpty()) {
            synchronized (this.shutdownOperators) {
                Iterator<Map.Entry<Long, Set<PTOperator>>> it = this.shutdownOperators.entrySet().iterator();
                while (it.hasNext()) {
                    Map.Entry<Long, Set<PTOperator>> next = it.next();
                    if (next.getKey().longValue() <= this.committedWindowId || checkDownStreamOperators(next)) {
                        LOG.info("Removing inactive operators at window {} {}", Codec.getStringWindowId(next.getKey().longValue()), next.getValue());
                        Iterator<PTOperator> it2 = next.getValue().iterator();
                        while (it2.hasNext()) {
                            this.plan.removeTerminatedPartition(it2.next());
                        }
                        it.remove();
                    }
                }
            }
        }
        if (!this.eventQueue.isEmpty()) {
            for (PTOperator pTOperator2 : this.plan.getAllOperators().values()) {
                if (pTOperator2.getState() != PTOperator.State.ACTIVE) {
                    LOG.debug("Skipping plan updates due to inactive operator {} {}", pTOperator2, pTOperator2.getState());
                    return 0;
                }
            }
        }
        int i = 0;
        while (true) {
            Runnable poll4 = this.eventQueue.poll();
            if (poll4 == null) {
                break;
            }
            this.eventQueueProcessing.set(true);
            try {
                poll4.run();
                i++;
            } catch (Exception e) {
                LOG.error("Failed to execute {}", poll4, e);
            }
            this.eventQueueProcessing.set(false);
        }
        if (i > 0) {
            try {
                checkpoint();
            } catch (Exception e2) {
                throw new RuntimeException("Failed to checkpoint state.", e2);
            }
        }
        return i;
    }

    private boolean checkDownStreamOperators(Map.Entry<Long, Set<PTOperator>> entry) {
        Iterator<PTOperator> it = getPhysicalPlan().getDependents(entry.getValue()).iterator();
        while (it.hasNext()) {
            if (it.next().stats.currentWindowId.get() < entry.getKey().longValue()) {
                return false;
            }
        }
        return true;
    }

    public void scheduleContainerRestart(String str) {
        StreamingContainerAgent containerAgent = getContainerAgent(str);
        if (containerAgent == null || containerAgent.isShutdownRequested()) {
            return;
        }
        LOG.info("Initiating recovery for {}@{}", str, containerAgent.container.host);
        containerAgent.container.setState(PTContainer.State.KILLED);
        containerAgent.container.bufferServerAddress = null;
        containerAgent.container.setResourceRequestPriority(-1);
        containerAgent.container.setAllocatedMemoryMB(0);
        containerAgent.container.setAllocatedVCores(0);
        UpdateCheckpointsContext updateCheckpointsContext = new UpdateCheckpointsContext(this.clock, false, getCheckpointGroups());
        Iterator<PTOperator> it = containerAgent.container.getOperators().iterator();
        while (it.hasNext()) {
            updateRecoveryCheckpoints(it.next(), updateCheckpointsContext, false);
        }
        includeLocalUpstreamOperators(updateCheckpointsContext);
        LOG.info("Affected operators {}", updateCheckpointsContext.visited);
        deploy(Collections.emptySet(), updateCheckpointsContext.visited, Sets.newHashSet(new PTContainer[]{containerAgent.container}), updateCheckpointsContext.visited);
    }

    private void includeLocalUpstreamOperators(UpdateCheckpointsContext updateCheckpointsContext) {
        HashSet newHashSet = Sets.newHashSet();
        do {
            newHashSet.clear();
            for (PTOperator pTOperator : updateCheckpointsContext.visited) {
                for (PTOperator.PTInput pTInput : pTOperator.getInputs()) {
                    if (pTInput.source.source.getContainer() == pTOperator.getContainer() && !updateCheckpointsContext.visited.contains(pTInput.source.source)) {
                        newHashSet.add(pTInput.source.source);
                    }
                }
            }
            if (!newHashSet.isEmpty()) {
                Iterator it = newHashSet.iterator();
                while (it.hasNext()) {
                    updateRecoveryCheckpoints((PTOperator) it.next(), updateCheckpointsContext, false);
                }
            }
        } while (!newHashSet.isEmpty());
    }

    public void removeContainerAgent(String str) {
        LOG.debug("Removing container agent {}", str);
        StreamingContainerAgent remove = this.containers.remove(str);
        if (remove != null) {
            for (PTOperator pTOperator : remove.container.getOperators()) {
                recordEventAsync(new StramEvent.StopOperatorEvent(pTOperator.getName(), pTOperator.getId(), str));
            }
            remove.container.setFinishedTime(System.currentTimeMillis());
            remove.container.setState(PTContainer.State.KILLED);
            this.completedContainers.put(str, remove.getContainerInfo());
        }
    }

    public Collection<ContainerInfo> getCompletedContainerInfo() {
        return Collections.unmodifiableCollection(this.completedContainers.values());
    }

    public StreamingContainerAgent assignContainer(ContainerResource containerResource, InetSocketAddress inetSocketAddress) {
        PTContainer pTContainer = null;
        Iterator<PTContainer> it = this.pendingAllocation.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            PTContainer next = it.next();
            if (next.getState() == PTContainer.State.NEW || next.getState() == PTContainer.State.KILLED) {
                if (next.getResourceRequestPriority() == containerResource.priority) {
                    pTContainer = next;
                    break;
                }
            }
        }
        if (pTContainer == null) {
            LOG.debug("No container matching allocated resource {}", containerResource);
            LOG.debug("Containers waiting for allocation {}", this.pendingAllocation);
            return null;
        }
        this.pendingAllocation.remove(pTContainer);
        pTContainer.setState(PTContainer.State.ALLOCATED);
        if (pTContainer.getExternalId() != null) {
            LOG.info("Removing container agent {}", pTContainer.getExternalId());
            this.containers.remove(pTContainer.getExternalId());
        }
        pTContainer.setExternalId(containerResource.containerId);
        pTContainer.host = containerResource.host;
        pTContainer.bufferServerAddress = inetSocketAddress;
        if (UserGroupInformation.isSecurityEnabled()) {
            pTContainer.setBufferServerToken(AuthManager.generateToken());
        }
        pTContainer.nodeHttpAddress = containerResource.nodeHttpAddress;
        pTContainer.setAllocatedMemoryMB(containerResource.memoryMB);
        pTContainer.setAllocatedVCores(containerResource.vCores);
        pTContainer.setStartedTime(-1L);
        pTContainer.setFinishedTime(-1L);
        writeJournal(pTContainer.getSetContainerState());
        StreamingContainerAgent streamingContainerAgent = new StreamingContainerAgent(pTContainer, newStreamingContainerContext(pTContainer), this);
        this.containers.put(containerResource.containerId, streamingContainerAgent);
        LOG.debug("Assigned container {} priority {}", containerResource.containerId, Integer.valueOf(containerResource.priority));
        return streamingContainerAgent;
    }

    private StreamingContainerUmbilicalProtocol.StreamingContainerContext newStreamingContainerContext(PTContainer pTContainer) {
        try {
            int i = 0;
            Iterator<PTOperator> it = pTContainer.getOperators().iterator();
            while (it.hasNext()) {
                i += it.next().getBufferServerMemory();
            }
            LOG.debug("Buffer Server Memory {}", Integer.valueOf(i));
            StreamingContainerUmbilicalProtocol.StreamingContainerContext streamingContainerContext = new StreamingContainerUmbilicalProtocol.StreamingContainerContext(this.plan.getLogicalPlan().getAttributes().clone(), null);
            streamingContainerContext.attributes.put(ContainerContext.IDENTIFIER, pTContainer.getExternalId());
            streamingContainerContext.attributes.put(ContainerContext.BUFFER_SERVER_MB, Integer.valueOf(i));
            streamingContainerContext.attributes.put(ContainerContext.BUFFER_SERVER_TOKEN, pTContainer.getBufferServerToken());
            streamingContainerContext.startWindowMillis = this.vars.windowStartMillis;
            return streamingContainerContext;
        } catch (CloneNotSupportedException e) {
            throw new RuntimeException("Cannot clone DAG attributes", e);
        }
    }

    public StreamingContainerAgent getContainerAgent(String str) {
        StreamingContainerAgent streamingContainerAgent = this.containers.get(str);
        if (streamingContainerAgent == null) {
            LOG.warn("Trying to get unknown container {}", str);
        }
        return streamingContainerAgent;
    }

    public Collection<StreamingContainerAgent> getContainerAgents() {
        return this.containers.values();
    }

    private void processOperatorDeployStatus(PTOperator pTOperator, StreamingContainerUmbilicalProtocol.OperatorHeartbeat operatorHeartbeat, StreamingContainerAgent streamingContainerAgent) {
        StreamingContainerUmbilicalProtocol.OperatorHeartbeat.DeployState deployState = null;
        if (operatorHeartbeat != null) {
            deployState = operatorHeartbeat.getState();
        }
        LOG.debug("heartbeat {} {}/{} {}", new Object[]{pTOperator, pTOperator.getState(), deployState, pTOperator.getContainer().getExternalId()});
        switch (AnonymousClass5.$SwitchMap$com$datatorrent$stram$plan$physical$PTOperator$State[pTOperator.getState().ordinal()]) {
            case MethodSignatureVisitor.VISIT_PARAM /* 1 */:
                if (deployState == null) {
                    streamingContainerAgent.deployOpers.add(pTOperator);
                    return;
                }
                switch (AnonymousClass5.$SwitchMap$com$datatorrent$stram$api$StreamingContainerUmbilicalProtocol$OperatorHeartbeat$DeployState[deployState.ordinal()]) {
                    case MethodSignatureVisitor.VISIT_PARAM /* 1 */:
                        long j = pTOperator.stats.currentWindowId.get();
                        if (operatorHeartbeat.windowStats != null && !operatorHeartbeat.windowStats.isEmpty()) {
                            j = operatorHeartbeat.windowStats.get(operatorHeartbeat.windowStats.size() - 1).windowId;
                        }
                        LOG.debug("Operator {} deactivated at window {}", pTOperator, Long.valueOf(j));
                        synchronized (this.shutdownOperators) {
                            Set<PTOperator> set = this.shutdownOperators.get(Long.valueOf(j));
                            if (set == null) {
                                Map<Long, Set<PTOperator>> map = this.shutdownOperators;
                                Long valueOf = Long.valueOf(j);
                                HashSet hashSet = new HashSet();
                                set = hashSet;
                                map.put(valueOf, hashSet);
                            }
                            set.add(pTOperator);
                        }
                        pTOperator.setState(PTOperator.State.INACTIVE);
                        streamingContainerAgent.undeployOpers.add(Integer.valueOf(pTOperator.getId()));
                        this.slowestUpstreamOp.remove(pTOperator);
                        recordEventAsync(new StramEvent.StopOperatorEvent(pTOperator.getName(), pTOperator.getId(), pTOperator.getContainer().getExternalId()));
                        return;
                    case MethodSignatureVisitor.VISIT_RETURN /* 2 */:
                        processOperatorFailure(pTOperator);
                        streamingContainerAgent.undeployOpers.add(Integer.valueOf(pTOperator.getId()));
                        this.slowestUpstreamOp.remove(pTOperator);
                        recordEventAsync(new StramEvent.StopOperatorEvent(pTOperator.getName(), pTOperator.getId(), pTOperator.getContainer().getExternalId()));
                        return;
                    case MethodSignatureVisitor.VISIT_EXCEPTION /* 3 */:
                    default:
                        return;
                }
            case MethodSignatureVisitor.VISIT_RETURN /* 2 */:
                if (deployState != null) {
                    streamingContainerAgent.undeployOpers.add(Integer.valueOf(pTOperator.getId()));
                    this.slowestUpstreamOp.remove(pTOperator);
                    return;
                } else {
                    recordEventAsync(new StramEvent.StopOperatorEvent(pTOperator.getName(), pTOperator.getId(), pTOperator.getContainer().getExternalId()));
                    pTOperator.setState(PTOperator.State.PENDING_DEPLOY);
                    streamingContainerAgent.deployOpers.add(pTOperator);
                    return;
                }
            case MethodSignatureVisitor.VISIT_EXCEPTION /* 3 */:
                if (deployState == null) {
                    streamingContainerAgent.deployOpers.add(pTOperator);
                    return;
                }
                PTContainer container = pTOperator.getContainer();
                LOG.debug("{} marking deployed: {} remote status {}", new Object[]{container.getExternalId(), pTOperator, deployState});
                pTOperator.setState(PTOperator.State.ACTIVE);
                pTOperator.stats.lastHeartbeat = null;
                pTOperator.stats.lastWindowIdChangeTms = this.clock.getTime();
                recordEventAsync(new StramEvent.StartOperatorEvent(pTOperator.getName(), pTOperator.getId(), container.getExternalId()));
                return;
            default:
                if (deployState != null) {
                    streamingContainerAgent.undeployOpers.add(Integer.valueOf(pTOperator.getId()));
                    this.slowestUpstreamOp.remove(pTOperator);
                    recordEventAsync(new StramEvent.StopOperatorEvent(pTOperator.getName(), pTOperator.getId(), pTOperator.getContainer().getExternalId()));
                    return;
                }
                return;
        }
    }

    private void processOperatorFailure(PTOperator pTOperator) {
        if (pTOperator.getState() != PTOperator.State.ACTIVE) {
            LOG.warn("Failed operator {} {} {} to be undeployed by container", pTOperator, pTOperator.getState());
            return;
        }
        pTOperator.setState(PTOperator.State.INACTIVE);
        pTOperator.failureCount++;
        pTOperator.getOperatorMeta().getStatus().failureCount++;
        LOG.warn("Operator failure: {} count: {}", pTOperator, Integer.valueOf(pTOperator.failureCount));
        Integer num = (Integer) pTOperator.getOperatorMeta().getValue(Context.OperatorContext.RECOVERY_ATTEMPTS);
        if (num == null || pTOperator.failureCount <= num.intValue()) {
            LOG.error("Initiating container restart after operator failure {}", pTOperator);
            this.containerStopRequests.put(pTOperator.getContainer().getExternalId(), pTOperator.getContainer().getExternalId());
        } else {
            String format = String.format("Shutdown after reaching failure threshold for %s", pTOperator);
            LOG.warn(format);
            shutdownAllContainers(StreamingContainerUmbilicalProtocol.ShutdownType.ABORT, format);
            this.forcedShutdown = true;
        }
    }

    public StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse processHeartbeat(StreamingContainerUmbilicalProtocol.ContainerHeartbeat containerHeartbeat) {
        long time = this.clock.getTime();
        final StreamingContainerAgent streamingContainerAgent = this.containers.get(containerHeartbeat.getContainerId());
        if (streamingContainerAgent == null || streamingContainerAgent.container.getState() == PTContainer.State.KILLED) {
            LOG.error("Unknown container {}", containerHeartbeat.getContainerId());
            StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse containerHeartbeatResponse = new StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse();
            containerHeartbeatResponse.shutdown = StreamingContainerUmbilicalProtocol.ShutdownType.ABORT;
            return containerHeartbeatResponse;
        }
        if (streamingContainerAgent.container.getState() == PTContainer.State.ALLOCATED) {
            if (streamingContainerAgent.container.bufferServerAddress == null && containerHeartbeat.bufferServerHost != null) {
                streamingContainerAgent.container.bufferServerAddress = InetSocketAddress.createUnresolved(containerHeartbeat.bufferServerHost, containerHeartbeat.bufferServerPort);
                LOG.info("Container {} buffer server: {}", streamingContainerAgent.container.getExternalId(), streamingContainerAgent.container.bufferServerAddress);
            }
            final long currentTimeMillis = System.currentTimeMillis();
            streamingContainerAgent.container.setState(PTContainer.State.ACTIVE);
            streamingContainerAgent.container.setStartedTime(currentTimeMillis);
            streamingContainerAgent.container.setFinishedTime(-1L);
            streamingContainerAgent.jvmName = containerHeartbeat.jvmName;
            this.poolExecutor.submit(new Runnable() { // from class: com.datatorrent.stram.StreamingContainerManager.3
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        StreamingContainerManager.this.containerFile.append(streamingContainerAgent.getContainerInfo());
                    } catch (IOException e) {
                        StreamingContainerManager.LOG.warn("Cannot write to container file");
                    }
                    for (PTOperator pTOperator : streamingContainerAgent.container.getOperators()) {
                        try {
                            JSONObject jSONObject = new JSONObject();
                            jSONObject.put("name", pTOperator.getName());
                            jSONObject.put("id", pTOperator.getId());
                            jSONObject.put("container", streamingContainerAgent.container.getExternalId());
                            jSONObject.put("startTime", currentTimeMillis);
                            StreamingContainerManager.this.operatorFile.append(jSONObject);
                        } catch (IOException | JSONException e2) {
                            StreamingContainerManager.LOG.warn("Cannot write to operator file: ", e2);
                        }
                    }
                }
            });
        }
        streamingContainerAgent.containerStackTrace = containerHeartbeat.stackTrace;
        if (containerHeartbeat.restartRequested) {
            LOG.error("Container {} restart request", streamingContainerAgent.container.getExternalId());
            this.containerStopRequests.put(streamingContainerAgent.container.getExternalId(), streamingContainerAgent.container.getExternalId());
        }
        streamingContainerAgent.memoryMBFree = containerHeartbeat.memoryMBFree;
        streamingContainerAgent.gcCollectionCount = containerHeartbeat.gcCollectionCount;
        streamingContainerAgent.gcCollectionTime = containerHeartbeat.gcCollectionTime;
        streamingContainerAgent.undeployOpers.clear();
        streamingContainerAgent.deployOpers.clear();
        if (!this.deployChangeInProgress.get()) {
            streamingContainerAgent.deployCnt = this.deployChangeCnt;
        }
        HashSet newHashSetWithExpectedSize = Sets.newHashSetWithExpectedSize(streamingContainerAgent.container.getOperators().size());
        Iterator<StreamingContainerUmbilicalProtocol.OperatorHeartbeat> it = containerHeartbeat.getContainerStats().operators.iterator();
        while (it.hasNext()) {
            StreamingContainerUmbilicalProtocol.OperatorHeartbeat next = it.next();
            long j = 0;
            newHashSetWithExpectedSize.add(Integer.valueOf(next.nodeId));
            PTOperator pTOperator = this.plan.getAllOperators().get(Integer.valueOf(next.getNodeId()));
            if (pTOperator == null) {
                LOG.info("Heartbeat for unknown operator {} (container {})", Integer.valueOf(next.getNodeId()), containerHeartbeat.getContainerId());
                streamingContainerAgent.undeployOpers.add(Integer.valueOf(next.nodeId));
            } else {
                if (next.requestResponse != null) {
                    Iterator<StatsListener.OperatorResponse> it2 = next.requestResponse.iterator();
                    while (it2.hasNext()) {
                        StatsListener.OperatorResponse next2 = it2.next();
                        if (next2 instanceof OperatorResponse) {
                            this.commandResponse.put((Long) next2.getResponseId(), next2.getResponse());
                            LOG.debug(" Got back the response {} for the request {}", next2, next2.getResponseId());
                        } else {
                            pTOperator.stats.responses.add(next2);
                        }
                    }
                }
                if (pTOperator.getState() != PTOperator.State.ACTIVE || next.getState() != StreamingContainerUmbilicalProtocol.OperatorHeartbeat.DeployState.ACTIVE) {
                    processOperatorDeployStatus(pTOperator, next, streamingContainerAgent);
                }
                pTOperator.stats.lastHeartbeat = next;
                ArrayList<Stats.OperatorStats> operatorStatsContainer = next.getOperatorStatsContainer();
                if (!operatorStatsContainer.isEmpty()) {
                    long j2 = 0;
                    long j3 = 0;
                    long j4 = 0;
                    int i = 0;
                    long j5 = -1;
                    pTOperator.stats.recordingId = null;
                    OperatorStatus operatorStatus = pTOperator.stats;
                    operatorStatus.statsRevs.checkout();
                    Iterator<Map.Entry<String, OperatorStatus.PortStatus>> it3 = operatorStatus.inputPortStatusList.entrySet().iterator();
                    while (it3.hasNext()) {
                        it3.next().getValue().recordingId = null;
                    }
                    Iterator<Map.Entry<String, OperatorStatus.PortStatus>> it4 = operatorStatus.outputPortStatusList.entrySet().iterator();
                    while (it4.hasNext()) {
                        it4.next().getValue().recordingId = null;
                    }
                    for (Stats.OperatorStats operatorStats : operatorStatsContainer) {
                        if (operatorStats == null) {
                            LOG.warn("Operator {} statistics list contains null element", Integer.valueOf(next.getNodeId()));
                        } else {
                            if ((operatorStats.checkpoint instanceof Checkpoint) && (pTOperator.getRecentCheckpoint() == null || pTOperator.getRecentCheckpoint().windowId < operatorStats.checkpoint.getWindowId())) {
                                addCheckpoint(pTOperator, (Checkpoint) operatorStats.checkpoint);
                                if (operatorStats.checkpointStats != null) {
                                    operatorStatus.checkpointStats = operatorStats.checkpointStats;
                                    operatorStatus.checkpointTimeMA.add(operatorStats.checkpointStats.checkpointTime);
                                }
                                pTOperator.failureCount = 0;
                            }
                            pTOperator.stats.recordingId = operatorStats.recordingId;
                            EndWindowStats endWindowStats = new EndWindowStats();
                            ArrayList<Stats.OperatorStats.PortStats> arrayList = operatorStats.inputPorts;
                            if (arrayList != null) {
                                HashSet newHashSetWithExpectedSize2 = Sets.newHashSetWithExpectedSize(arrayList.size());
                                for (Stats.OperatorStats.PortStats portStats : arrayList) {
                                    newHashSetWithExpectedSize2.add(portStats.id);
                                    OperatorStatus.PortStatus portStatus = operatorStatus.inputPortStatusList.get(portStats.id);
                                    if (portStatus == null) {
                                        operatorStatus.getClass();
                                        portStatus = new OperatorStatus.PortStatus();
                                        portStatus.portName = portStats.id;
                                        operatorStatus.inputPortStatusList.put(portStats.id, portStatus);
                                    }
                                    portStatus.totalTuples += portStats.tupleCount;
                                    portStatus.recordingId = portStats.recordingId;
                                    j2 += portStats.tupleCount;
                                    endWindowStats.dequeueTimestamps.put(portStats.id, Long.valueOf(portStats.endWindowTimestamp));
                                    Pair<Integer, String> pair = new Pair<>(Integer.valueOf(pTOperator.getId()), portStats.id);
                                    Long l = this.operatorPortLastEndWindowTimestamps.get(pair);
                                    if (l == null) {
                                        l = Long.valueOf(this.lastStatsTimestamp);
                                    }
                                    long max = Math.max(portStats.endWindowTimestamp - l.longValue(), 0L);
                                    portStatus.tuplesPMSMA.add(portStats.tupleCount, max);
                                    portStatus.bufferServerBytesPMSMA.add(portStats.bufferServerBytes, max);
                                    portStatus.queueSizeMA.add(portStats.queueSize);
                                    this.operatorPortLastEndWindowTimestamps.put(pair, Long.valueOf(portStats.endWindowTimestamp));
                                    if (j < portStats.endWindowTimestamp) {
                                        j = portStats.endWindowTimestamp;
                                    }
                                    if (portStats.endWindowTimestamp > j5) {
                                        j5 = portStats.endWindowTimestamp;
                                    }
                                }
                                Iterator<Map.Entry<String, OperatorStatus.PortStatus>> it5 = operatorStatus.inputPortStatusList.entrySet().iterator();
                                while (it5.hasNext()) {
                                    if (!newHashSetWithExpectedSize2.contains(it5.next().getKey())) {
                                        it5.remove();
                                    }
                                }
                            }
                            ArrayList<Stats.OperatorStats.PortStats> arrayList2 = operatorStats.outputPorts;
                            if (arrayList2 != null) {
                                HashSet newHashSetWithExpectedSize3 = Sets.newHashSetWithExpectedSize(arrayList2.size());
                                for (Stats.OperatorStats.PortStats portStats2 : arrayList2) {
                                    newHashSetWithExpectedSize3.add(portStats2.id);
                                    OperatorStatus.PortStatus portStatus2 = operatorStatus.outputPortStatusList.get(portStats2.id);
                                    if (portStatus2 == null) {
                                        operatorStatus.getClass();
                                        portStatus2 = new OperatorStatus.PortStatus();
                                        portStatus2.portName = portStats2.id;
                                        operatorStatus.outputPortStatusList.put(portStats2.id, portStatus2);
                                    }
                                    portStatus2.totalTuples += portStats2.tupleCount;
                                    portStatus2.recordingId = portStats2.recordingId;
                                    j3 += portStats2.tupleCount;
                                    Pair<Integer, String> pair2 = new Pair<>(Integer.valueOf(pTOperator.getId()), portStats2.id);
                                    Long l2 = this.operatorPortLastEndWindowTimestamps.get(pair2);
                                    if (l2 == null) {
                                        l2 = Long.valueOf(this.lastStatsTimestamp);
                                    }
                                    long max2 = Math.max(portStats2.endWindowTimestamp - l2.longValue(), 0L);
                                    portStatus2.tuplesPMSMA.add(portStats2.tupleCount, max2);
                                    portStatus2.bufferServerBytesPMSMA.add(portStats2.bufferServerBytes, max2);
                                    this.operatorPortLastEndWindowTimestamps.put(pair2, Long.valueOf(portStats2.endWindowTimestamp));
                                    if (j < portStats2.endWindowTimestamp) {
                                        j = portStats2.endWindowTimestamp;
                                    }
                                }
                                if (arrayList2.size() > 0) {
                                    endWindowStats.emitTimestamp = ((Stats.OperatorStats.PortStats) arrayList2.iterator().next()).endWindowTimestamp;
                                }
                                Iterator<Map.Entry<String, OperatorStatus.PortStatus>> it6 = operatorStatus.outputPortStatusList.entrySet().iterator();
                                while (it6.hasNext()) {
                                    if (!newHashSetWithExpectedSize3.contains(it6.next().getKey())) {
                                        it6.remove();
                                    }
                                }
                            }
                            if (endWindowStats.emitTimestamp < 0) {
                                endWindowStats.emitTimestamp = j5;
                            }
                            if (operatorStatus.currentWindowId.get() != operatorStats.windowId) {
                                operatorStatus.lastWindowIdChangeTms = time;
                                operatorStatus.currentWindowId.set(operatorStats.windowId);
                            }
                            j4 += operatorStats.cpuTimeUsed;
                            i++;
                            if (pTOperator.getOperatorMeta().getValue(Context.OperatorContext.COUNTERS_AGGREGATOR) != null) {
                                endWindowStats.counters = operatorStats.counters;
                            }
                            if (pTOperator.getOperatorMeta().getMetricAggregatorMeta() != null && pTOperator.getOperatorMeta().getMetricAggregatorMeta().getAggregator() != null) {
                                endWindowStats.metrics = operatorStats.metrics;
                            }
                            if (operatorStats.windowId > this.currentEndWindowStatsWindowId) {
                                Map<Integer, EndWindowStats> map = this.endWindowStatsOperatorMap.get(Long.valueOf(operatorStats.windowId));
                                if (map == null) {
                                    map = new ConcurrentSkipListMap();
                                    Map<Integer, EndWindowStats> putIfAbsent = this.endWindowStatsOperatorMap.putIfAbsent(Long.valueOf(operatorStats.windowId), map);
                                    if (putIfAbsent != null) {
                                        map = putIfAbsent;
                                    }
                                }
                                map.put(Integer.valueOf(next.getNodeId()), endWindowStats);
                                Set<Integer> keySet = this.plan.getAllOperators().keySet();
                                int size = this.plan.getAllOperators().size();
                                if (keySet.containsAll(map.keySet()) && map.size() == size) {
                                    this.completeEndWindowStatsWindowId = operatorStats.windowId;
                                }
                            }
                        }
                    }
                    operatorStatus.totalTuplesProcessed.add(j2);
                    operatorStatus.totalTuplesEmitted.add(j3);
                    LogicalOperatorStatus status = pTOperator.getOperatorMeta().getStatus();
                    if (!pTOperator.isUnifier()) {
                        status.totalTuplesProcessed += j2;
                        status.totalTuplesEmitted += j3;
                    }
                    long longValue = this.operatorLastEndWindowTimestamps.containsKey(Integer.valueOf(pTOperator.getId())) ? this.operatorLastEndWindowTimestamps.get(Integer.valueOf(pTOperator.getId())).longValue() : this.lastStatsTimestamp;
                    if (j >= longValue) {
                        double d = 0.0d;
                        double d2 = 0.0d;
                        if (i != 0) {
                            operatorStatus.cpuNanosPMSMA.add(j4, j - longValue);
                        }
                        Iterator<OperatorStatus.PortStatus> it7 = operatorStatus.inputPortStatusList.values().iterator();
                        while (it7.hasNext()) {
                            d += it7.next().tuplesPMSMA.getAvg();
                        }
                        Iterator<OperatorStatus.PortStatus> it8 = operatorStatus.outputPortStatusList.values().iterator();
                        while (it8.hasNext()) {
                            d2 += it8.next().tuplesPMSMA.getAvg();
                        }
                        operatorStatus.tuplesProcessedPSMA.set(Math.round(d * 1000.0d));
                        operatorStatus.tuplesEmittedPSMA.set(Math.round(d2 * 1000.0d));
                    }
                    this.operatorLastEndWindowTimestamps.put(Integer.valueOf(pTOperator.getId()), Long.valueOf(j));
                    operatorStatus.listenerStats.add(operatorStatsContainer);
                    this.reportStats.put(pTOperator, pTOperator);
                    operatorStatus.statsRevs.commit();
                }
                if (this.lastStatsTimestamp < j) {
                    this.lastStatsTimestamp = j;
                }
            }
        }
        streamingContainerAgent.lastHeartbeatMillis = time;
        for (PTOperator pTOperator2 : streamingContainerAgent.container.getOperators()) {
            if (!newHashSetWithExpectedSize.contains(Integer.valueOf(pTOperator2.getId()))) {
                processOperatorDeployStatus(pTOperator2, null, streamingContainerAgent);
            }
        }
        StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse heartbeatResponse = getHeartbeatResponse(streamingContainerAgent);
        if (containerHeartbeat.getContainerStats().operators.isEmpty() && isApplicationIdle()) {
            LOG.info("requesting idle shutdown for container {}", containerHeartbeat.getContainerId());
            heartbeatResponse.shutdown = StreamingContainerUmbilicalProtocol.ShutdownType.ABORT;
        } else if (streamingContainerAgent.isShutdownRequested()) {
            LOG.info("requesting shutdown for container {}", containerHeartbeat.getContainerId());
            heartbeatResponse.shutdown = streamingContainerAgent.shutdownRequest;
        }
        List<StreamingContainerUmbilicalProtocol.StramToNodeRequest> arrayList3 = heartbeatResponse.nodeRequests != null ? heartbeatResponse.nodeRequests : new ArrayList<>();
        ConcurrentLinkedQueue<StreamingContainerUmbilicalProtocol.StramToNodeRequest> operatorRequests = streamingContainerAgent.getOperatorRequests();
        while (true) {
            StreamingContainerUmbilicalProtocol.StramToNodeRequest poll = operatorRequests.poll();
            if (poll == null) {
                heartbeatResponse.nodeRequests = arrayList3;
                heartbeatResponse.committedWindowId = this.committedWindowId;
                heartbeatResponse.stackTraceRequired = streamingContainerAgent.stackTraceRequested;
                streamingContainerAgent.stackTraceRequested = false;
                this.apexPluginDispatcher.dispatch(new DAGExecutionEvent.HeartbeatExecutionEvent(containerHeartbeat));
                return heartbeatResponse;
            }
            arrayList3.add(poll);
        }
    }

    public long updateOperatorLatency(PTOperator pTOperator, UpdateOperatorLatencyContext updateOperatorLatencyContext) {
        if (pTOperator.getInputs().isEmpty() || pTOperator.stats.currentWindowId.get() <= 0) {
            return -1L;
        }
        OperatorStatus operatorStatus = pTOperator.stats;
        long j = Long.MAX_VALUE;
        PTOperator pTOperator2 = null;
        int intValue = ((Integer) this.plan.getLogicalPlan().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS)).intValue();
        int intValue2 = ((Integer) this.plan.getLogicalPlan().getValue(LogicalPlan.HEARTBEAT_TIMEOUT_MILLIS)).intValue();
        long j2 = operatorStatus.currentWindowId.get();
        if (updateOperatorLatencyContext.endWindowStatsExists(j2)) {
            long endWindowEmitTimestamp = updateOperatorLatencyContext.getEndWindowEmitTimestamp(j2, pTOperator) + updateOperatorLatencyContext.getRPCLatency(pTOperator);
            Iterator<PTOperator.PTInput> it = pTOperator.getInputs().iterator();
            while (it.hasNext()) {
                PTOperator pTOperator3 = it.next().source.source;
                if (!(pTOperator3.getOperatorMeta().getOperator() instanceof Operator.DelayOperator)) {
                    long endWindowEmitTimestamp2 = updateOperatorLatencyContext.getEndWindowEmitTimestamp(j2, pTOperator3);
                    if (endWindowEmitTimestamp2 >= 0) {
                        long rPCLatency = endWindowEmitTimestamp - (endWindowEmitTimestamp2 + updateOperatorLatencyContext.getRPCLatency(pTOperator3));
                        if (rPCLatency < 0) {
                            rPCLatency = 0;
                        }
                        long compareWindowId = WindowGenerator.compareWindowId(pTOperator3.stats.currentWindowId.get(), pTOperator.stats.currentWindowId.get(), intValue) * intValue;
                        if (compareWindowId > rPCLatency && compareWindowId > intValue2) {
                            rPCLatency = compareWindowId;
                        }
                        if (j > rPCLatency) {
                            j = rPCLatency;
                            pTOperator2 = pTOperator3;
                        }
                    }
                }
            }
        } else {
            Iterator<PTOperator.PTInput> it2 = pTOperator.getInputs().iterator();
            while (it2.hasNext()) {
                PTOperator pTOperator4 = it2.next().source.source;
                if (!(pTOperator4.getOperatorMeta().getOperator() instanceof Operator.DelayOperator) && pTOperator4.stats.currentWindowId.get() >= pTOperator.stats.currentWindowId.get()) {
                    long compareWindowId2 = WindowGenerator.compareWindowId(pTOperator4.stats.currentWindowId.get(), pTOperator.stats.currentWindowId.get(), intValue) * intValue;
                    if (j > compareWindowId2) {
                        j = compareWindowId2;
                        pTOperator2 = pTOperator4;
                    }
                }
            }
        }
        if (pTOperator2 == null) {
            return -1L;
        }
        operatorStatus.latencyMA.add(j);
        this.slowestUpstreamOp.put(pTOperator, pTOperator2);
        return j;
    }

    private StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse getHeartbeatResponse(StreamingContainerAgent streamingContainerAgent) {
        StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse containerHeartbeatResponse = new StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse();
        if (this.deployChangeInProgress.get() || streamingContainerAgent.deployCnt != this.deployChangeCnt) {
            LOG.debug("{} deferred requests due to concurrent plan change.", streamingContainerAgent.container.toIdStateString());
            containerHeartbeatResponse.hasPendingRequests = true;
            return containerHeartbeatResponse;
        }
        if (!streamingContainerAgent.undeployOpers.isEmpty()) {
            containerHeartbeatResponse.undeployRequest = Lists.newArrayList(streamingContainerAgent.undeployOpers);
            containerHeartbeatResponse.hasPendingRequests = !streamingContainerAgent.deployOpers.isEmpty();
            return containerHeartbeatResponse;
        }
        Set<PTOperator> set = streamingContainerAgent.deployOpers;
        if (set.isEmpty()) {
            return containerHeartbeatResponse;
        }
        for (PTContainer pTContainer : getPhysicalPlan().getContainers()) {
            if (pTContainer.getState() != PTContainer.State.ACTIVE) {
                LOG.debug("{} waiting for container activation {}", streamingContainerAgent.container.toIdStateString(), pTContainer.toIdStateString());
                containerHeartbeatResponse.hasPendingRequests = true;
                return containerHeartbeatResponse;
            }
            for (PTOperator pTOperator : pTContainer.getOperators()) {
                if (pTOperator.getState() == PTOperator.State.PENDING_UNDEPLOY) {
                    LOG.debug("{} waiting for undeploy {} {}", new Object[]{streamingContainerAgent.container.toIdStateString(), pTContainer.toIdStateString(), pTOperator});
                    containerHeartbeatResponse.hasPendingRequests = true;
                    return containerHeartbeatResponse;
                }
            }
        }
        LOG.debug("{} deployable operators: {}", streamingContainerAgent.container.toIdStateString(), set);
        List<OperatorDeployInfo> deployInfoList = streamingContainerAgent.getDeployInfoList(set);
        if (deployInfoList != null && !deployInfoList.isEmpty()) {
            containerHeartbeatResponse.deployRequest = deployInfoList;
            containerHeartbeatResponse.nodeRequests = Lists.newArrayList();
            Iterator<PTOperator> it = set.iterator();
            while (it.hasNext()) {
                containerHeartbeatResponse.nodeRequests.addAll(it.next().deployRequests);
            }
        }
        containerHeartbeatResponse.hasPendingRequests = false;
        return containerHeartbeatResponse;
    }

    private boolean isApplicationIdle() {
        if (this.eventQueueProcessing.get()) {
            return false;
        }
        for (StreamingContainerAgent streamingContainerAgent : this.containers.values()) {
            if (streamingContainerAgent.hasPendingWork()) {
                return false;
            }
            Iterator<PTOperator> it = streamingContainerAgent.container.getOperators().iterator();
            while (it.hasNext()) {
                if (!it.next().stats.isIdle()) {
                    return false;
                }
            }
        }
        return true;
    }

    void addCheckpoint(PTOperator pTOperator, Checkpoint checkpoint) {
        synchronized (pTOperator.checkpoints) {
            if (pTOperator.checkpoints.isEmpty()) {
                pTOperator.checkpoints.add(checkpoint);
            } else {
                Checkpoint last = pTOperator.checkpoints.getLast();
                if (last.windowId != checkpoint.windowId) {
                    if (last.windowId > checkpoint.windowId) {
                        LOG.warn("Out of sequence checkpoint {} last {} (operator {})", new Object[]{checkpoint, last, pTOperator});
                        ListIterator<Checkpoint> listIterator = pTOperator.checkpoints.listIterator();
                        while (listIterator.hasNext() && listIterator.next().windowId < checkpoint.windowId) {
                        }
                        if (listIterator.previous().windowId != checkpoint.windowId) {
                            listIterator.add(checkpoint);
                        }
                    } else {
                        pTOperator.checkpoints.add(checkpoint);
                    }
                }
            }
        }
    }

    public void updateRecoveryCheckpoints(PTOperator pTOperator, UpdateCheckpointsContext updateCheckpointsContext, boolean z) {
        if (pTOperator.getRecoveryCheckpoint().windowId < updateCheckpointsContext.committedWindowId.longValue()) {
            updateCheckpointsContext.committedWindowId.setValue(pTOperator.getRecoveryCheckpoint().windowId);
        }
        if (pTOperator.getState() == PTOperator.State.ACTIVE && updateCheckpointsContext.currentTms - pTOperator.stats.lastWindowIdChangeTms > pTOperator.stats.windowProcessingTimeoutMillis && updateCheckpointsContext.committedWindowId.longValue() >= pTOperator.getRecoveryCheckpoint().windowId && !z) {
            LOG.warn("Marking operator {} blocked committed window {}, recovery window {}, current time {}, last window id change time {}, window processing timeout millis {}", new Object[]{pTOperator, Codec.getStringWindowId(updateCheckpointsContext.committedWindowId.longValue()), Codec.getStringWindowId(pTOperator.getRecoveryCheckpoint().windowId), Long.valueOf(updateCheckpointsContext.currentTms), Long.valueOf(pTOperator.stats.lastWindowIdChangeTms), Integer.valueOf(pTOperator.stats.windowProcessingTimeoutMillis)});
            updateCheckpointsContext.blocked.add(pTOperator);
        }
        Checkpoint checkpoint = Checkpoint.INITIAL_CHECKPOINT;
        Set<LogicalPlan.OperatorMeta> set = updateCheckpointsContext.checkpointGroups.get(pTOperator.getOperatorMeta());
        if (set == null) {
            set = Collections.singleton(pTOperator.getOperatorMeta());
        }
        TreeSet treeSet = new TreeSet(new Checkpoint.CheckpointComparator());
        synchronized (pTOperator.checkpoints) {
            treeSet.addAll(pTOperator.checkpoints);
        }
        HashSet<PTOperator> hashSet = new HashSet(set.size());
        boolean z2 = pTOperator.getState() == PTOperator.State.PENDING_DEPLOY;
        if (set.size() > 1) {
            Iterator<LogicalPlan.OperatorMeta> it = set.iterator();
            while (it.hasNext()) {
                Collection<PTOperator> allOperators = this.plan.getAllOperators(it.next());
                allOperators.addAll(getUnifiersInCheckpointGroup(allOperators));
                for (PTOperator pTOperator2 : allOperators) {
                    synchronized (pTOperator2.checkpoints) {
                        treeSet.retainAll(pTOperator2.checkpoints);
                    }
                    updateCheckpointsContext.visited.add(pTOperator2);
                    hashSet.add(pTOperator2);
                    z2 |= pTOperator2.getState() == PTOperator.State.PENDING_DEPLOY;
                }
            }
            if (!treeSet.isEmpty()) {
                checkpoint = (Checkpoint) treeSet.last();
            }
        } else {
            updateCheckpointsContext.visited.add(pTOperator);
            hashSet.add(pTOperator);
            checkpoint = pTOperator.getRecentCheckpoint();
            if (updateCheckpointsContext.recovery && checkpoint.windowId == -1 && pTOperator.isOperatorStateLess()) {
                checkpoint = new Checkpoint(WindowGenerator.getWindowId(updateCheckpointsContext.currentTms, this.vars.windowStartMillis, ((Integer) getLogicalPlan().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS)).intValue()), 0, 0);
            }
        }
        Iterator it2 = hashSet.iterator();
        while (it2.hasNext()) {
            Iterator<PTOperator.PTOutput> it3 = ((PTOperator) it2.next()).getOutputs().iterator();
            while (it3.hasNext()) {
                Iterator<PTOperator.PTInput> it4 = it3.next().sinks.iterator();
                while (it4.hasNext()) {
                    PTOperator pTOperator3 = it4.next().target;
                    if (!hashSet.contains(pTOperator3)) {
                        if (!updateCheckpointsContext.visited.contains(pTOperator3)) {
                            updateRecoveryCheckpoints(pTOperator3, updateCheckpointsContext, z);
                        }
                        if (pTOperator3.getRecoveryCheckpoint().windowId >= pTOperator.getRecoveryCheckpoint().windowId) {
                            checkpoint = Checkpoint.min(checkpoint, pTOperator3.getRecoveryCheckpoint());
                        }
                        if (updateCheckpointsContext.blocked.contains(pTOperator3) && pTOperator3.stats.getCurrentWindowId() == pTOperator.stats.getCurrentWindowId()) {
                            updateCheckpointsContext.blocked.remove(pTOperator3);
                        }
                    }
                }
            }
        }
        if (!treeSet.contains(checkpoint) && !treeSet.isEmpty()) {
            checkpoint = (Checkpoint) Objects.firstNonNull(treeSet.floor(checkpoint), checkpoint);
        }
        for (PTOperator pTOperator4 : hashSet) {
            if (!z2 || updateCheckpointsContext.recovery) {
                Checkpoint checkpoint2 = Checkpoint.INITIAL_CHECKPOINT;
                LinkedList<Checkpoint> linkedList = pTOperator4.checkpoints;
                synchronized (linkedList) {
                    if (!linkedList.isEmpty() && linkedList.getFirst().windowId <= checkpoint.windowId) {
                        checkpoint2 = linkedList.getFirst();
                        while (linkedList.size() > 1) {
                            Checkpoint checkpoint3 = linkedList.get(1);
                            if (checkpoint3.windowId > checkpoint.windowId) {
                                break;
                            }
                            linkedList.removeFirst();
                            this.purgeCheckpoints.add(new Pair<>(pTOperator4, Long.valueOf(checkpoint2.windowId)));
                            checkpoint2 = checkpoint3;
                        }
                    } else if (updateCheckpointsContext.recovery && linkedList.isEmpty() && pTOperator4.isOperatorStateLess()) {
                        LOG.debug("Adding checkpoint for stateless operator {} {}", pTOperator4, Codec.getStringWindowId(checkpoint.windowId));
                        checkpoint2 = pTOperator4.addCheckpoint(checkpoint.windowId, this.vars.windowStartMillis);
                    }
                }
                pTOperator4.setRecoveryCheckpoint(checkpoint2);
            } else {
                LOG.debug("Skipping checkpoint update {} during {}", pTOperator4, pTOperator4.getState());
            }
        }
    }

    private static Collection<PTOperator> getUnifiersInCheckpointGroup(Collection<PTOperator> collection) {
        HashSet newHashSet = Sets.newHashSet();
        Iterator<PTOperator> it = collection.iterator();
        while (it.hasNext()) {
            Iterator<PTOperator.PTOutput> it2 = it.next().getOutputs().iterator();
            while (it2.hasNext()) {
                Iterator<PTOperator.PTInput> it3 = it2.next().sinks.iterator();
                while (it3.hasNext()) {
                    PTOperator pTOperator = it3.next().target;
                    if (pTOperator.isUnifier()) {
                        newHashSet.add(pTOperator);
                    }
                }
            }
        }
        return newHashSet;
    }

    public long windowIdToMillis(long j) {
        return WindowGenerator.getWindowMillis(j, this.vars.windowStartMillis, ((Integer) this.plan.getLogicalPlan().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS)).intValue());
    }

    public long getWindowStartMillis() {
        return this.vars.windowStartMillis;
    }

    protected Map<LogicalPlan.OperatorMeta, Set<LogicalPlan.OperatorMeta>> getCheckpointGroups() {
        if (this.checkpointGroups == null) {
            this.checkpointGroups = new HashMap();
            LogicalPlan logicalPlan = this.plan.getLogicalPlan();
            logicalPlan.resetNIndex();
            LogicalPlan.ValidationContext validationContext = new LogicalPlan.ValidationContext();
            Iterator<LogicalPlan.OperatorMeta> it = logicalPlan.getRootOperators().iterator();
            while (it.hasNext()) {
                this.plan.getLogicalPlan().findStronglyConnected(it.next(), validationContext);
            }
            for (Set<LogicalPlan.OperatorMeta> set : validationContext.stronglyConnected) {
                Iterator<LogicalPlan.OperatorMeta> it2 = set.iterator();
                while (it2.hasNext()) {
                    this.checkpointGroups.put(it2.next(), set);
                }
            }
        }
        return this.checkpointGroups;
    }

    private long updateCheckpoints(boolean z) {
        int i = 0;
        UpdateCheckpointsContext updateCheckpointsContext = new UpdateCheckpointsContext(this.clock, z, getCheckpointGroups());
        Iterator<LogicalPlan.OperatorMeta> it = this.plan.getLogicalPlan().getRootOperators().iterator();
        while (it.hasNext()) {
            List<PTOperator> operators = this.plan.getOperators(it.next());
            if (operators != null) {
                Iterator<PTOperator> it2 = operators.iterator();
                while (it2.hasNext()) {
                    i++;
                    updateRecoveryCheckpoints(it2.next(), updateCheckpointsContext, z);
                }
            }
        }
        if (i == 0) {
            return this.committedWindowId;
        }
        purgeCheckpoints();
        for (PTOperator pTOperator : updateCheckpointsContext.blocked) {
            String externalId = pTOperator.getContainer().getExternalId();
            if (externalId != null) {
                LOG.info("Blocked operator {} container {} time {}ms", new Object[]{pTOperator, pTOperator.getContainer().toIdStateString(), Long.valueOf(updateCheckpointsContext.currentTms - pTOperator.stats.lastWindowIdChangeTms)});
                this.containerStopRequests.put(externalId, externalId);
            }
        }
        return updateCheckpointsContext.committedWindowId.longValue();
    }

    private BufferServerController getBufferServerClient(PTOperator pTOperator) {
        BufferServerController bufferServerController = new BufferServerController(pTOperator.getLogicalId());
        bufferServerController.setToken(pTOperator.getContainer().getBufferServerToken());
        InetSocketAddress inetSocketAddress = pTOperator.getContainer().bufferServerAddress;
        StreamingContainer.eventloop.connect(inetSocketAddress.isUnresolved() ? new InetSocketAddress(inetSocketAddress.getHostName(), inetSocketAddress.getPort()) : inetSocketAddress, bufferServerController);
        return bufferServerController;
    }

    private void purgeCheckpoints() {
        for (Pair<PTOperator, Long> pair : this.purgeCheckpoints) {
            final PTOperator pTOperator = (PTOperator) pair.getFirst();
            if (!pTOperator.isOperatorStateLess()) {
                final long longValue = ((Long) pair.getSecond()).longValue();
                this.poolExecutor.submit(new Runnable() { // from class: com.datatorrent.stram.StreamingContainerManager.4
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            ((StorageAgent) pTOperator.getOperatorMeta().getValue(Context.OperatorContext.STORAGE_AGENT)).delete(pTOperator.getId(), longValue);
                        } catch (IOException e) {
                            StreamingContainerManager.LOG.error("Failed to purge checkpoint for operator {} for windowId {}", new Object[]{pTOperator, Long.valueOf(longValue), e});
                        }
                    }
                });
            }
        }
        this.purgeCheckpoints.clear();
    }

    public void shutdownAllContainers(StreamingContainerUmbilicalProtocol.ShutdownType shutdownType, String str) {
        this.shutdownDiagnosticsMessage = str;
        LOG.info("Initiating application shutdown: type {} {}", shutdownType, str);
        Iterator<StreamingContainerAgent> it = this.containers.values().iterator();
        while (it.hasNext()) {
            it.next().requestShutDown(shutdownType);
        }
    }

    private Map<PTContainer, List<PTOperator>> groupByContainer(Collection<PTOperator> collection) {
        HashMap hashMap = new HashMap();
        for (PTOperator pTOperator : collection) {
            List list = (List) hashMap.get(pTOperator.getContainer());
            if (list == null) {
                list = new ArrayList();
                hashMap.put(pTOperator.getContainer(), list);
            }
            list.add(pTOperator);
        }
        return hashMap;
    }

    private void requestContainer(PTContainer pTContainer) {
        StreamingContainerAgent.ContainerStartRequest containerStartRequest = new StreamingContainerAgent.ContainerStartRequest(pTContainer);
        this.containerStartRequests.add(containerStartRequest);
        this.pendingAllocation.add(containerStartRequest.container);
        this.lastResourceRequest = System.currentTimeMillis();
        Iterator<PTOperator> it = pTContainer.getOperators().iterator();
        while (it.hasNext()) {
            it.next().setState(PTOperator.State.INACTIVE);
        }
    }

    public void deployAfterRestart() {
        if (this.startedFromCheckpoint) {
            try {
                this.deployChangeInProgress.set(true);
                for (PTContainer pTContainer : getPhysicalPlan().getContainers()) {
                    pTContainer.setState(PTContainer.State.NEW);
                    requestContainer(pTContainer);
                    Iterator<PTOperator> it = pTContainer.getOperators().iterator();
                    while (it.hasNext()) {
                        it.next().setState(PTOperator.State.PENDING_DEPLOY);
                    }
                }
            } finally {
                this.deployChangeCnt++;
                this.deployChangeInProgress.set(false);
            }
        }
    }

    @Override // com.datatorrent.stram.plan.physical.PhysicalPlan.PlanContext
    public void deploy(Set<PTContainer> set, Collection<PTOperator> collection, Set<PTContainer> set2, Collection<PTOperator> collection2) {
        try {
            this.deployChangeInProgress.set(true);
            for (Map.Entry<PTContainer, List<PTOperator>> entry : groupByContainer(collection).entrySet()) {
                PTContainer key = entry.getKey();
                if (!set2.contains(key) && !set.contains(key) && key.getState() != PTContainer.State.KILLED) {
                    LOG.debug("scheduling undeploy {} {}", entry.getKey().getExternalId(), entry.getValue());
                    Iterator<PTOperator> it = entry.getValue().iterator();
                    while (it.hasNext()) {
                        it.next().setState(PTOperator.State.PENDING_UNDEPLOY);
                    }
                }
            }
            Iterator<PTContainer> it2 = set2.iterator();
            while (it2.hasNext()) {
                requestContainer(it2.next());
            }
            for (Map.Entry<PTContainer, List<PTOperator>> entry2 : groupByContainer(collection2).entrySet()) {
                if (!set2.contains(entry2.getKey())) {
                    for (PTOperator pTOperator : entry2.getValue()) {
                        for (PTOperator.PTOutput pTOutput : pTOperator.getOutputs()) {
                            if (!pTOutput.isDownStreamInline()) {
                                Iterator<LogicalPlan.InputPortMeta> it3 = pTOutput.logicalStream.getSinks().iterator();
                                while (it3.hasNext()) {
                                    String concat = Integer.toString(pTOperator.getId()).concat(LogicalPlanConfiguration.KEY_SEPARATOR).concat(pTOutput.portName).concat(LogicalPlanConfiguration.KEY_SEPARATOR).concat(this.plan.getStreamCodecIdentifier(it3.next().getStreamCodec()).toString());
                                    if (pTOperator.getContainer().getState() == PTContainer.State.ACTIVE && pTOperator.getContainer().bufferServerAddress.getPort() != 0) {
                                        try {
                                            getBufferServerClient(pTOperator).reset(null, concat, 0L);
                                        } catch (Exception e) {
                                            LOG.error("Failed to reset buffer server {} {}", concat, e);
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
                LOG.debug("scheduling deploy {} {}", entry2.getKey().getExternalId(), entry2.getValue());
                for (PTOperator pTOperator2 : entry2.getValue()) {
                    if (pTOperator2.getState() != PTOperator.State.PENDING_UNDEPLOY) {
                        pTOperator2.setState(PTOperator.State.PENDING_DEPLOY);
                    }
                }
            }
            for (PTContainer pTContainer : set) {
                if (pTContainer.getExternalId() != null) {
                    StreamingContainerAgent streamingContainerAgent = this.containers.get(pTContainer.getExternalId());
                    if (streamingContainerAgent != null) {
                        LOG.debug("Container marked for shutdown: {}", pTContainer);
                        streamingContainerAgent.requestShutDown(StreamingContainerUmbilicalProtocol.ShutdownType.ABORT);
                    }
                }
            }
        } finally {
            this.deployChangeCnt++;
            this.deployChangeInProgress.set(false);
        }
    }

    @Override // com.datatorrent.stram.plan.physical.PhysicalPlan.PlanContext
    public void recordEventAsync(StramEvent stramEvent) {
        this.apexPluginDispatcher.dispatch(new DAGExecutionEvent.StramExecutionEvent(stramEvent));
        if (this.eventBus != null) {
            this.eventBus.publishAsync(stramEvent);
        }
    }

    @Override // com.datatorrent.stram.plan.physical.PhysicalPlan.PlanContext
    public void dispatch(Runnable runnable) {
        this.eventQueue.add(runnable);
    }

    public OperatorInfo getOperatorInfo(int i) {
        PTOperator pTOperator = this.plan.getAllOperators().get(Integer.valueOf(i));
        if (pTOperator == null) {
            return null;
        }
        return fillPhysicalOperatorInfo(pTOperator);
    }

    public List<OperatorInfo> getOperatorInfoList() {
        ArrayList arrayList = new ArrayList();
        Iterator<PTContainer> it = this.plan.getContainers().iterator();
        while (it.hasNext()) {
            Iterator<PTOperator> it2 = it.next().getOperators().iterator();
            while (it2.hasNext()) {
                arrayList.add(fillPhysicalOperatorInfo(it2.next()));
            }
        }
        return arrayList;
    }

    public LogicalOperatorInfo getLogicalOperatorInfo(String str) {
        LogicalPlan.OperatorMeta m92getOperatorMeta = getLogicalPlan().m92getOperatorMeta(str);
        if (m92getOperatorMeta == null) {
            return null;
        }
        return fillLogicalOperatorInfo(m92getOperatorMeta);
    }

    public LogicalPlan.ModuleMeta getModuleMeta(String str) {
        return getModuleMeta(str, getLogicalPlan());
    }

    private LogicalPlan.ModuleMeta getModuleMeta(String str, LogicalPlan logicalPlan) {
        for (LogicalPlan.ModuleMeta moduleMeta : logicalPlan.getAllModules()) {
            if (moduleMeta.getFullName().equals(str)) {
                return moduleMeta;
            }
            LogicalPlan.ModuleMeta moduleMeta2 = getModuleMeta(str, moduleMeta.getDag());
            if (moduleMeta2 != null) {
                return moduleMeta2;
            }
        }
        return null;
    }

    public List<LogicalOperatorInfo> getLogicalOperatorInfoList() {
        ArrayList arrayList = new ArrayList();
        Iterator<LogicalPlan.OperatorMeta> it = getLogicalPlan().getAllOperators().iterator();
        while (it.hasNext()) {
            arrayList.add(fillLogicalOperatorInfo(it.next()));
        }
        return arrayList;
    }

    public OperatorAggregationInfo getOperatorAggregationInfo(String str) {
        LogicalPlan.OperatorMeta m92getOperatorMeta = getLogicalPlan().m92getOperatorMeta(str);
        if (m92getOperatorMeta == null) {
            return null;
        }
        return fillOperatorAggregationInfo(m92getOperatorMeta);
    }

    public static long toWsWindowId(long j) {
        if (j < 0) {
            return 0L;
        }
        return j;
    }

    private OperatorInfo fillPhysicalOperatorInfo(PTOperator pTOperator) {
        OperatorInfo operatorInfo = new OperatorInfo();
        operatorInfo.container = pTOperator.getContainer().getExternalId();
        operatorInfo.host = pTOperator.getContainer().host;
        operatorInfo.id = Integer.toString(pTOperator.getId());
        operatorInfo.name = pTOperator.getName();
        operatorInfo.className = pTOperator.getOperatorMeta().getOperator().getClass().getName();
        operatorInfo.status = pTOperator.getState().toString();
        if (pTOperator.isUnifier()) {
            operatorInfo.unifierClass = pTOperator.getUnifierClass().getName();
        }
        operatorInfo.logicalName = pTOperator.getOperatorMeta().getName();
        OperatorStatus operatorStatus = pTOperator.stats;
        operatorInfo.recordingId = operatorStatus.recordingId;
        operatorInfo.totalTuplesProcessed = operatorStatus.totalTuplesProcessed.get();
        operatorInfo.totalTuplesEmitted = operatorStatus.totalTuplesEmitted.get();
        operatorInfo.tuplesProcessedPSMA = operatorStatus.tuplesProcessedPSMA.get();
        operatorInfo.tuplesEmittedPSMA = operatorStatus.tuplesEmittedPSMA.get();
        operatorInfo.cpuPercentageMA = operatorStatus.cpuNanosPMSMA.getAvg() / 10000.0d;
        operatorInfo.latencyMA = operatorStatus.latencyMA.getAvg();
        operatorInfo.failureCount = pTOperator.failureCount;
        operatorInfo.recoveryWindowId = toWsWindowId(pTOperator.getRecoveryCheckpoint().windowId);
        operatorInfo.currentWindowId = toWsWindowId(operatorStatus.currentWindowId.get());
        if (operatorStatus.lastHeartbeat != null) {
            operatorInfo.lastHeartbeat = operatorStatus.lastHeartbeat.getGeneratedTms();
        }
        if (operatorStatus.checkpointStats != null) {
            operatorInfo.checkpointTime = operatorStatus.checkpointStats.checkpointTime;
            operatorInfo.checkpointStartTime = operatorStatus.checkpointStats.checkpointStartTime;
        }
        operatorInfo.checkpointTimeMA = operatorStatus.checkpointTimeMA.getAvg();
        for (OperatorStatus.PortStatus portStatus : operatorStatus.inputPortStatusList.values()) {
            PortInfo portInfo = new PortInfo();
            portInfo.name = portStatus.portName;
            portInfo.type = Node.INPUT;
            portInfo.totalTuples = portStatus.totalTuples;
            portInfo.tuplesPSMA = Math.round(portStatus.tuplesPMSMA.getAvg() * 1000.0d);
            portInfo.bufferServerBytesPSMA = Math.round(portStatus.bufferServerBytesPMSMA.getAvg() * 1000.0d);
            portInfo.queueSizeMA = portStatus.queueSizeMA.getAvg();
            portInfo.recordingId = portStatus.recordingId;
            operatorInfo.addPort(portInfo);
        }
        for (OperatorStatus.PortStatus portStatus2 : operatorStatus.outputPortStatusList.values()) {
            PortInfo portInfo2 = new PortInfo();
            portInfo2.name = portStatus2.portName;
            portInfo2.type = Node.OUTPUT;
            portInfo2.totalTuples = portStatus2.totalTuples;
            portInfo2.tuplesPSMA = Math.round(portStatus2.tuplesPMSMA.getAvg() * 1000.0d);
            portInfo2.bufferServerBytesPSMA = Math.round(portStatus2.bufferServerBytesPMSMA.getAvg() * 1000.0d);
            portInfo2.recordingId = portStatus2.recordingId;
            operatorInfo.addPort(portInfo2);
        }
        operatorInfo.counters = operatorStatus.getLastWindowedStats().size() > 0 ? operatorStatus.getLastWindowedStats().get(operatorStatus.getLastWindowedStats().size() - 1).counters : null;
        operatorInfo.metrics = operatorStatus.getLastWindowedStats().size() > 0 ? operatorStatus.getLastWindowedStats().get(operatorStatus.getLastWindowedStats().size() - 1).metrics : null;
        return operatorInfo;
    }

    private LogicalOperatorInfo fillLogicalOperatorInfo(LogicalPlan.OperatorMeta operatorMeta) {
        String externalId;
        LogicalOperatorInfo logicalOperatorInfo = new LogicalOperatorInfo();
        logicalOperatorInfo.name = operatorMeta.getName();
        logicalOperatorInfo.className = operatorMeta.getOperator().getClass().getName();
        logicalOperatorInfo.totalTuplesEmitted = operatorMeta.getStatus().totalTuplesEmitted;
        logicalOperatorInfo.totalTuplesProcessed = operatorMeta.getStatus().totalTuplesProcessed;
        logicalOperatorInfo.failureCount = operatorMeta.getStatus().failureCount;
        logicalOperatorInfo.status = new HashMap();
        logicalOperatorInfo.partitions = new TreeSet();
        logicalOperatorInfo.unifiers = new TreeSet();
        logicalOperatorInfo.containerIds = new TreeSet();
        logicalOperatorInfo.hosts = new TreeSet();
        Collection<PTOperator> allOperators = getPhysicalPlan().getAllOperators(operatorMeta);
        NumberAggregate.LongAggregate longAggregate = new NumberAggregate.LongAggregate();
        for (PTOperator pTOperator : allOperators) {
            OperatorStatus operatorStatus = pTOperator.stats;
            if (pTOperator.isUnifier()) {
                logicalOperatorInfo.unifiers.add(Integer.valueOf(pTOperator.getId()));
            } else {
                logicalOperatorInfo.partitions.add(Integer.valueOf(pTOperator.getId()));
                logicalOperatorInfo.tuplesEmittedPSMA += operatorStatus.tuplesEmittedPSMA.get();
                logicalOperatorInfo.tuplesProcessedPSMA += operatorStatus.tuplesProcessedPSMA.get();
                long calculateLatency = calculateLatency(pTOperator);
                if (calculateLatency > logicalOperatorInfo.latencyMA) {
                    logicalOperatorInfo.latencyMA = calculateLatency;
                }
                longAggregate.addNumber(Long.valueOf(operatorStatus.checkpointTimeMA.getAvg()));
            }
            logicalOperatorInfo.cpuPercentageMA += operatorStatus.cpuNanosPMSMA.getAvg() / 10000.0d;
            if (operatorStatus.lastHeartbeat != null && (logicalOperatorInfo.lastHeartbeat == 0 || logicalOperatorInfo.lastHeartbeat > operatorStatus.lastHeartbeat.getGeneratedTms())) {
                logicalOperatorInfo.lastHeartbeat = operatorStatus.lastHeartbeat.getGeneratedTms();
            }
            long wsWindowId = toWsWindowId(operatorStatus.currentWindowId.get());
            if (logicalOperatorInfo.currentWindowId == 0 || logicalOperatorInfo.currentWindowId > wsWindowId) {
                logicalOperatorInfo.currentWindowId = wsWindowId;
            }
            MutableInt mutableInt = logicalOperatorInfo.status.get(pTOperator.getState().toString());
            if (mutableInt == null) {
                mutableInt = new MutableInt();
                logicalOperatorInfo.status.put(pTOperator.getState().toString(), mutableInt);
            }
            mutableInt.increment();
            if (pTOperator.getRecoveryCheckpoint() != null) {
                long wsWindowId2 = toWsWindowId(pTOperator.getRecoveryCheckpoint().windowId);
                if (logicalOperatorInfo.recoveryWindowId == 0 || logicalOperatorInfo.recoveryWindowId > wsWindowId2) {
                    logicalOperatorInfo.recoveryWindowId = wsWindowId2;
                }
            }
            PTContainer container = pTOperator.getContainer();
            if (container != null && (externalId = container.getExternalId()) != null) {
                logicalOperatorInfo.containerIds.add(externalId);
                logicalOperatorInfo.hosts.add(container.host);
            }
        }
        if (allOperators.size() > 0 && longAggregate.getAvg() != null) {
            logicalOperatorInfo.checkpointTimeMA = longAggregate.getAvg().longValue();
            logicalOperatorInfo.counters = this.latestLogicalCounters.get(operatorMeta.getName());
            logicalOperatorInfo.autoMetrics = this.latestLogicalMetrics.get(operatorMeta.getName());
        }
        return logicalOperatorInfo;
    }

    private OperatorAggregationInfo fillOperatorAggregationInfo(LogicalPlan.OperatorMeta operatorMeta) {
        OperatorAggregationInfo operatorAggregationInfo = new OperatorAggregationInfo();
        Collection<PTOperator> allOperators = getPhysicalPlan().getAllOperators(operatorMeta);
        if (allOperators.isEmpty()) {
            return null;
        }
        operatorAggregationInfo.name = operatorMeta.getName();
        for (PTOperator pTOperator : allOperators) {
            if (!pTOperator.isUnifier()) {
                OperatorStatus operatorStatus = pTOperator.stats;
                operatorAggregationInfo.latencyMA.addNumber(Long.valueOf(operatorStatus.latencyMA.getAvg()));
                operatorAggregationInfo.cpuPercentageMA.addNumber(Double.valueOf(operatorStatus.cpuNanosPMSMA.getAvg() / 10000.0d));
                operatorAggregationInfo.tuplesEmittedPSMA.addNumber(Long.valueOf(operatorStatus.tuplesEmittedPSMA.get()));
                operatorAggregationInfo.tuplesProcessedPSMA.addNumber(Long.valueOf(operatorStatus.tuplesProcessedPSMA.get()));
                operatorAggregationInfo.currentWindowId.addNumber(Long.valueOf(operatorStatus.currentWindowId.get()));
                operatorAggregationInfo.recoveryWindowId.addNumber(Long.valueOf(toWsWindowId(pTOperator.getRecoveryCheckpoint().windowId)));
                if (operatorStatus.lastHeartbeat != null) {
                    operatorAggregationInfo.lastHeartbeat.addNumber(Long.valueOf(operatorStatus.lastHeartbeat.getGeneratedTms()));
                }
                operatorAggregationInfo.checkpointTime.addNumber(Long.valueOf(operatorStatus.checkpointTimeMA.getAvg()));
            }
        }
        return operatorAggregationInfo;
    }

    private long calculateLatency(PTOperator pTOperator) {
        long avg = pTOperator.stats.latencyMA.getAvg();
        long j = 0;
        Iterator<PTOperator.PTOutput> it = pTOperator.getOutputs().iterator();
        while (it.hasNext()) {
            for (PTOperator.PTInput pTInput : it.next().sinks) {
                if (pTInput.target.isUnifier()) {
                    long calculateLatency = calculateLatency(pTInput.target);
                    if (j < calculateLatency) {
                        j = calculateLatency;
                    }
                }
            }
        }
        return avg + j;
    }

    public List<StreamInfo> getStreamInfoList() {
        ArrayList arrayList = new ArrayList();
        Iterator<PTContainer> it = this.plan.getContainers().iterator();
        while (it.hasNext()) {
            for (PTOperator pTOperator : it.next().getOperators()) {
                for (PTOperator.PTOutput pTOutput : pTOperator.getOutputs()) {
                    StreamInfo streamInfo = new StreamInfo();
                    streamInfo.logicalName = pTOutput.logicalStream.getName();
                    streamInfo.source.operatorId = String.valueOf(pTOperator.getId());
                    streamInfo.source.portName = pTOutput.portName;
                    streamInfo.locality = pTOutput.logicalStream.getLocality();
                    for (PTOperator.PTInput pTInput : pTOutput.sinks) {
                        StreamInfo.Port port = new StreamInfo.Port();
                        port.operatorId = String.valueOf(pTInput.target.getId());
                        if (pTInput.target.isUnifier()) {
                            port.portName = StreamingContainer.getUnifierInputPortName(pTInput.portName, pTOperator.getId(), pTOutput.portName);
                        } else {
                            port.portName = pTInput.portName;
                        }
                        streamInfo.sinks.add(port);
                    }
                    arrayList.add(streamInfo);
                }
            }
        }
        return arrayList;
    }

    private void updateOnDeployRequests(PTOperator pTOperator, Predicate<StreamingContainerUmbilicalProtocol.StramToNodeRequest> predicate, StreamingContainerUmbilicalProtocol.StramToNodeRequest stramToNodeRequest) {
        ArrayList arrayList = new ArrayList(pTOperator.deployRequests.size());
        for (StreamingContainerUmbilicalProtocol.StramToNodeRequest stramToNodeRequest2 : pTOperator.deployRequests) {
            if (!predicate.apply(stramToNodeRequest2)) {
                arrayList.add(stramToNodeRequest2);
            }
        }
        if (stramToNodeRequest != null) {
            arrayList.add(stramToNodeRequest);
        }
        pTOperator.deployRequests = Collections.unmodifiableList(arrayList);
    }

    private StreamingContainerAgent getContainerAgentFromOperatorId(int i) {
        StreamingContainerAgent streamingContainerAgent;
        PTOperator pTOperator = this.plan.getAllOperators().get(Integer.valueOf(i));
        if (pTOperator == null || (streamingContainerAgent = this.containers.get(pTOperator.getContainer().getExternalId())) == null) {
            throw new NotFoundException("Operator ID " + i + " not found");
        }
        return streamingContainerAgent;
    }

    public void startRecording(String str, int i, String str2, long j) {
        StreamingContainerAgent containerAgentFromOperatorId = getContainerAgentFromOperatorId(i);
        StramToNodeStartRecordingRequest stramToNodeStartRecordingRequest = new StramToNodeStartRecordingRequest();
        stramToNodeStartRecordingRequest.setOperatorId(i);
        if (!StringUtils.isBlank(str2)) {
            stramToNodeStartRecordingRequest.setPortName(str2);
        }
        stramToNodeStartRecordingRequest.setNumWindows(j);
        stramToNodeStartRecordingRequest.setId(str);
        containerAgentFromOperatorId.addOperatorRequest(stramToNodeStartRecordingRequest);
        PTOperator pTOperator = this.plan.getAllOperators().get(Integer.valueOf(i));
        if (pTOperator != null) {
            updateOnDeployRequests(pTOperator, new RecordingRequestFilter(), stramToNodeStartRecordingRequest);
        }
    }

    public void stopRecording(int i, String str) {
        StreamingContainerAgent containerAgentFromOperatorId = getContainerAgentFromOperatorId(i);
        StreamingContainerUmbilicalProtocol.StramToNodeRequest stramToNodeRequest = new StreamingContainerUmbilicalProtocol.StramToNodeRequest();
        stramToNodeRequest.setOperatorId(i);
        if (!StringUtils.isBlank(str)) {
            stramToNodeRequest.setPortName(str);
        }
        stramToNodeRequest.setRequestType(StreamingContainerUmbilicalProtocol.StramToNodeRequest.RequestType.STOP_RECORDING);
        containerAgentFromOperatorId.addOperatorRequest(stramToNodeRequest);
        PTOperator pTOperator = this.plan.getAllOperators().get(Integer.valueOf(i));
        if (pTOperator != null) {
            updateOnDeployRequests(pTOperator, new RecordingRequestFilter(), null);
        }
    }

    public void syncStats() {
        this.statsRecorder.requestSync();
    }

    public void syncEvents() {
        this.eventRecorder.requestSync();
    }

    public void stopContainer(String str) {
        this.containerStopRequests.put(str, str);
    }

    public Journal.Recoverable getSetOperatorProperty(String str, String str2, String str3) {
        return new SetOperatorProperty(str, str2, str3);
    }

    public Journal.Recoverable getSetPhysicalOperatorProperty(int i, String str, String str2) {
        return new SetPhysicalOperatorProperty(i, str, str2);
    }

    public void setOperatorProperty(String str, String str2, String str3) {
        LogicalPlan.OperatorMeta m92getOperatorMeta = this.plan.getLogicalPlan().m92getOperatorMeta(str);
        if (m92getOperatorMeta == null) {
            throw new IllegalArgumentException("Unknown operator " + str);
        }
        writeJournal(new SetOperatorProperty(str, str2, str3));
        setOperatorProperty(m92getOperatorMeta, str2, str3);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void setOperatorProperty(LogicalPlan.OperatorMeta operatorMeta, String str, String str2) {
        LogicalPlanConfiguration.setOperatorProperties((DAG.GenericOperator) operatorMeta.getOperator(), (Map<String, String>) Collections.singletonMap(str, str2));
        for (PTOperator pTOperator : this.plan.getOperators(operatorMeta)) {
            StramToNodeSetPropertyRequest stramToNodeSetPropertyRequest = new StramToNodeSetPropertyRequest();
            stramToNodeSetPropertyRequest.setOperatorId(pTOperator.getId());
            stramToNodeSetPropertyRequest.setPropertyKey(str);
            stramToNodeSetPropertyRequest.setPropertyValue(str2);
            addOperatorRequest(pTOperator, stramToNodeSetPropertyRequest);
            updateOnDeployRequests(pTOperator, new SetOperatorPropertyRequestFilter(str), stramToNodeSetPropertyRequest);
        }
        recordEventAsync(new StramEvent.SetOperatorPropertyEvent(operatorMeta.getName(), str, str2));
    }

    public void setPhysicalOperatorProperty(int i, String str, String str2) {
        PTOperator pTOperator = this.plan.getAllOperators().get(Integer.valueOf(i));
        if (pTOperator == null) {
            return;
        }
        writeJournal(new SetPhysicalOperatorProperty(i, str, str2));
        setPhysicalOperatorProperty(pTOperator, str, str2);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void setPhysicalOperatorProperty(PTOperator pTOperator, String str, String str2) {
        String name = pTOperator.getName();
        StramToNodeSetPropertyRequest stramToNodeSetPropertyRequest = new StramToNodeSetPropertyRequest();
        stramToNodeSetPropertyRequest.setOperatorId(pTOperator.getId());
        stramToNodeSetPropertyRequest.setPropertyKey(str);
        stramToNodeSetPropertyRequest.setPropertyValue(str2);
        addOperatorRequest(pTOperator, stramToNodeSetPropertyRequest);
        updateOnDeployRequests(pTOperator, new SetOperatorPropertyRequestFilter(str), stramToNodeSetPropertyRequest);
        recordEventAsync(new StramEvent.SetPhysicalOperatorPropertyEvent(name, pTOperator.getId(), str, str2));
    }

    @Override // com.datatorrent.stram.plan.physical.PhysicalPlan.PlanContext
    public void addOperatorRequest(PTOperator pTOperator, StreamingContainerUmbilicalProtocol.StramToNodeRequest stramToNodeRequest) {
        StreamingContainerAgent containerAgent = getContainerAgent(pTOperator.getContainer().getExternalId());
        if (containerAgent != null) {
            containerAgent.addOperatorRequest(stramToNodeRequest);
        }
    }

    public void setLoggersLevel(Map<String, String> map) {
        LOG.debug("change logger request");
        StramToNodeChangeLoggersRequest stramToNodeChangeLoggersRequest = new StramToNodeChangeLoggersRequest();
        stramToNodeChangeLoggersRequest.setTargetChanges(map);
        Iterator<StreamingContainerAgent> it = this.containers.values().iterator();
        while (it.hasNext()) {
            it.next().addOperatorRequest(stramToNodeChangeLoggersRequest);
        }
    }

    public FutureTask<Object> getPhysicalOperatorProperty(int i, String str, long j) {
        PTOperator pTOperator = this.plan.getAllOperators().get(Integer.valueOf(i));
        StramToNodeGetPropertyRequest stramToNodeGetPropertyRequest = new StramToNodeGetPropertyRequest();
        stramToNodeGetPropertyRequest.setOperatorId(i);
        stramToNodeGetPropertyRequest.setPropertyName(str);
        addOperatorRequest(pTOperator, stramToNodeGetPropertyRequest);
        RequestHandler requestHandler = new RequestHandler();
        requestHandler.requestId = this.nodeToStramRequestIds.incrementAndGet();
        requestHandler.waitTime = j;
        stramToNodeGetPropertyRequest.requestId = requestHandler.requestId;
        FutureTask<Object> futureTask = new FutureTask<>(requestHandler);
        dispatch(futureTask);
        return futureTask;
    }

    public Attribute.AttributeMap getApplicationAttributes() {
        try {
            return getLogicalPlan().getAttributes().clone();
        } catch (CloneNotSupportedException e) {
            throw new RuntimeException("Cannot clone DAG attributes", e);
        }
    }

    public Attribute.AttributeMap getOperatorAttributes(String str) {
        LogicalPlan.OperatorMeta m92getOperatorMeta = this.plan.getLogicalPlan().m92getOperatorMeta(str);
        if (m92getOperatorMeta == null) {
            throw new IllegalArgumentException("Invalid operatorId " + str);
        }
        try {
            return m92getOperatorMeta.getAttributes().clone();
        } catch (CloneNotSupportedException e) {
            throw new RuntimeException("Cannot clone operator attributes", e);
        }
    }

    public Attribute.AttributeMap getPortAttributes(String str, String str2) {
        LogicalPlan.OperatorMeta m92getOperatorMeta = this.plan.getLogicalPlan().m92getOperatorMeta(str);
        if (m92getOperatorMeta == null) {
            throw new IllegalArgumentException("Invalid operatorId " + str);
        }
        Operators.PortMappingDescriptor portMappingDescriptor = new Operators.PortMappingDescriptor();
        Operators.describe(m92getOperatorMeta.getOperator(), portMappingDescriptor);
        Operators.PortContextPair<Operator.InputPort<?>> portContextPair = portMappingDescriptor.inputPorts.get(str2);
        if (portContextPair != null) {
            try {
                return m92getOperatorMeta.getMeta((Operator.InputPort<?>) portContextPair.component).getAttributes().clone();
            } catch (CloneNotSupportedException e) {
                throw new RuntimeException("Cannot clone port attributes", e);
            }
        }
        Operators.PortContextPair<Operator.OutputPort<?>> portContextPair2 = portMappingDescriptor.outputPorts.get(str2);
        if (portContextPair2 == null) {
            throw new IllegalArgumentException("Invalid port name " + str2);
        }
        try {
            return m92getOperatorMeta.getMeta((Operator.OutputPort<?>) portContextPair2.component).getAttributes().clone();
        } catch (CloneNotSupportedException e2) {
            throw new RuntimeException("Cannot clone port attributes", e2);
        }
    }

    public LogicalPlan getLogicalPlan() {
        return this.plan.getLogicalPlan();
    }

    public FutureTask<Object> logicalPlanModification(List<LogicalPlanRequest> list) throws Exception {
        FutureTask<Object> futureTask = new FutureTask<>(new LogicalPlanChangeRunnable(list));
        dispatch(futureTask);
        return futureTask;
    }

    public CriticalPathInfo getCriticalPathInfo() {
        return this.criticalPathInfo;
    }

    private void checkpoint() throws IOException {
        if (this.recoveryHandler != null) {
            LOG.debug("Checkpointing state");
            this.journal.setOutputStream(this.recoveryHandler.rotateLog());
            CheckpointState checkpointState = new CheckpointState();
            checkpointState.finals = this.vars;
            checkpointState.physicalPlan = this.plan;
            this.recoveryHandler.save(checkpointState);
        }
    }

    @Override // com.datatorrent.stram.plan.physical.PhysicalPlan.PlanContext
    public void writeJournal(Journal.Recoverable recoverable) {
        try {
            if (this.journal != null) {
                this.journal.write(recoverable);
            }
        } catch (Exception e) {
            throw new IllegalStateException("Failed to write to journal " + recoverable, e);
        }
    }

    public static StreamingContainerManager getInstance(RecoveryHandler recoveryHandler, LogicalPlan logicalPlan, boolean z) throws IOException {
        StreamingContainerManager streamingContainerManager;
        try {
            CheckpointState checkpointState = (CheckpointState) recoveryHandler.restore();
            if (checkpointState == null) {
                streamingContainerManager = new StreamingContainerManager(logicalPlan, z, new SystemClock());
            } else {
                PhysicalPlan physicalPlan = checkpointState.physicalPlan;
                physicalPlan.getLogicalPlan().setAttribute(LogicalPlan.APPLICATION_ATTEMPT_ID, logicalPlan.getAttributes().get(LogicalPlan.APPLICATION_ATTEMPT_ID));
                streamingContainerManager = new StreamingContainerManager(checkpointState, z);
                for (Field field : physicalPlan.getClass().getDeclaredFields()) {
                    if (field.getType() == PhysicalPlan.PlanContext.class) {
                        field.setAccessible(true);
                        try {
                            field.set(physicalPlan, streamingContainerManager);
                            field.setAccessible(false);
                        } catch (Exception e) {
                            throw new RuntimeException("Failed to set " + field, e);
                        }
                    }
                }
                DataInputStream log = recoveryHandler.getLog();
                streamingContainerManager.journal.replay(log);
                log.close();
                physicalPlan.syncCheckpoints(streamingContainerManager.vars.windowStartMillis, streamingContainerManager.clock.getTime());
                streamingContainerManager.committedWindowId = streamingContainerManager.updateCheckpoints(true);
                for (PTContainer pTContainer : physicalPlan.getContainers()) {
                    if (pTContainer.getExternalId() != null) {
                        LOG.debug("Restore container agent {} for {}", pTContainer.getExternalId(), pTContainer);
                        streamingContainerManager.containers.put(pTContainer.getExternalId(), new StreamingContainerAgent(pTContainer, streamingContainerManager.newStreamingContainerContext(pTContainer), streamingContainerManager));
                    } else {
                        LOG.debug("Requesting new resource for {}", pTContainer.toIdStateString());
                        streamingContainerManager.requestContainer(pTContainer);
                    }
                }
                streamingContainerManager.startedFromCheckpoint = true;
            }
            streamingContainerManager.recoveryHandler = recoveryHandler;
            streamingContainerManager.checkpoint();
            return streamingContainerManager;
        } catch (IOException e2) {
            throw new IllegalStateException("Failed to read checkpointed state", e2);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static StorageAgent updateStorageAgent(StorageAgent storageAgent, String str, String str2, Configuration configuration) {
        if ((storageAgent instanceof AsyncFSStorageAgent) || (storageAgent instanceof FSStorageAgent)) {
            FSStorageAgent updateFSStorageAgent = updateFSStorageAgent(storageAgent, str, str2, configuration);
            if (updateFSStorageAgent != storageAgent) {
                return new CascadeStorageAgent(storageAgent, updateFSStorageAgent);
            }
        } else if (storageAgent instanceof CascadeStorageAgent) {
            CascadeStorageAgent cascadeStorageAgent = (CascadeStorageAgent) storageAgent;
            return new CascadeStorageAgent(cascadeStorageAgent, updateFSStorageAgent(cascadeStorageAgent.getCurrentStorageAgent(), str, str2, configuration));
        }
        return storageAgent;
    }

    private static StorageAgent updateFSStorageAgent(StorageAgent storageAgent, String str, String str2, Configuration configuration) {
        if (storageAgent instanceof AsyncFSStorageAgent) {
            AsyncFSStorageAgent asyncFSStorageAgent = (AsyncFSStorageAgent) storageAgent;
            if (asyncFSStorageAgent.path.contains(str)) {
                return new AsyncFSStorageAgent(asyncFSStorageAgent.path.replace(str, str2), configuration);
            }
        } else if (storageAgent instanceof FSStorageAgent) {
            FSStorageAgent fSStorageAgent = (FSStorageAgent) storageAgent;
            if (fSStorageAgent.path.contains(str)) {
                return new FSStorageAgent(fSStorageAgent.path.replace(str, str2), configuration);
            }
        }
        return storageAgent;
    }

    @VisibleForTesting
    protected Collection<Pair<Long, Map<String, Object>>> getLogicalMetrics(String str) {
        if (this.logicalMetrics.get(str) != null) {
            return Collections.unmodifiableCollection(this.logicalMetrics.get(str));
        }
        return null;
    }

    @VisibleForTesting
    protected Object getLogicalCounter(String str) {
        return this.latestLogicalCounters.get(str);
    }

    public void setApexPluginDispatcher(ApexPluginDispatcher apexPluginDispatcher) {
        this.apexPluginDispatcher = apexPluginDispatcher;
    }
}
