package org.apache.tajo.pullserver;

import com.google.common.collect.Lists;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.ReadaheadPool;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.pullserver.retriever.FileChunk;
import org.apache.tajo.rpc.RpcChannelFactory;
import org.apache.tajo.storage.HashShuffleAppenderManager;
import org.apache.tajo.storage.RowStoreUtil;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.index.bst.BSTIndex;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.FileRegion;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpServerCodec;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.jboss.netty.handler.codec.http.QueryStringDecoder;
import org.jboss.netty.handler.ssl.SslHandler;
import org.jboss.netty.handler.stream.ChunkedWriteHandler;
import org.jboss.netty.util.CharsetUtil;

/* loaded from: input_file:org/apache/tajo/pullserver/TajoPullServerService.class */
public class TajoPullServerService extends AbstractService {
    public static final String SHUFFLE_MANAGE_OS_CACHE = "tajo.pullserver.manage.os.cache";
    public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true;
    public static final String SHUFFLE_READAHEAD_BYTES = "tajo.pullserver.readahead.bytes";
    public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4194304;
    private int port;
    private ChannelFactory selector;
    private final ChannelGroup accepted;
    private HttpPipelineFactory pipelineFact;
    private int sslFileBufferSize;
    private ApplicationId appId;
    private FileSystem localFS;
    private boolean manageOsCache;
    private int readaheadLength;
    private ReadaheadPool readaheadPool;
    public static final String PULLSERVER_SERVICEID = "tajo.pullserver";
    private String userName;
    public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY = "tajo.pullserver.ssl.file.buffer.size";
    public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 61440;
    private static boolean STANDALONE;
    final ShuffleMetrics metrics;
    Map<String, ProcessingStatus> processingStatusMap;
    private static final Log LOG = LogFactory.getLog(TajoPullServerService.class);
    private static final Map<String, String> userRsrc = new ConcurrentHashMap();

    /* loaded from: input_file:org/apache/tajo/pullserver/TajoPullServerService$HttpPipelineFactory.class */
    class HttpPipelineFactory implements ChannelPipelineFactory {
        final PullServer PullServer;
        private SSLFactory sslFactory;

        public HttpPipelineFactory(Configuration configuration) throws Exception {
            this.PullServer = new PullServer(configuration);
            if (configuration.getBoolean(TajoConf.ConfVars.SHUFFLE_SSL_ENABLED_KEY.varname, TajoConf.ConfVars.SHUFFLE_SSL_ENABLED_KEY.defaultBoolVal)) {
                this.sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, configuration);
                this.sslFactory.init();
            }
        }

        public void destroy() {
            if (this.sslFactory != null) {
                this.sslFactory.destroy();
            }
        }

        public ChannelPipeline getPipeline() throws Exception {
            ChannelPipeline pipeline = Channels.pipeline();
            if (this.sslFactory != null) {
                pipeline.addLast("ssl", new SslHandler(this.sslFactory.createSSLEngine()));
            }
            pipeline.addLast("codec", new HttpServerCodec(4096, 8192, TajoPullServerService.this.getConfig().getInt(TajoConf.ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE.varname, TajoConf.ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE.defaultIntVal)));
            pipeline.addLast("aggregator", new HttpChunkAggregator(65536));
            pipeline.addLast("chunking", new ChunkedWriteHandler());
            pipeline.addLast("shuffle", this.PullServer);
            return pipeline;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/tajo/pullserver/TajoPullServerService$ProcessingStatus.class */
    public class ProcessingStatus {
        String requestUri;
        int numFiles;
        AtomicInteger remainFiles;
        long makeFileListTime;
        long maxTime;
        int numSlowFile;
        long minTime = Long.MAX_VALUE;
        long startTime = System.currentTimeMillis();

        public ProcessingStatus(String str) {
            this.requestUri = str;
        }

        public void setNumFiles(int i) {
            this.numFiles = i;
            this.remainFiles = new AtomicInteger(i);
        }

        public void decrementRemainFiles(FileRegion fileRegion, long j) {
            synchronized (this.remainFiles) {
                long currentTimeMillis = System.currentTimeMillis() - j;
                if (currentTimeMillis > 20000) {
                    TajoPullServerService.LOG.info("PullServer send too long time: filePos=" + fileRegion.getPosition() + ", fileLen=" + fileRegion.getCount());
                    this.numSlowFile++;
                }
                if (currentTimeMillis > this.maxTime) {
                    this.maxTime = currentTimeMillis;
                }
                if (currentTimeMillis < this.minTime) {
                    this.minTime = currentTimeMillis;
                }
                if (this.remainFiles.decrementAndGet() <= 0) {
                    TajoPullServerService.this.processingStatusMap.remove(this.requestUri);
                    TajoPullServerService.LOG.info("PullServer processing status: totalTime=" + (System.currentTimeMillis() - this.startTime) + " ms, makeFileListTime=" + this.makeFileListTime + " ms, minTime=" + this.minTime + " ms, maxTime=" + this.maxTime + " ms, numFiles=" + this.numFiles + ", numSlowFile=" + this.numSlowFile);
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/tajo/pullserver/TajoPullServerService$PullServer.class */
    class PullServer extends SimpleChannelUpstreamHandler {
        private final Configuration conf;
        private final LocalDirAllocator lDirAlloc = new LocalDirAllocator(TajoConf.ConfVars.WORKER_TEMPORAL_DIR.varname);
        private int port;

        public PullServer(Configuration configuration) throws IOException {
            this.conf = configuration;
            this.port = configuration.getInt(TajoConf.ConfVars.PULLSERVER_PORT.varname, TajoConf.ConfVars.PULLSERVER_PORT.defaultIntVal);
            this.lDirAlloc.getAllLocalPathsToRead(".", configuration);
        }

        public void setPort(int i) {
            this.port = i;
        }

        private List<String> splitMaps(List<String> list) {
            if (null == list) {
                return null;
            }
            ArrayList arrayList = new ArrayList();
            Iterator<String> it = list.iterator();
            while (it.hasNext()) {
                Collections.addAll(arrayList, it.next().split(","));
            }
            return arrayList;
        }

        public void channelOpen(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) throws Exception {
            TajoPullServerService.this.accepted.add(channelStateEvent.getChannel());
            TajoPullServerService.LOG.info(String.format("Current number of shuffle connections (%d)", Integer.valueOf(TajoPullServerService.this.accepted.size())));
            super.channelOpen(channelHandlerContext, channelStateEvent);
        }

        public void messageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent) throws Exception {
            HttpRequest httpRequest = (HttpRequest) messageEvent.getMessage();
            if (httpRequest.getMethod() != HttpMethod.GET) {
                sendError(channelHandlerContext, HttpResponseStatus.METHOD_NOT_ALLOWED);
                return;
            }
            ProcessingStatus processingStatus = new ProcessingStatus(httpRequest.getUri().toString());
            TajoPullServerService.this.processingStatusMap.put(httpRequest.getUri().toString(), processingStatus);
            Map parameters = new QueryStringDecoder(httpRequest.getUri()).getParameters();
            List list = (List) parameters.get("type");
            List list2 = (List) parameters.get("qid");
            List<String> list3 = (List) parameters.get("ta");
            List list4 = (List) parameters.get("sid");
            List list5 = (List) parameters.get("p");
            List list6 = (List) parameters.get("offset");
            List list7 = (List) parameters.get("length");
            if (list == null || list4 == null || list2 == null || list5 == null) {
                sendError(channelHandlerContext, "Required queryId, type, stage Id, and part id", HttpResponseStatus.BAD_REQUEST);
                return;
            }
            if ((list2.size() != 1 && list.size() != 1) || list4.size() != 1) {
                sendError(channelHandlerContext, "Required qids, type, taskIds, stage Id, and part id", HttpResponseStatus.BAD_REQUEST);
                return;
            }
            String str = (String) list5.get(0);
            String str2 = (String) list2.get(0);
            String str3 = (String) list.get(0);
            String str4 = (String) list4.get(0);
            long parseLong = (list6 == null || list6.isEmpty()) ? -1L : Long.parseLong((String) list6.get(0));
            long parseLong2 = (list7 == null || list7.isEmpty()) ? -1L : Long.parseLong((String) list7.get(0));
            if (!str3.equals("h") && !str3.equals("s") && list3 == null) {
                sendError(channelHandlerContext, "Required taskIds", HttpResponseStatus.BAD_REQUEST);
            }
            List<String> splitMaps = splitMaps(list3);
            String str5 = str2.toString() + "/output";
            if (TajoPullServerService.LOG.isDebugEnabled()) {
                TajoPullServerService.LOG.debug("PullServer request param: shuffleType=" + str3 + ", sid=" + str4 + ", partId=" + str + ", taskIds=" + list3);
                TajoPullServerService.LOG.debug("PullServer baseDir: " + this.conf.get(TajoConf.ConfVars.WORKER_TEMPORAL_DIR.varname) + "/" + str5);
            }
            ArrayList newArrayList = Lists.newArrayList();
            if (str3.equals("r")) {
                String str6 = splitMaps.get(0);
                if (!this.lDirAlloc.ifExists(str5 + "/" + str4 + "/" + str6 + "/output/", this.conf)) {
                    TajoPullServerService.LOG.warn(messageEvent);
                    sendError(channelHandlerContext, HttpResponseStatus.NO_CONTENT);
                    return;
                }
                try {
                    FileChunk fileCunks = TajoPullServerService.getFileCunks(TajoPullServerService.this.localFS.makeQualified(this.lDirAlloc.getLocalPathToRead(str5 + "/" + str4 + "/" + str6 + "/output/", this.conf)), (String) ((List) parameters.get("start")).get(0), (String) ((List) parameters.get("end")).get(0), parameters.get("final") != null);
                    if (fileCunks != null) {
                        newArrayList.add(fileCunks);
                    }
                } catch (Throwable th) {
                    TajoPullServerService.LOG.error("ERROR Request: " + httpRequest.getUri(), th);
                    sendError(channelHandlerContext, "Cannot get file chunks to be sent", HttpResponseStatus.BAD_REQUEST);
                    return;
                }
            } else {
                if (!str3.equals("h") && !str3.equals("s")) {
                    TajoPullServerService.LOG.error("Unknown shuffle type: " + str3);
                    sendError(channelHandlerContext, "Unknown shuffle type:" + str3, HttpResponseStatus.BAD_REQUEST);
                    return;
                }
                String str7 = str5 + "/" + str4 + "/hash-shuffle/" + HashShuffleAppenderManager.getPartParentId(Integer.parseInt(str), this.conf) + "/" + str;
                if (!this.lDirAlloc.ifExists(str7, this.conf)) {
                    TajoPullServerService.LOG.warn("Partition shuffle file not exists: " + str7);
                    sendError(channelHandlerContext, HttpResponseStatus.NO_CONTENT);
                    return;
                }
                File file = new File(TajoPullServerService.this.localFS.makeQualified(this.lDirAlloc.getLocalPathToRead(str7, this.conf)).toUri());
                long j = (parseLong < 0 || parseLong2 < 0) ? 0L : parseLong;
                long length = (parseLong < 0 || parseLong2 < 0) ? file.length() : parseLong2;
                if (j >= file.length()) {
                    String str8 = "Start pos[" + j + "] great than file length [" + file.length() + "]";
                    TajoPullServerService.LOG.error(str8);
                    sendError(channelHandlerContext, str8, HttpResponseStatus.BAD_REQUEST);
                    return;
                }
                TajoPullServerService.LOG.info("RequestURL: " + httpRequest.getUri() + ", fileLen=" + file.length());
                newArrayList.add(new FileChunk(file, j, length));
            }
            processingStatus.setNumFiles(newArrayList.size());
            processingStatus.makeFileListTime = System.currentTimeMillis() - processingStatus.startTime;
            Channel channel = messageEvent.getChannel();
            if (newArrayList.size() == 0) {
                channel.write(new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT));
                if (HttpHeaders.isKeepAlive(httpRequest)) {
                    return;
                }
                channel.close();
                return;
            }
            FileChunk[] fileChunkArr = (FileChunk[]) newArrayList.toArray(new FileChunk[newArrayList.size()]);
            DefaultHttpResponse defaultHttpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
            long j2 = 0;
            for (FileChunk fileChunk : fileChunkArr) {
                j2 += fileChunk.length();
            }
            HttpHeaders.setContentLength(defaultHttpResponse, j2);
            channel.write(defaultHttpResponse);
            ChannelFuture channelFuture = null;
            for (FileChunk fileChunk2 : fileChunkArr) {
                channelFuture = sendFile(channelHandlerContext, channel, fileChunk2, httpRequest.getUri().toString());
                if (channelFuture == null) {
                    sendError(channelHandlerContext, HttpResponseStatus.NOT_FOUND);
                    return;
                }
            }
            if (HttpHeaders.isKeepAlive(httpRequest)) {
                return;
            }
            channelFuture.addListener(ChannelFutureListener.CLOSE);
        }

        private ChannelFuture sendFile(ChannelHandlerContext channelHandlerContext, Channel channel, FileChunk fileChunk, String str) throws IOException {
            ChannelFuture write;
            long currentTimeMillis = System.currentTimeMillis();
            RandomAccessFile randomAccessFile = null;
            try {
                RandomAccessFile randomAccessFile2 = new RandomAccessFile(fileChunk.getFile(), "r");
                if (channel.getPipeline().get(SslHandler.class) == null) {
                    FadvisedFileRegion fadvisedFileRegion = new FadvisedFileRegion(randomAccessFile2, fileChunk.startOffset(), fileChunk.length(), TajoPullServerService.this.manageOsCache, TajoPullServerService.this.readaheadLength, TajoPullServerService.this.readaheadPool, fileChunk.getFile().getAbsolutePath());
                    write = channel.write(fadvisedFileRegion);
                    write.addListener(new FileCloseListener(fadvisedFileRegion, str, currentTimeMillis, TajoPullServerService.this));
                } else {
                    write = channel.write(new FadvisedChunkedFile(randomAccessFile2, fileChunk.startOffset(), fileChunk.length(), TajoPullServerService.this.sslFileBufferSize, TajoPullServerService.this.manageOsCache, TajoPullServerService.this.readaheadLength, TajoPullServerService.this.readaheadPool, fileChunk.getFile().getAbsolutePath()));
                }
                TajoPullServerService.this.metrics.shuffleConnections.incr();
                TajoPullServerService.this.metrics.shuffleOutputBytes.incr(fileChunk.length());
                return write;
            } catch (FileNotFoundException e) {
                TajoPullServerService.LOG.info(fileChunk.getFile() + " not found");
                return null;
            } catch (Throwable th) {
                if (0 == 0) {
                    return null;
                }
                randomAccessFile.close();
                return null;
            }
        }

        private void sendError(ChannelHandlerContext channelHandlerContext, HttpResponseStatus httpResponseStatus) {
            sendError(channelHandlerContext, "", httpResponseStatus);
        }

        private void sendError(ChannelHandlerContext channelHandlerContext, String str, HttpResponseStatus httpResponseStatus) {
            DefaultHttpResponse defaultHttpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, httpResponseStatus);
            defaultHttpResponse.setHeader("Content-Type", "text/plain; charset=UTF-8");
            defaultHttpResponse.setContent(ChannelBuffers.copiedBuffer(str, CharsetUtil.UTF_8));
            channelHandlerContext.getChannel().write(defaultHttpResponse).addListener(ChannelFutureListener.CLOSE);
        }

        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, ExceptionEvent exceptionEvent) throws Exception {
            TajoPullServerService.LOG.error(exceptionEvent.getCause().getMessage(), exceptionEvent.getCause());
            if (channelHandlerContext.getChannel().isConnected()) {
                channelHandlerContext.getChannel().close();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Metrics(name = "PullServerShuffleMetrics", about = "PullServer output metrics", context = "tajo")
    /* loaded from: input_file:org/apache/tajo/pullserver/TajoPullServerService$ShuffleMetrics.class */
    public static class ShuffleMetrics implements ChannelFutureListener {

        @Metric({"OutputBytes", "PullServer output in bytes"})
        MutableCounterLong shuffleOutputBytes;

        @Metric({"Failed", "# of failed shuffle outputs"})
        MutableCounterInt shuffleOutputsFailed;

        @Metric({"Succeeded", "# of succeeded shuffle outputs"})
        MutableCounterInt shuffleOutputsOK;

        @Metric({"Connections", "# of current shuffle connections"})
        MutableGaugeInt shuffleConnections;

        ShuffleMetrics() {
        }

        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            if (channelFuture.isSuccess()) {
                this.shuffleOutputsOK.incr();
            } else {
                this.shuffleOutputsFailed.incr();
            }
            this.shuffleConnections.decr();
        }
    }

    TajoPullServerService(MetricsSystem metricsSystem) {
        super("httpshuffle");
        this.accepted = new DefaultChannelGroup();
        this.readaheadPool = ReadaheadPool.getInstance();
        this.processingStatusMap = new ConcurrentHashMap();
        this.metrics = (ShuffleMetrics) metricsSystem.register(new ShuffleMetrics());
    }

    public TajoPullServerService() {
        this(DefaultMetricsSystem.instance());
    }

    public static ByteBuffer serializeMetaData(int i) throws IOException {
        DataOutputBuffer dataOutputBuffer = new DataOutputBuffer();
        dataOutputBuffer.writeInt(i);
        return ByteBuffer.wrap(dataOutputBuffer.getData(), 0, dataOutputBuffer.getLength());
    }

    public static int deserializeMetaData(ByteBuffer byteBuffer) throws IOException {
        DataInputByteBuffer dataInputByteBuffer = new DataInputByteBuffer();
        dataInputByteBuffer.reset(new ByteBuffer[]{byteBuffer});
        return dataInputByteBuffer.readInt();
    }

    public void initApp(String str, ApplicationId applicationId, ByteBuffer byteBuffer) {
        this.appId = applicationId;
        this.userName = str;
        userRsrc.put(applicationId.toString(), str);
    }

    public void stopApp(ApplicationId applicationId) {
        userRsrc.remove(applicationId.toString());
    }

    public void init(Configuration configuration) {
        try {
            this.manageOsCache = configuration.getBoolean("tajo.pullserver.manage.os.cache", true);
            this.readaheadLength = configuration.getInt("tajo.pullserver.readahead.bytes", 4194304);
            this.selector = RpcChannelFactory.createServerChannelFactory("PullServerAuxService", configuration.getInt("tajo.shuffle.rpc.server.worker-thread-num", Runtime.getRuntime().availableProcessors() * 2));
            this.localFS = new LocalFileSystem();
            configuration.setInt(TajoConf.ConfVars.PULLSERVER_PORT.varname, TajoConf.ConfVars.PULLSERVER_PORT.defaultIntVal);
            super.init(configuration);
            LOG.info("Tajo PullServer initialized: readaheadLength=" + this.readaheadLength);
        } catch (Throwable th) {
            LOG.error(th);
        }
    }

    public synchronized void serviceInit(Configuration configuration) throws Exception {
        ServerBootstrap serverBootstrap = new ServerBootstrap(this.selector);
        try {
            this.pipelineFact = new HttpPipelineFactory(configuration);
            serverBootstrap.setPipelineFactory(this.pipelineFact);
            this.port = configuration.getInt(TajoConf.ConfVars.PULLSERVER_PORT.varname, TajoConf.ConfVars.PULLSERVER_PORT.defaultIntVal);
            Channel bind = serverBootstrap.bind(new InetSocketAddress(this.port));
            this.accepted.add(bind);
            this.port = ((InetSocketAddress) bind.getLocalAddress()).getPort();
            configuration.set(TajoConf.ConfVars.PULLSERVER_PORT.varname, Integer.toString(this.port));
            this.pipelineFact.PullServer.setPort(this.port);
            LOG.info(getName() + " listening on port " + this.port);
            this.sslFileBufferSize = configuration.getInt("tajo.pullserver.ssl.file.buffer.size", 61440);
            if (STANDALONE) {
                File pullServerPortFile = getPullServerPortFile();
                if (pullServerPortFile.exists()) {
                    pullServerPortFile.delete();
                }
                pullServerPortFile.getParentFile().mkdirs();
                LOG.info("Write PullServerPort to " + pullServerPortFile);
                FileOutputStream fileOutputStream = null;
                try {
                    try {
                        fileOutputStream = new FileOutputStream(pullServerPortFile);
                        fileOutputStream.write(("" + this.port).getBytes());
                        IOUtils.closeStream(fileOutputStream);
                    } catch (Exception e) {
                        LOG.fatal("PullServer exists cause can't write PullServer port to " + pullServerPortFile + ", " + e.getMessage(), e);
                        System.exit(-1);
                        IOUtils.closeStream(fileOutputStream);
                    }
                } catch (Throwable th) {
                    IOUtils.closeStream(fileOutputStream);
                    throw th;
                }
            }
            super.serviceInit(configuration);
            LOG.info("TajoPullServerService started: port=" + this.port);
        } catch (Exception e2) {
            throw new RuntimeException(e2);
        }
    }

    public static boolean isStandalone() {
        return STANDALONE;
    }

    private static File getPullServerPortFile() {
        String str = System.getenv("TAJO_PID_DIR");
        if (StringUtils.isEmpty(str)) {
            str = "/tmp";
        }
        return new File(str + "/pullserver.port");
    }

    public static int readPullServerPort() {
        FileInputStream fileInputStream = null;
        try {
            try {
                File pullServerPortFile = getPullServerPortFile();
                if (!pullServerPortFile.exists() || pullServerPortFile.isDirectory()) {
                    IOUtils.closeStream((Closeable) null);
                    return -1;
                }
                fileInputStream = new FileInputStream(pullServerPortFile);
                byte[] bArr = new byte[1024];
                int parseInt = Integer.parseInt(new String(bArr, 0, fileInputStream.read(bArr)));
                IOUtils.closeStream(fileInputStream);
                return parseInt;
            } catch (IOException e) {
                LOG.fatal(e.getMessage(), e);
                IOUtils.closeStream(fileInputStream);
                return -1;
            }
        } catch (Throwable th) {
            IOUtils.closeStream(fileInputStream);
            throw th;
        }
    }

    public int getPort() {
        return this.port;
    }

    public synchronized void stop() {
        try {
            try {
                this.accepted.close().awaitUninterruptibly(10L, TimeUnit.SECONDS);
                new ServerBootstrap(this.selector).releaseExternalResources();
                this.pipelineFact.destroy();
                this.localFS.close();
                super.stop();
            } catch (Throwable th) {
                LOG.error(th);
                super.stop();
            }
        } catch (Throwable th2) {
            super.stop();
            throw th2;
        }
    }

    public synchronized ByteBuffer getMeta() {
        try {
            return serializeMetaData(this.port);
        } catch (IOException e) {
            LOG.error("Error during getMeta", e);
            return null;
        }
    }

    public void completeFileChunk(FileRegion fileRegion, String str, long j) {
        ProcessingStatus processingStatus = this.processingStatusMap.get(str);
        if (processingStatus != null) {
            processingStatus.decrementRemainFiles(fileRegion, j);
        }
    }

    public static FileChunk getFileCunks(Path path, String str, String str2, boolean z) throws IOException {
        BSTIndex.BSTIndexReader indexReader = new BSTIndex(new TajoConf()).getIndexReader(new Path(path, "index"));
        indexReader.open();
        Schema keySchema = indexReader.getKeySchema();
        TupleComparator comparator = indexReader.getComparator();
        LOG.info("BSTIndex is loaded from disk (" + indexReader.getFirstKey() + ", " + indexReader.getLastKey());
        File file = new File(URI.create(path.toUri() + "/output"));
        byte[] decodeBase64 = Base64.decodeBase64(str);
        byte[] decodeBase642 = Base64.decodeBase64(str2);
        RowStoreUtil.RowStoreDecoder createDecoder = RowStoreUtil.createDecoder(keySchema);
        try {
            Tuple tuple = createDecoder.toTuple(decodeBase64);
            try {
                Tuple tuple2 = createDecoder.toTuple(decodeBase642);
                LOG.info("GET Request for " + file.getAbsolutePath() + " (start=" + tuple + ", end=" + tuple2 + (z ? ", last=true" : "") + ")");
                if (indexReader.getFirstKey() == null && indexReader.getLastKey() == null) {
                    LOG.info("There is no contents");
                    return null;
                }
                if (comparator.compare(tuple2, indexReader.getFirstKey()) < 0 || comparator.compare(indexReader.getLastKey(), tuple) < 0) {
                    LOG.warn("Out of Scope (indexed data [" + indexReader.getFirstKey() + ", " + indexReader.getLastKey() + "], but request start:" + tuple + ", end: " + tuple2);
                    return null;
                }
                try {
                    long find = indexReader.find(tuple);
                    try {
                        long find2 = indexReader.find(tuple2);
                        if (find2 == -1) {
                            find2 = indexReader.find(tuple2, true);
                        }
                        if (find == -1) {
                            try {
                                find = indexReader.find(tuple, true);
                            } catch (IOException e) {
                                LOG.error("State Dump (the requested range: [" + tuple + ", " + tuple2 + "), idx min: " + indexReader.getFirstKey() + ", idx max: " + indexReader.getLastKey());
                                throw e;
                            }
                        }
                        if (find == -1) {
                            throw new IllegalStateException("startOffset " + find + " is negative \nState Dump (the requested range: [" + tuple + ", " + tuple2 + "), idx min: " + indexReader.getFirstKey() + ", idx max: " + indexReader.getLastKey());
                        }
                        if (z || (find2 == -1 && comparator.compare(indexReader.getLastKey(), tuple2) < 0)) {
                            find2 = file.length();
                        }
                        indexReader.close();
                        FileChunk fileChunk = new FileChunk(file, find, find2 - find);
                        LOG.info("Retrieve File Chunk: " + fileChunk);
                        return fileChunk;
                    } catch (IOException e2) {
                        LOG.error("State Dump (the requested range: [" + tuple + ", " + tuple2 + "), idx min: " + indexReader.getFirstKey() + ", idx max: " + indexReader.getLastKey());
                        throw e2;
                    }
                } catch (IOException e3) {
                    LOG.error("State Dump (the requested range: [" + tuple + ", " + tuple2 + "), idx min: " + indexReader.getFirstKey() + ", idx max: " + indexReader.getLastKey());
                    throw e3;
                }
            } catch (Throwable th) {
                throw new IllegalArgumentException("EndKey: " + str2 + ", decoded byte size: " + decodeBase642.length, th);
            }
        } catch (Throwable th2) {
            throw new IllegalArgumentException("StartKey: " + str + ", decoded byte size: " + decodeBase64.length, th2);
        }
    }

    static {
        STANDALONE = false;
        String str = System.getenv("TAJO_PULLSERVER_STANDALONE");
        if (StringUtils.isEmpty(str)) {
            return;
        }
        STANDALONE = str.equalsIgnoreCase("true");
    }
}
