package org.apache.beam.fn.harness.status;

import java.lang.Thread;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Stream;
import org.apache.beam.fn.harness.Cache;
import org.apache.beam.fn.harness.control.ExecutionStateSampler;
import org.apache.beam.fn.harness.control.ProcessBundleHandler;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnWorkerStatusGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.nullness.qual.EnsuresNonNullIf;
import org.checkerframework.dataflow.qual.Pure;
import org.joda.time.DateTimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/fn/harness/status/BeamFnStatusClient.class */
public class BeamFnStatusClient implements AutoCloseable {
    private final StreamObserver<BeamFnApi.WorkerStatusResponse> outboundObserver;
    private final ProcessBundleHandler.BundleProcessorCache processBundleCache;
    private final ManagedChannel channel;
    private final CompletableFuture<Object> inboundObserverCompletion = new CompletableFuture<>();
    private final MemoryMonitor memoryMonitor;
    private final Cache<?, ?> cache;
    private static final Object COMPLETED = new Object();
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) BeamFnStatusClient.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/fn/harness/status/BeamFnStatusClient$BundleState.class */
    public static class BundleState {
        final String instruction;
        final String trackedThreadName;
        final long timeSinceTransition;

        public String getInstruction() {
            return this.instruction;
        }

        public String getTrackedThreadName() {
            return this.trackedThreadName;
        }

        public long getTimeSinceTransition() {
            return this.timeSinceTransition;
        }

        public BundleState(String str, String str2, long j) {
            this.instruction = str;
            this.trackedThreadName = str2;
            this.timeSinceTransition = j;
        }
    }

    /* loaded from: input_file:org/apache/beam/fn/harness/status/BeamFnStatusClient$InboundObserver.class */
    private class InboundObserver implements StreamObserver<BeamFnApi.WorkerStatusRequest> {
        private InboundObserver() {
        }

        @Override // org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver
        public void onNext(BeamFnApi.WorkerStatusRequest workerStatusRequest) {
            StringJoiner stringJoiner = new StringJoiner(StringUtils.LF);
            stringJoiner.add(BeamFnStatusClient.this.getMemoryUsage());
            stringJoiner.add(StringUtils.LF);
            stringJoiner.add(BeamFnStatusClient.this.getCacheStats());
            stringJoiner.add(StringUtils.LF);
            stringJoiner.add(BeamFnStatusClient.this.getActiveProcessBundleState());
            stringJoiner.add(StringUtils.LF);
            stringJoiner.add(BeamFnStatusClient.this.getThreadDump());
            BeamFnStatusClient.this.outboundObserver.onNext(BeamFnApi.WorkerStatusResponse.newBuilder().setId(workerStatusRequest.getId()).setStatusInfo(stringJoiner.toString()).build());
        }

        @Override // org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver
        public void onError(Throwable th) {
            BeamFnStatusClient.LOG.error("Error getting SDK harness status", th);
            BeamFnStatusClient.this.inboundObserverCompletion.completeExceptionally(th);
        }

        @Override // org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver
        public void onCompleted() {
            BeamFnStatusClient.this.inboundObserverCompletion.complete(BeamFnStatusClient.COMPLETED);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/fn/harness/status/BeamFnStatusClient$Stack.class */
    public static class Stack {
        final StackTraceElement[] elements;
        final Thread.State state;

        Stack(StackTraceElement[] stackTraceElementArr, Thread.State state) {
            this.elements = stackTraceElementArr;
            this.state = state;
        }

        @Pure
        public int hashCode() {
            return Objects.hash(Integer.valueOf(Arrays.deepHashCode(this.elements)), this.state);
        }

        @EnsuresNonNullIf(expression = {"#1"}, result = true)
        @Pure
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof Stack)) {
                return false;
            }
            Stack stack = (Stack) obj;
            return this.state == stack.state && Arrays.deepEquals(this.elements, stack.elements);
        }
    }

    public BeamFnStatusClient(Endpoints.ApiServiceDescriptor apiServiceDescriptor, Function<Endpoints.ApiServiceDescriptor, ManagedChannel> function, ProcessBundleHandler.BundleProcessorCache bundleProcessorCache, PipelineOptions pipelineOptions, Cache<?, ?> cache) {
        this.channel = function.apply(apiServiceDescriptor);
        this.outboundObserver = BeamFnWorkerStatusGrpc.newStub(this.channel).workerStatus(new InboundObserver());
        this.processBundleCache = bundleProcessorCache;
        this.memoryMonitor = MemoryMonitor.fromOptions(pipelineOptions);
        this.cache = cache;
        Thread thread = new Thread(this.memoryMonitor);
        thread.setDaemon(true);
        thread.setPriority(1);
        thread.setName("MemoryMonitor");
        thread.start();
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        try {
            if (this.inboundObserverCompletion.get(1L, TimeUnit.MINUTES) != COMPLETED) {
                LOG.warn("InboundObserver for BeamFnStatusClient completed with exception.");
            }
        } finally {
            this.channel.shutdown();
            if (!this.channel.awaitTermination(10L, TimeUnit.SECONDS)) {
                this.channel.shutdownNow();
            }
        }
    }

    String getThreadDump() {
        StringJoiner stringJoiner = new StringJoiner(StringUtils.LF);
        stringJoiner.add("========== THREAD DUMP ==========");
        HashMap hashMap = new HashMap();
        Thread.getAllStackTraces().forEach((thread, stackTraceElementArr) -> {
            if (thread != Thread.currentThread()) {
                Stack stack = new Stack(stackTraceElementArr, thread.getState());
                hashMap.putIfAbsent(stack, new ArrayList());
                ((List) hashMap.get(stack)).add(thread.toString());
            }
        });
        hashMap.entrySet().stream().sorted(Comparator.comparingInt(entry -> {
            return -((List) entry.getValue()).size();
        })).forEachOrdered(entry2 -> {
            Stack stack = (Stack) entry2.getKey();
            List list = (List) entry2.getValue();
            stringJoiner.add(String.format("---- Threads (%d): %s State: %s Stack: ----", Integer.valueOf(list.size()), list, stack.state));
            Stream map = Arrays.stream(stack.elements).map((v0) -> {
                return v0.toString();
            });
            Objects.requireNonNull(stringJoiner);
            map.forEach((v1) -> {
                r1.add(v1);
            });
            stringJoiner.add(StringUtils.LF);
        });
        return stringJoiner.toString();
    }

    String getMemoryUsage() {
        StringJoiner stringJoiner = new StringJoiner(StringUtils.LF);
        stringJoiner.add("========== MEMORY USAGE ==========");
        stringJoiner.add(this.memoryMonitor.describeMemory());
        return stringJoiner.toString();
    }

    @VisibleForTesting
    String getCacheStats() {
        StringJoiner stringJoiner = new StringJoiner(StringUtils.LF);
        stringJoiner.add("========== CACHE STATS ==========");
        stringJoiner.add(this.cache.describeStats());
        return stringJoiner.toString();
    }

    @VisibleForTesting
    String getActiveProcessBundleState() {
        StringJoiner stringJoiner = new StringJoiner(StringUtils.LF);
        stringJoiner.add("========== ACTIVE PROCESSING BUNDLES ==========");
        if (this.processBundleCache.getActiveBundleProcessors().isEmpty()) {
            stringJoiner.add("No active processing bundles.");
        } else {
            ArrayList arrayList = new ArrayList();
            this.processBundleCache.getActiveBundleProcessors().entrySet().stream().forEach(entry -> {
                ExecutionStateSampler.ExecutionStateTrackerStatus status = ((ProcessBundleHandler.BundleProcessor) entry.getValue()).getStateTracker().getStatus();
                if (status != null) {
                    arrayList.add(new BundleState((String) entry.getKey(), status.getTrackedThread().getName(), DateTimeUtils.currentTimeMillis() - status.getLastTransitionTimeMillis()));
                }
            });
            arrayList.stream().sorted(Comparator.comparing((v0) -> {
                return v0.getTimeSinceTransition();
            }).reversed()).limit(10L).forEachOrdered(bundleState -> {
                stringJoiner.add(String.format("---- Instruction %s ----", bundleState.getInstruction()));
                stringJoiner.add(String.format("Tracked thread: %s", bundleState.getTrackedThreadName()));
                stringJoiner.add(String.format("Time since transition: %.2f seconds%n", Double.valueOf(bundleState.getTimeSinceTransition() / 1000.0d)));
            });
        }
        return stringJoiner.toString();
    }
}
