package org.apache.flink.runtime.webmonitor;

import akka.actor.ActorSystem;
import akka.actor.Props;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import org.apache.flink.runtime.akka.FlinkUntypedActor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.messages.StackTraceSampleMessages;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.shaded.com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.impl.Promise;

/* loaded from: input_file:org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.class */
public class StackTraceSampleCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger(StackTraceSampleCoordinator.class);
    private static final int NUM_GHOST_SAMPLE_IDS = 10;
    private final ActorGateway responseActor;
    private final int sampleTimeout;
    private int sampleIdCounter;
    private Timer timer;
    private boolean isShutDown;
    private final Object lock = new Object();
    private final Map<Integer, PendingStackTraceSample> pendingSamples = new HashMap();
    private final ArrayDeque<Integer> recentPendingSamples = new ArrayDeque<>(10);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator$PendingStackTraceSample.class */
    public static class PendingStackTraceSample {
        private final int sampleId;
        private final Set<ExecutionAttemptID> pendingTasks;
        private final Map<ExecutionAttemptID, List<StackTraceElement[]>> stackTracesByTask;
        private boolean isDiscarded;
        private final long startTime = System.currentTimeMillis();
        private final Promise<StackTraceSample> stackTracePromise = new Promise.DefaultPromise();

        PendingStackTraceSample(int i, ExecutionAttemptID[] executionAttemptIDArr) {
            this.sampleId = i;
            this.pendingTasks = new HashSet(Arrays.asList(executionAttemptIDArr));
            this.stackTracesByTask = Maps.newHashMapWithExpectedSize(executionAttemptIDArr.length);
        }

        int getSampleId() {
            return this.sampleId;
        }

        long getStartTime() {
            return this.startTime;
        }

        boolean isDiscarded() {
            return this.isDiscarded;
        }

        boolean isComplete() {
            if (this.isDiscarded) {
                throw new IllegalStateException("Discarded");
            }
            return this.pendingTasks.isEmpty();
        }

        void discard(Throwable th) {
            if (this.isDiscarded) {
                return;
            }
            this.pendingTasks.clear();
            this.stackTracesByTask.clear();
            this.stackTracePromise.failure(new RuntimeException("Discarded", th));
            this.isDiscarded = true;
        }

        void collectStackTraces(ExecutionAttemptID executionAttemptID, List<StackTraceElement[]> list) {
            if (this.isDiscarded) {
                throw new IllegalStateException("Discarded");
            }
            if (this.pendingTasks.remove(executionAttemptID)) {
                this.stackTracesByTask.put(executionAttemptID, Collections.unmodifiableList(list));
            } else {
                if (!isComplete()) {
                    throw new IllegalArgumentException("Unknown task " + executionAttemptID);
                }
                throw new IllegalStateException("Completed");
            }
        }

        void completePromiseAndDiscard() {
            if (!isComplete()) {
                throw new IllegalStateException("Not completed yet");
            }
            this.isDiscarded = true;
            this.stackTracePromise.success(new StackTraceSample(this.sampleId, this.startTime, System.currentTimeMillis(), this.stackTracesByTask));
        }

        Future<StackTraceSample> getStackTraceSampleFuture() {
            return this.stackTracePromise.future();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator$StackTraceSampleCoordinatorActor.class */
    private static class StackTraceSampleCoordinatorActor extends FlinkUntypedActor {
        StackTraceSampleCoordinator coordinator;

        public StackTraceSampleCoordinatorActor(StackTraceSampleCoordinator stackTraceSampleCoordinator) {
            this.coordinator = (StackTraceSampleCoordinator) Preconditions.checkNotNull(stackTraceSampleCoordinator, "Stack trace sample coordinator");
        }

        protected void handleMessage(Object obj) throws Exception {
            try {
                if (obj instanceof StackTraceSampleMessages.ResponseStackTraceSampleSuccess) {
                    StackTraceSampleMessages.ResponseStackTraceSampleSuccess responseStackTraceSampleSuccess = (StackTraceSampleMessages.ResponseStackTraceSampleSuccess) obj;
                    this.coordinator.collectStackTraces(responseStackTraceSampleSuccess.sampleId(), responseStackTraceSampleSuccess.executionId(), responseStackTraceSampleSuccess.samples());
                } else {
                    if (!(obj instanceof StackTraceSampleMessages.ResponseStackTraceSampleFailure)) {
                        throw new IllegalArgumentException("Unexpected task sample message");
                    }
                    StackTraceSampleMessages.ResponseStackTraceSampleFailure responseStackTraceSampleFailure = (StackTraceSampleMessages.ResponseStackTraceSampleFailure) obj;
                    this.coordinator.cancelStackTraceSample(responseStackTraceSampleFailure.sampleId(), responseStackTraceSampleFailure.cause());
                }
            } catch (Throwable th) {
                this.LOG.error("Error responding to message '" + obj + "': " + th.getMessage() + ".", th);
            }
        }

        protected UUID getLeaderSessionID() {
            return null;
        }
    }

    public StackTraceSampleCoordinator(ActorSystem actorSystem, int i) {
        this.responseActor = new AkkaActorGateway(actorSystem.actorOf(Props.create(StackTraceSampleCoordinatorActor.class, new Object[]{this})), (UUID) null);
        Preconditions.checkArgument(i >= 0);
        this.sampleTimeout = i;
    }

    /* JADX WARN: Finally extract failed */
    public Future<StackTraceSample> triggerStackTraceSample(ExecutionVertex[] executionVertexArr, int i, FiniteDuration finiteDuration, int i2) {
        Preconditions.checkNotNull(executionVertexArr, "Tasks to sample");
        Preconditions.checkArgument(executionVertexArr.length >= 1, "No tasks to sample");
        Preconditions.checkArgument(i >= 1, "No number of samples");
        Preconditions.checkArgument(i2 >= 0, "Negative maximum stack trace depth");
        ExecutionAttemptID[] executionAttemptIDArr = new ExecutionAttemptID[executionVertexArr.length];
        for (int i3 = 0; i3 < executionAttemptIDArr.length; i3++) {
            Execution currentExecutionAttempt = executionVertexArr[i3].getCurrentExecutionAttempt();
            if (currentExecutionAttempt == null || currentExecutionAttempt.getState() != ExecutionState.RUNNING) {
                return new Promise.DefaultPromise().failure(new IllegalStateException("Task " + executionVertexArr[i3].getTaskNameWithSubtaskIndex() + " is not running.")).future();
            }
            executionAttemptIDArr[i3] = currentExecutionAttempt.getAttemptId();
        }
        synchronized (this.lock) {
            if (this.isShutDown) {
                return new Promise.DefaultPromise().failure(new IllegalStateException("Shut down")).future();
            }
            if (this.timer == null) {
                this.timer = new Timer("Stack trace sample coordinator timer");
            }
            int i4 = this.sampleIdCounter;
            this.sampleIdCounter = i4 + 1;
            LOG.debug("Triggering stack trace sample {}", Integer.valueOf(i4));
            final PendingStackTraceSample pendingStackTraceSample = new PendingStackTraceSample(i4, executionAttemptIDArr);
            long millis = (i * finiteDuration.toMillis()) + this.sampleTimeout;
            TimerTask timerTask = new TimerTask() { // from class: org.apache.flink.runtime.webmonitor.StackTraceSampleCoordinator.1
                @Override // java.util.TimerTask, java.lang.Runnable
                public void run() {
                    try {
                        synchronized (StackTraceSampleCoordinator.this.lock) {
                            if (!pendingStackTraceSample.isDiscarded()) {
                                StackTraceSampleCoordinator.LOG.info("Sample {} expired before completing", Integer.valueOf(pendingStackTraceSample.getSampleId()));
                                pendingStackTraceSample.discard(new RuntimeException("Time out"));
                                if (StackTraceSampleCoordinator.this.pendingSamples.remove(Integer.valueOf(pendingStackTraceSample.getSampleId())) != null) {
                                    StackTraceSampleCoordinator.this.rememberRecentSampleId(pendingStackTraceSample.getSampleId());
                                }
                            }
                        }
                    } catch (Throwable th) {
                        StackTraceSampleCoordinator.LOG.error("Exception while handling sample timeout", th);
                    }
                }
            };
            this.pendingSamples.put(Integer.valueOf(i4), pendingStackTraceSample);
            this.timer.schedule(timerTask, millis);
            boolean z = true;
            int i5 = 0;
            while (true) {
                try {
                    if (i5 >= executionVertexArr.length) {
                        break;
                    }
                    if (!executionVertexArr[i5].sendMessageToCurrentExecution(new StackTraceSampleMessages.TriggerStackTraceSample(i4, executionAttemptIDArr[i5], i, finiteDuration, i2), executionAttemptIDArr[i5], this.responseActor)) {
                        z = false;
                        break;
                    }
                    i5++;
                } catch (Throwable th) {
                    if (!z) {
                        pendingStackTraceSample.discard(new RuntimeException("Failed to trigger sample, because task has been reset."));
                        this.pendingSamples.remove(Integer.valueOf(i4));
                        rememberRecentSampleId(i4);
                    }
                    throw th;
                }
            }
            Future<StackTraceSample> stackTraceSampleFuture = pendingStackTraceSample.getStackTraceSampleFuture();
            if (!z) {
                pendingStackTraceSample.discard(new RuntimeException("Failed to trigger sample, because task has been reset."));
                this.pendingSamples.remove(Integer.valueOf(i4));
                rememberRecentSampleId(i4);
            }
            return stackTraceSampleFuture;
        }
    }

    public void cancelStackTraceSample(int i, Exception exc) {
        synchronized (this.lock) {
            if (this.isShutDown) {
                return;
            }
            PendingStackTraceSample remove = this.pendingSamples.remove(Integer.valueOf(i));
            if (remove != null) {
                if (exc != null) {
                    LOG.info("Cancelling sample " + i, exc);
                } else {
                    LOG.info("Cancelling sample {}", Integer.valueOf(i));
                }
                remove.discard(exc);
                rememberRecentSampleId(i);
            }
        }
    }

    public void shutDown() {
        synchronized (this.lock) {
            if (!this.isShutDown) {
                LOG.info("Shutting down stack trace sample coordinator.");
                Iterator<PendingStackTraceSample> it = this.pendingSamples.values().iterator();
                while (it.hasNext()) {
                    it.next().discard(new RuntimeException("Shut down"));
                }
                this.pendingSamples.clear();
                if (this.timer != null) {
                    this.timer.cancel();
                }
                this.isShutDown = true;
            }
        }
    }

    public void collectStackTraces(int i, ExecutionAttemptID executionAttemptID, List<StackTraceElement[]> list) {
        synchronized (this.lock) {
            if (this.isShutDown) {
                return;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Collecting stack trace sample {} of task {}", Integer.valueOf(i), executionAttemptID);
            }
            PendingStackTraceSample pendingStackTraceSample = this.pendingSamples.get(Integer.valueOf(i));
            if (pendingStackTraceSample != null) {
                pendingStackTraceSample.collectStackTraces(executionAttemptID, list);
                if (pendingStackTraceSample.isComplete()) {
                    this.pendingSamples.remove(Integer.valueOf(i));
                    rememberRecentSampleId(i);
                    pendingStackTraceSample.completePromiseAndDiscard();
                }
            } else if (this.recentPendingSamples.contains(Integer.valueOf(i))) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Received late stack trace sample {} of task {}", Integer.valueOf(i), executionAttemptID);
                }
            } else if (LOG.isDebugEnabled()) {
                LOG.debug("Unknown sample ID " + i);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void rememberRecentSampleId(int i) {
        if (this.recentPendingSamples.size() >= 10) {
            this.recentPendingSamples.removeFirst();
        }
        this.recentPendingSamples.addLast(Integer.valueOf(i));
    }

    int getNumberOfPendingSamples() {
        int size;
        synchronized (this.lock) {
            size = this.pendingSamples.size();
        }
        return size;
    }
}
