package org.apache.flink.runtime.webmonitor.handlers;

import akka.dispatch.Mapper;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.DefaultFileRegion;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpChunkedInput;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.router.Routed;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedFile;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobCache;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.concurrent.AcceptFunction;
import org.apache.flink.runtime.concurrent.ApplyFunction;
import org.apache.flink.runtime.concurrent.BiFunction;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
import org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase;
import org.apache.flink.shaded.com.google.common.base.Ascii;
import org.apache.flink.shaded.com.google.common.net.HttpHeaders;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.duration.FiniteDuration;
import scala.reflect.ClassTag$;

/**
 * Request handler that sends the log file or the stdout file of one TaskManager to the HTTP client.
 *
 * <p>For each request the handler asks the JobManager for the TaskManager instance named by the
 * {@link TaskManagersHandler#TASK_MANAGER_ID_KEY} path parameter, asks that TaskManager for the
 * {@link BlobKey} of its log (or stdout) file, resolves the key to a local file through a
 * {@link BlobCache} and writes the file to the channel.
 *
 * <p>Only one request per TaskManager id is processed at a time; while one is in flight, further
 * requests for the same id are answered with the text {@code "loading..."}.
 */
@ChannelHandler.Sharable
public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
    private static final Logger LOG = LoggerFactory.getLogger(TaskManagerLogHandler.class);

    /** Chunk size in bytes used when the file is written through {@link ChunkedFile}. */
    private static final int CHUNK_SIZE = 8192;

    /**
     * Blob key of the log file most recently served per TaskManager id. Concurrent map because it is
     * updated from a stage that runs on {@link #executor}, and requests for different TaskManagers
     * are not serialized against each other.
     */
    private final ConcurrentHashMap<String, BlobKey> lastSubmittedLog;

    /** Blob key of the stdout file most recently served per TaskManager id (see {@link #lastSubmittedLog}). */
    private final ConcurrentHashMap<String, BlobKey> lastSubmittedStdout;

    /** TaskManager ids for which a request is currently being processed. */
    private final ConcurrentHashMap<String, Boolean> lastRequestPending;

    private final Configuration config;

    /**
     * Lazily created on the first request.
     * NOTE(review): the lazy initialization is unsynchronized and a failed future is never replaced -
     * confirm whether that is acceptable for the threads that call {@link #respondAsLeader}.
     */
    private Future<BlobCache> cache;

    /** {@code true} to serve the log file, {@code false} to serve the stdout file. */
    private boolean serveLogFile;

    private final ExecutionContextExecutor executor;

    /** The request timeout passed to the constructor, converted to Flink's {@link Time}. */
    private final Time timeTimeout;

    /** Selects which TaskManager file a handler instance serves. */
    public enum FileMode {
        LOG,
        STDOUT
    }

    /**
     * Creates a handler serving either the log or the stdout file.
     *
     * @param jobManagerRetriever forwarded unchanged to the base class constructor
     * @param executionContextExecutor executor for the asynchronous stages; must not be {@code null}
     * @param future forwarded unchanged to the base class constructor
     * @param finiteDuration forwarded to the base class constructor and used as the timeout for the
     *     TaskManager file requests
     * @param fileMode which file to serve; a {@code null} value results in a {@link NullPointerException}
     * @param configuration configuration handed to the {@link BlobCache}
     * @param z forwarded unchanged to the base class constructor
     */
    public TaskManagerLogHandler(JobManagerRetriever jobManagerRetriever, ExecutionContextExecutor executionContextExecutor, scala.concurrent.Future<String> future, FiniteDuration finiteDuration, FileMode fileMode, Configuration configuration, boolean z) {
        super(jobManagerRetriever, future, finiteDuration, z);
        this.lastSubmittedLog = new ConcurrentHashMap<>();
        this.lastSubmittedStdout = new ConcurrentHashMap<>();
        this.lastRequestPending = new ConcurrentHashMap<>();
        this.executor = (ExecutionContextExecutor) Preconditions.checkNotNull(executionContextExecutor);
        this.config = configuration;
        switch (fileMode) {
            case LOG:
                this.serveLogFile = true;
                break;
            case STDOUT:
                this.serveLogFile = false;
                break;
            default:
                break;
        }
        this.timeTimeout = Time.milliseconds(finiteDuration.toMillis());
    }

    /**
     * Looks up the requested TaskManager's log (or stdout) file and writes it to the channel.
     *
     * <p>Failures while locating the file are reported to the client as a plain-text message and
     * logged; in every failure case the pending marker for the TaskManager id is cleared so that a
     * later request can retry.
     *
     * @param channelHandlerContext context of the channel the response is written to
     * @param routed the routed request; supplies the TaskManager id path parameter
     * @param actorGateway gateway used to query the JobManager
     */
    @Override // org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase
    protected void respondAsLeader(final ChannelHandlerContext channelHandlerContext, Routed routed, final ActorGateway actorGateway) {
        if (this.cache == null) {
            scala.concurrent.Future<Object> portFuture = actorGateway.ask(JobManagerMessages.getRequestBlobManagerPort(), this.timeout);
            scala.concurrent.Future<BlobCache> cacheFuture = portFuture.map(new Mapper<Object, BlobCache>() {
                // Must be named apply: Mapper only invokes the callback through this override.
                @Override
                public BlobCache apply(Object port) {
                    Option<String> host = actorGateway.actor().path().address().host();
                    String hostName = host.isDefined() ? host.get() : "localhost";
                    return new BlobCache(new InetSocketAddress(hostName, ((Integer) port).intValue()), TaskManagerLogHandler.this.config);
                }
            }, this.executor);
            this.cache = new FlinkFuture<>(cacheFuture);
        }

        final String taskManagerId = (String) routed.pathParams().get(TaskManagersHandler.TASK_MANAGER_ID_KEY);
        final HttpRequest request = routed.request();

        // Only one in-flight request per TaskManager; everybody else gets a placeholder answer.
        if (this.lastRequestPending.putIfAbsent(taskManagerId, true) != null) {
            display(channelHandlerContext, request, "loading...");
            return;
        }

        try {
            InstanceID instanceId = new InstanceID(StringUtils.hexStringToByte(taskManagerId));
            scala.concurrent.Future<JobManagerMessages.TaskManagerInstance> scalaInstanceFuture = actorGateway
                    .ask(new JobManagerMessages.RequestTaskManagerInstance(instanceId), this.timeout)
                    .mapTo(ClassTag$.MODULE$.<JobManagerMessages.TaskManagerInstance>apply(JobManagerMessages.TaskManagerInstance.class));
            Future<JobManagerMessages.TaskManagerInstance> instanceFuture = new FlinkFuture<>(scalaInstanceFuture);

            Future<BlobKey> blobKeyFuture = instanceFuture.thenCompose(new ApplyFunction<JobManagerMessages.TaskManagerInstance, Future<BlobKey>>() {
                @Override
                public Future<BlobKey> apply(JobManagerMessages.TaskManagerInstance taskManagerInstance) {
                    Instance instance = (Instance) taskManagerInstance.instance().get();
                    if (TaskManagerLogHandler.this.serveLogFile) {
                        return instance.getTaskManagerGateway().requestTaskManagerLog(TaskManagerLogHandler.this.timeTimeout);
                    }
                    return instance.getTaskManagerGateway().requestTaskManagerStdout(TaskManagerLogHandler.this.timeTimeout);
                }
            });

            Future<String> filePathFuture = blobKeyFuture.thenCombine(this.cache, new BiFunction<BlobKey, BlobCache, Tuple2<BlobKey, BlobCache>>() {
                @Override
                public Tuple2<BlobKey, BlobCache> apply(BlobKey blobKey, BlobCache blobCache) {
                    return Tuple2.of(blobKey, blobCache);
                }
            }).thenComposeAsync(new ApplyFunction<Tuple2<BlobKey, BlobCache>, Future<String>>() {
                @Override
                public Future<String> apply(Tuple2<BlobKey, BlobCache> keyAndCache) {
                    return resolveBlobFile(taskManagerId, keyAndCache.f0, keyAndCache.f1);
                }
            }, this.executor);

            filePathFuture.exceptionally(new ApplyFunction<Throwable, Void>() {
                @Override
                public Void apply(Throwable failure) {
                    display(channelHandlerContext, request, "Fetching TaskManager log failed.");
                    LOG.error("Fetching TaskManager log failed.", failure);
                    TaskManagerLogHandler.this.lastRequestPending.remove(taskManagerId);
                    return null;
                }
            });

            filePathFuture.thenAccept(new AcceptFunction<String>() {
                @Override
                public void accept(String filePath) {
                    transferFile(channelHandlerContext, request, taskManagerId, filePath);
                }
            });
        } catch (Exception e) {
            display(channelHandlerContext, request, "Error: " + e.getMessage());
            LOG.error("Fetching TaskManager log failed.", e);
            this.lastRequestPending.remove(taskManagerId);
        }
    }

    /**
     * Records {@code blobKey} as the latest file served for the TaskManager, deletes the previously
     * recorded blob if it differs, and resolves the key to a local file path.
     *
     * @return a future holding the file path, or a failed future if deleting the old blob or
     *     retrieving the new one raised an {@link IOException}
     */
    private Future<String> resolveBlobFile(String taskManagerId, BlobKey blobKey, BlobCache blobCache) {
        ConcurrentHashMap<String, BlobKey> lastSubmitted = this.serveLogFile ? this.lastSubmittedLog : this.lastSubmittedStdout;
        BlobKey previousKey = lastSubmitted.get(taskManagerId);
        if (previousKey == null) {
            lastSubmitted.put(taskManagerId, blobKey);
        } else if (!blobKey.equals(previousKey)) {
            try {
                blobCache.deleteGlobal(previousKey);
                lastSubmitted.put(taskManagerId, blobKey);
            } catch (IOException e) {
                return FlinkCompletableFuture.<String>completedExceptionally(new Exception("Could not delete file for " + taskManagerId + '.', e));
            }
        }
        try {
            return FlinkCompletableFuture.completed(blobCache.getURL(blobKey).getFile());
        } catch (IOException e) {
            return FlinkCompletableFuture.<String>completedExceptionally(new Exception("Could not retrieve blob for " + blobKey + '.', e));
        }
    }

    /**
     * Writes the file at {@code filePath} to the channel as a {@code text/plain} response.
     *
     * <p>Without an {@link SslHandler} in the pipeline the file is written as a
     * {@link DefaultFileRegion}; otherwise it is written in chunks through {@link ChunkedFile}.
     * The pending marker is cleared and the file closed when the write completes, or immediately
     * if the transfer cannot be started.
     */
    private void transferFile(final ChannelHandlerContext channelHandlerContext, HttpRequest request, final String taskManagerId, String filePath) {
        final RandomAccessFile file;
        try {
            file = new RandomAccessFile(new File(filePath), "r");
        } catch (FileNotFoundException e) {
            display(channelHandlerContext, request, "Displaying TaskManager log failed.");
            LOG.error("Displaying TaskManager log failed.", e);
            this.lastRequestPending.remove(taskManagerId);
            return;
        }

        final long length;
        try {
            length = file.length();
        } catch (IOException e) {
            display(channelHandlerContext, request, "Displaying TaskManager log failed.");
            LOG.error("Displaying TaskManager log failed.", e);
            closeFile(file);
            this.lastRequestPending.remove(taskManagerId);
            return;
        }

        // Build the chunked input before any header is written, so that a failure here still
        // leaves the channel free for a single, well-formed error response.
        final boolean sslInPipeline = channelHandlerContext.pipeline().get(SslHandler.class) != null;
        HttpChunkedInput chunkedInput = null;
        if (sslInPipeline) {
            try {
                chunkedInput = new HttpChunkedInput(new ChunkedFile(file, 0L, length, CHUNK_SIZE));
            } catch (IOException e) {
                display(channelHandlerContext, request, "Displaying TaskManager log failed.");
                LOG.warn("Could not write http data.", e);
                closeFile(file);
                this.lastRequestPending.remove(taskManagerId);
                return;
            }
        }

        final FileChannel channel = file.getChannel();
        final boolean keepAlive = io.netty.handler.codec.http.HttpHeaders.isKeepAlive(request);

        DefaultHttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        response.headers().set(HttpHeaders.CONTENT_TYPE, "text/plain");
        if (keepAlive) {
            response.headers().set(HttpHeaders.CONNECTION, "keep-alive");
        }
        io.netty.handler.codec.http.HttpHeaders.setContentLength(response, length);
        channelHandlerContext.write(response);

        GenericFutureListener<io.netty.util.concurrent.Future<? super Void>> completionListener = new GenericFutureListener<io.netty.util.concurrent.Future<? super Void>>() {
            @Override
            public void operationComplete(io.netty.util.concurrent.Future<? super Void> writeFuture) throws Exception {
                TaskManagerLogHandler.this.lastRequestPending.remove(taskManagerId);
                channel.close();
                file.close();
            }
        };

        ChannelFuture lastContentFuture;
        if (sslInPipeline) {
            lastContentFuture = channelHandlerContext.writeAndFlush(chunkedInput, channelHandlerContext.newProgressivePromise()).addListener(completionListener);
        } else {
            channelHandlerContext.write(new DefaultFileRegion(channel, 0L, length), channelHandlerContext.newProgressivePromise()).addListener(completionListener);
            lastContentFuture = channelHandlerContext.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
        }

        if (!keepAlive) {
            lastContentFuture.addListener(ChannelFutureListener.CLOSE);
        }
    }

    /** Closes the file, logging instead of propagating a failure to close. */
    private static void closeFile(RandomAccessFile file) {
        try {
            file.close();
        } catch (IOException e) {
            LOG.error("Could not close random access file.", e);
        }
    }

    /**
     * Sends {@code str} to the client as a complete {@code text/plain} response with status 200.
     * The connection is closed after the response unless the request asked for keep-alive.
     *
     * @param channelHandlerContext context of the channel the response is written to
     * @param httpRequest the request being answered; only inspected for keep-alive
     * @param str the message text, sent UTF-8 encoded
     */
    public void display(ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest, String str) {
        boolean keepAlive = io.netty.handler.codec.http.HttpHeaders.isKeepAlive(httpRequest);

        DefaultHttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        response.headers().set(HttpHeaders.CONTENT_TYPE, "text/plain");
        if (keepAlive) {
            response.headers().set(HttpHeaders.CONNECTION, "keep-alive");
        }

        // Content-Length must be the encoded byte count, not the character count.
        byte[] payload = str.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        ByteBuf content = Unpooled.copiedBuffer(payload);
        io.netty.handler.codec.http.HttpHeaders.setContentLength(response, payload.length);

        channelHandlerContext.write(response);
        channelHandlerContext.write(content);
        ChannelFuture lastContentFuture = channelHandlerContext.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
        if (!keepAlive) {
            lastContentFuture.addListener(ChannelFutureListener.CLOSE);
        }
    }
}
