package org.apache.flink.mesos.util;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.mesos.configuration.MesosOptions;
import org.apache.flink.runtime.net.SSLEngineFactory;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
import org.apache.flink.runtime.rest.handler.router.Router;
import org.apache.flink.runtime.rest.handler.router.RouterHandler;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpServerCodec;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedStream;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
import org.apache.flink.shaded.netty4.io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;

/* loaded from: input_file:org/apache/flink/mesos/util/MesosArtifactServer.class */
/**
 * A small HTTP(S) server that publishes individual files under virtual paths below a base URL.
 *
 * <p>Each registered file is served by its own {@link VirtualFileServerHandler}, looked up through
 * the {@link Router}. Requests that match no registered path end up in the
 * {@link UnknownFileHandler}, which answers with {@code 404 Not Found}.
 *
 * <p>The registry of served paths is only accessed from {@code synchronized} methods of this
 * class.
 */
public class MesosArtifactServer implements MesosArtifactResolver {
    private static final Logger LOG = LoggerFactory.getLogger(MesosArtifactServer.class);

    /** Maps request paths to the handler serving the file registered under that path. */
    private final Router router;

    /** Event loop group accepting connections; kept so that {@link #stop()} can shut it down. */
    private final NioEventLoopGroup bossGroup;

    /** Event loop group serving accepted connections; kept so that {@link #stop()} can shut it down. */
    private final NioEventLoopGroup workerGroup;

    /** Set to {@code null} once {@link #stop()} has run. */
    private ServerBootstrap bootstrap;

    /** The bound server channel; {@code null} once {@link #stop()} has run. */
    private Channel serverChannel;

    /** URL under which all registered files are published (always ends with {@code /}). */
    private final URL baseURL;

    /** Registered remote paths and the URL each one is served under. */
    private final Map<Path, URL> paths = new HashMap<>();

    /**
     * Last handler in the pipeline: replies {@code 404 Not Found} to whatever reaches it and
     * closes the connection once the response has been written.
     */
    @ChannelHandler.Sharable
    public static class UnknownFileHandler extends SimpleChannelInboundHandler<Object> {
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object obj) {
            sendNotFound(channelHandlerContext);
        }

        /** Writes an empty 404 response and closes the channel afterwards. */
        private static void sendNotFound(ChannelHandlerContext channelHandlerContext) {
            channelHandlerContext
                    .writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND))
                    .addListener(ChannelFutureListener.CLOSE);
        }
    }

    /**
     * Serves the content of exactly one file in response to {@code GET} and {@code HEAD}
     * requests. Every response closes the connection when it has been written.
     */
    @ChannelHandler.Sharable
    public static class VirtualFileServerHandler extends SimpleChannelInboundHandler<RoutedRequest> {
        private final FileSystem fs;
        private final Path path;

        /**
         * Creates a handler for the given file.
         *
         * @param path absolute path of an existing, non-directory file
         * @throws IOException if the file system cannot be obtained or queried
         * @throws IllegalArgumentException if the path is not absolute, does not exist, or is a
         *     directory
         */
        public VirtualFileServerHandler(Path path) throws IOException {
            this.path = path;
            if (!path.isAbsolute()) {
                throw new IllegalArgumentException("path must be absolute: " + path.toString());
            }
            this.fs = path.getFileSystem();
            if (!this.fs.exists(path) || this.fs.getFileStatus(path).isDir()) {
                throw new IllegalArgumentException("no such file: " + path.toString());
            }
        }

        /**
         * Answers a routed request with the file's metadata and, for {@code GET}, its content.
         *
         * <p>Methods other than {@code GET}/{@code HEAD} get {@code 405 Method Not Allowed}. If the
         * file cannot be stat'ed or opened, the failure is logged and {@code 410 Gone} is sent.
         */
        @Override
        public void channelRead0(ChannelHandlerContext channelHandlerContext, RoutedRequest routedRequest) throws Exception {
            HttpRequest request = routedRequest.getRequest();
            HttpMethod method = request.getMethod();
            MesosArtifactServer.LOG.debug("{} request for file '{}'", method, this.path);

            final boolean isGet = HttpMethod.GET.equals(method);
            if (!isGet && !HttpMethod.HEAD.equals(method)) {
                sendMethodNotAllowed(channelHandlerContext);
                return;
            }

            try {
                FileStatus fileStatus = this.fs.getFileStatus(this.path);
                // Open the file BEFORE queueing the 200 header: if opening fails we can still
                // answer with a single, well-formed error response.
                FSDataInputStream content = isGet ? this.fs.open(this.path) : null;
                try {
                    DefaultHttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
                    HttpHeaders.setHeader(response, "Connection", "close");
                    HttpHeaders.setHeader(response, "Cache-Control", "private");
                    HttpHeaders.setHeader(response, "Content-Type", "application/octet-stream");
                    HttpHeaders.setContentLength(response, fileStatus.getLen());
                    channelHandlerContext.write(response);
                    if (content != null) {
                        channelHandlerContext.write(new ChunkedStream(content));
                    }
                } catch (Exception e) {
                    // The stream was not handed over successfully, so release it here without
                    // letting a close failure hide the original problem.
                    if (content != null) {
                        try {
                            content.close();
                        } catch (IOException closeFailure) {
                            e.addSuppressed(closeFailure);
                        }
                    }
                    throw e;
                }
                channelHandlerContext
                        .writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT)
                        .addListener(ChannelFutureListener.CLOSE);
            } catch (IOException e) {
                MesosArtifactServer.LOG.error("unable to stat file", e);
                sendError(channelHandlerContext, HttpResponseStatus.GONE);
            }
        }

        /** Logs the failure and answers with {@code 500} as long as the channel is still active. */
        @Override
        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
            if (channelHandlerContext.channel().isActive()) {
                MesosArtifactServer.LOG.error("Caught exception", th);
                sendError(channelHandlerContext, HttpResponseStatus.INTERNAL_SERVER_ERROR);
            }
        }

        /** Writes an empty 405 response and closes the channel afterwards. */
        private static void sendMethodNotAllowed(ChannelHandlerContext channelHandlerContext) {
            channelHandlerContext
                    .writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.METHOD_NOT_ALLOWED))
                    .addListener(ChannelFutureListener.CLOSE);
        }

        /** Writes a plain-text error response with the given status and closes the channel afterwards. */
        private static void sendError(ChannelHandlerContext channelHandlerContext, HttpResponseStatus httpResponseStatus) {
            DefaultFullHttpResponse response = new DefaultFullHttpResponse(
                    HttpVersion.HTTP_1_1,
                    httpResponseStatus,
                    Unpooled.copiedBuffer("Failure: " + httpResponseStatus + "\r\n", CharsetUtil.UTF_8));
            HttpHeaders.setHeader(response, "Content-Type", "text/plain; charset=UTF-8");
            channelHandlerContext.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
        }
    }

    /**
     * Starts the server and blocks until it is bound.
     *
     * @param str path prefix; the base URL path becomes {@code "/" + str + "/"}
     * @param str2 host name to bind to; also used as the host of the base URL
     * @param i port to bind to, in the range 0..65535; the base URL uses the port actually bound
     * @param configuration configuration consulted for the SSL settings
     * @throws IllegalArgumentException if the port is out of range
     * @throws IOException if SSL is enabled but the SSL engine factory cannot be created
     * @throws Exception if binding the server fails
     */
    public MesosArtifactServer(String str, String str2, int i, Configuration configuration) throws Exception {
        if (i < 0 || i > 65535) {
            throw new IllegalArgumentException("File server port is invalid: " + i);
        }

        final SSLEngineFactory sslFactory = createSslFactoryIfEnabled(configuration);
        this.router = new Router();

        ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) {
                RouterHandler routerHandler = new RouterHandler(MesosArtifactServer.this.router, new HashMap<>());
                if (sslFactory != null) {
                    socketChannel.pipeline().addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
                }
                socketChannel.pipeline()
                        .addLast(new HttpServerCodec())
                        .addLast(new ChunkedWriteHandler())
                        .addLast(routerHandler.getName(), routerHandler)
                        .addLast(new UnknownFileHandler());
            }
        };

        this.bossGroup = new NioEventLoopGroup(1);
        this.workerGroup = new NioEventLoopGroup();
        this.bootstrap = new ServerBootstrap();
        this.bootstrap
                .group(this.bossGroup, this.workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(initializer);

        Channel boundChannel = null;
        boolean started = false;
        try {
            boundChannel = this.bootstrap.bind(str2, i).sync().channel();
            InetSocketAddress boundAddress = (InetSocketAddress) boundChannel.localAddress();
            String hostAddress = boundAddress.getAddress().getHostAddress();
            int port = boundAddress.getPort();

            this.baseURL = new URL(sslFactory != null ? "https" : "http", str2, port, "/" + str + "/");
            this.serverChannel = boundChannel;
            LOG.info("Mesos Artifact Server Base URL: {}, listening at {}:{}", this.baseURL, hostAddress, port);
            started = true;
        } finally {
            if (!started) {
                // Start-up failed: do not leave event loop threads or a bound socket behind.
                if (boundChannel != null) {
                    boundChannel.close();
                }
                this.bossGroup.shutdownGracefully();
                this.workerGroup.shutdownGracefully();
            }
        }
    }

    /**
     * Creates the SSL engine factory when both the artifact-server SSL option and REST SSL are
     * enabled in the configuration.
     *
     * @return the factory, or {@code null} when SSL is not enabled
     * @throws IOException if the factory cannot be created
     */
    private static SSLEngineFactory createSslFactoryIfEnabled(Configuration configuration) throws IOException {
        if (!configuration.getBoolean(MesosOptions.ARTIFACT_SERVER_SSL_ENABLED) || !SSLUtils.isRestSSLEnabled(configuration)) {
            return null;
        }
        LOG.info("Enabling ssl for the artifact server");
        try {
            return SSLUtils.createRestServerSSLEngineFactory(configuration);
        } catch (Exception e) {
            throw new IOException("Failed to initialize SSLContext for the artifact server", e);
        }
    }

    /** Returns the URL below which all registered files are served. */
    public URL baseURL() {
        return this.baseURL;
    }

    /**
     * Returns the port the server channel is bound to.
     *
     * @return the local port, or {@code -1} if the server has been stopped or the port cannot be
     *     determined
     */
    public synchronized int getServerPort() {
        Channel channel = this.serverChannel;
        if (channel == null) {
            return -1;
        }
        try {
            return ((InetSocketAddress) channel.localAddress()).getPort();
        } catch (Exception e) {
            LOG.error("Cannot access local server port", e);
            return -1;
        }
    }

    /**
     * Registers a local file to be served under the given relative path.
     *
     * @param file the local file to serve
     * @param str the relative path (below the base URL) to serve it under
     * @return the URL the file is served under
     * @see #addPath(Path, Path)
     */
    public synchronized URL addFile(File file, String str) throws IOException, MalformedURLException {
        return addPath(new Path(file.toURI()), new Path(str));
    }

    /**
     * Registers a file to be served under the given relative path.
     *
     * @param path absolute path of the file to serve
     * @param path2 relative path (below the base URL) to serve it under
     * @return the URL the file is served under
     * @throws IllegalArgumentException if {@code path2} is already registered or is absolute, or
     *     if {@code path} is rejected by {@link VirtualFileServerHandler}
     * @throws IOException if the file cannot be inspected
     */
    public synchronized URL addPath(Path path, Path path2) throws IOException, MalformedURLException {
        if (this.paths.containsKey(path2)) {
            throw new IllegalArgumentException("duplicate path registered");
        }
        if (path2.isAbsolute()) {
            throw new IllegalArgumentException("not expecting an absolute path");
        }
        URL url = new URL(this.baseURL, path2.toString());
        this.router.addAny(url.getPath(), new VirtualFileServerHandler(path));
        this.paths.put(path2, url);
        return url;
    }

    /**
     * Stops serving the file registered under the given relative path. Unknown paths are ignored.
     *
     * <p>The path is also dropped from the registry, so {@link #resolve(Path)} no longer returns
     * its URL and the same path can be registered again.
     *
     * @param path the relative path previously passed to {@link #addPath(Path, Path)}
     */
    public synchronized void removePath(Path path) {
        // The stored URL is exactly the one addPath() routed, so reuse it instead of rebuilding it.
        URL registered = this.paths.remove(path);
        if (registered != null) {
            this.router.removePathPattern(registered.getPath());
        }
    }

    /**
     * Looks up the URL a registered relative path is served under.
     *
     * @return the URL, or an empty option if the path is not registered
     */
    @Override // org.apache.flink.mesos.util.MesosArtifactResolver
    public synchronized Option<URL> resolve(Path path) {
        return Option.apply(this.paths.get(path));
    }

    /**
     * Stops the server: closes the server channel (waiting for the close to finish) and initiates
     * a graceful shutdown of both event loop groups. Calling it again is a no-op.
     */
    public synchronized void stop() throws Exception {
        if (this.serverChannel != null) {
            this.serverChannel.close().awaitUninterruptibly();
            this.serverChannel = null;
        }
        if (this.bootstrap != null) {
            // Both groups must be shut down; otherwise the worker threads outlive the server.
            this.bossGroup.shutdownGracefully();
            this.workerGroup.shutdownGracefully();
            this.bootstrap = null;
        }
    }
}
