/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.llap.ext;

import com.google.common.collect.Lists;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.collections4.ListUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
import org.apache.hadoop.hive.llap.tez.Converters;
import org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy;
import org.apache.hadoop.hive.llap.tezplugins.helpers.LlapTaskUmbilicalServer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.AbstractService;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.impl.EventType;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LlapTaskUmbilicalExternalClient
extends AbstractService
implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(LlapTaskUmbilicalExternalClient.class);
    private final LlapProtocolClientProxy communicator;
    private volatile LlapTaskUmbilicalServer llapTaskUmbilicalServer;
    private final Configuration conf;
    private final LlapTaskUmbilicalProtocol umbilical;
    protected final String tokenIdentifier;
    protected final Token<JobTokenIdentifier> sessionToken;
    private final ConcurrentMap<String, PendingEventData> pendingEvents = new ConcurrentHashMap<String, PendingEventData>();
    private final ConcurrentMap<String, TaskHeartbeatInfo> registeredTasks = new ConcurrentHashMap<String, TaskHeartbeatInfo>();
    private LlapTaskUmbilicalExternalResponder responder = null;
    private final ScheduledThreadPoolExecutor timer;
    private final long connectionTimeout;

    public LlapTaskUmbilicalExternalClient(Configuration conf, String tokenIdentifier, Token<JobTokenIdentifier> sessionToken, LlapTaskUmbilicalExternalResponder responder, Token<LlapTokenIdentifier> llapToken) {
        super(LlapTaskUmbilicalExternalClient.class.getName());
        this.conf = conf;
        this.umbilical = new LlapTaskUmbilicalExternalImpl();
        this.tokenIdentifier = tokenIdentifier;
        this.sessionToken = sessionToken;
        this.responder = responder;
        this.timer = new ScheduledThreadPoolExecutor(1);
        this.connectionTimeout = 3L * HiveConf.getTimeVar(conf, HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        this.communicator = new LlapProtocolClientProxy(1, conf, llapToken);
        this.communicator.init(conf);
    }

    public void serviceStart() throws IOException {
        int numHandlers = 1;
        this.llapTaskUmbilicalServer = new LlapTaskUmbilicalServer(this.conf, this.umbilical, numHandlers, this.tokenIdentifier, this.sessionToken);
        this.communicator.start();
    }

    public void serviceStop() {
        this.llapTaskUmbilicalServer.shutdownServer();
        this.timer.shutdown();
        if (this.communicator != null) {
            this.communicator.stop();
        }
    }

    public InetSocketAddress getAddress() {
        return this.llapTaskUmbilicalServer.getAddress();
    }

    public void submitWork(LlapDaemonProtocolProtos.SubmitWorkRequestProto request, String llapHost, int llapPort) {
        LlapDaemonProtocolProtos.VertexOrBinary vob = request.getWorkSpec();
        assert (vob.hasVertexBinary() != vob.hasVertex());
        LlapDaemonProtocolProtos.SignableVertexSpec vertex = null;
        try {
            vertex = vob.hasVertex() ? vob.getVertex() : LlapDaemonProtocolProtos.SignableVertexSpec.parseFrom(vob.getVertexBinary());
        }
        catch (InvalidProtocolBufferException e) {
            throw new RuntimeException(e);
        }
        LlapDaemonProtocolProtos.QueryIdentifierProto queryIdentifierProto = vertex.getQueryIdentifier();
        TezTaskAttemptID attemptId = Converters.createTaskAttemptId(queryIdentifierProto, vertex.getVertexIndex(), request.getFragmentNumber(), request.getAttemptNumber());
        final String fragmentId = attemptId.toString();
        final TaskHeartbeatInfo thi = new TaskHeartbeatInfo(fragmentId, llapHost, llapPort);
        this.pendingEvents.putIfAbsent(fragmentId, new PendingEventData(thi, Lists.newArrayList()));
        this.timer.scheduleAtFixedRate(new HeartbeatCheckTask(), this.connectionTimeout, this.connectionTimeout, TimeUnit.MILLISECONDS);
        this.communicator.sendSubmitWork(request, llapHost, llapPort, new LlapProtocolClientProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.SubmitWorkResponseProto>(){

            @Override
            public void setResponse(LlapDaemonProtocolProtos.SubmitWorkResponseProto response) {
                if (response.hasSubmissionState() && response.getSubmissionState().equals(LlapDaemonProtocolProtos.SubmissionStateProto.REJECTED)) {
                    String msg = "Fragment: " + fragmentId + " rejected. Server Busy.";
                    LOG.info(msg);
                    if (LlapTaskUmbilicalExternalClient.this.responder != null) {
                        RuntimeException err = new RuntimeException(msg);
                        LlapTaskUmbilicalExternalClient.this.responder.submissionFailed(fragmentId, err);
                    }
                    return;
                }
                if (response.hasUniqueNodeId()) {
                    thi.uniqueNodeId = response.getUniqueNodeId();
                }
            }

            @Override
            public void indicateError(Throwable t) {
                String msg = "Failed to submit: " + fragmentId;
                LOG.error(msg, t);
                RuntimeException err = new RuntimeException(msg, t);
                LlapTaskUmbilicalExternalClient.this.responder.submissionFailed(fragmentId, err);
            }
        });
    }

    private void updateHeartbeatInfo(String taskAttemptId) {
        TaskHeartbeatInfo heartbeatInfo;
        int updateCount = 0;
        PendingEventData pendingEventData = (PendingEventData)this.pendingEvents.get(taskAttemptId);
        if (pendingEventData != null) {
            pendingEventData.heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis());
            ++updateCount;
        }
        if ((heartbeatInfo = (TaskHeartbeatInfo)this.registeredTasks.get(taskAttemptId)) != null) {
            heartbeatInfo.lastHeartbeat.set(System.currentTimeMillis());
            ++updateCount;
        }
        if (updateCount == 0) {
            LOG.warn("No tasks found for heartbeat from taskAttemptId " + taskAttemptId);
        }
    }

    private void updateHeartbeatInfo(String hostname, String uniqueId, int port) {
        int updateCount = 0;
        for (String key : this.pendingEvents.keySet()) {
            PendingEventData pendingEventData = (PendingEventData)this.pendingEvents.get(key);
            if (pendingEventData == null) continue;
            TaskHeartbeatInfo thi = pendingEventData.heartbeatInfo;
            String thiUniqueId = thi.uniqueNodeId;
            if (!thi.hostname.equals(hostname) || thi.port != port || thiUniqueId == null || !thiUniqueId.equals(uniqueId)) continue;
            thi.lastHeartbeat.set(System.currentTimeMillis());
            ++updateCount;
        }
        for (String key : this.registeredTasks.keySet()) {
            TaskHeartbeatInfo thi = (TaskHeartbeatInfo)this.registeredTasks.get(key);
            if (thi == null) continue;
            String thiUniqueId = thi.uniqueNodeId;
            if (!thi.hostname.equals(hostname) || thi.port != port || thiUniqueId == null || !thiUniqueId.equals(uniqueId)) continue;
            thi.lastHeartbeat.set(System.currentTimeMillis());
            ++updateCount;
        }
        if (updateCount == 0) {
            LOG.info("No tasks found for heartbeat from hostname " + hostname + ", port " + port);
        }
    }

    private class LlapTaskUmbilicalExternalImpl
    implements LlapTaskUmbilicalProtocol {
        private LlapTaskUmbilicalExternalImpl() {
        }

        @Override
        public boolean canCommit(TezTaskAttemptID taskid) throws IOException {
            return true;
        }

        @Override
        public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOException, TezException {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Received heartbeat from container, request=" + request);
            }
            TezHeartbeatResponse response = new TezHeartbeatResponse();
            response.setLastRequestId(request.getRequestId());
            TezTaskAttemptID taskAttemptId = request.getCurrentTaskAttemptID();
            String taskAttemptIdString = taskAttemptId.toString();
            LlapTaskUmbilicalExternalClient.this.updateHeartbeatInfo(taskAttemptIdString);
            List<Object> tezEvents = null;
            PendingEventData pendingEventData = (PendingEventData)LlapTaskUmbilicalExternalClient.this.pendingEvents.remove(taskAttemptIdString);
            if (pendingEventData == null) {
                tezEvents = Collections.emptyList();
                if (!LlapTaskUmbilicalExternalClient.this.registeredTasks.containsKey(taskAttemptIdString)) {
                    LOG.info("Unexpected heartbeat from " + taskAttemptIdString);
                    response.setShouldDie();
                    return response;
                }
            } else {
                tezEvents = pendingEventData.tezEvents;
                LlapTaskUmbilicalExternalClient.this.registeredTasks.put(taskAttemptIdString, pendingEventData.heartbeatInfo);
            }
            response.setLastRequestId(request.getRequestId());
            response.setNextFromEventId(0);
            response.setNextPreRoutedEventId(0);
            response.setEvents(tezEvents);
            List inEvents = request.getEvents();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Heartbeat from " + taskAttemptIdString + " events: " + (inEvents != null ? inEvents.size() : -1));
            }
            block7: for (TezEvent tezEvent : ListUtils.emptyIfNull((List)inEvents)) {
                EventType eventType = tezEvent.getEventType();
                switch (eventType) {
                    case TASK_ATTEMPT_COMPLETED_EVENT: {
                        LOG.debug("Task completed event for " + taskAttemptIdString);
                        LlapTaskUmbilicalExternalClient.this.registeredTasks.remove(taskAttemptIdString);
                        continue block7;
                    }
                    case TASK_ATTEMPT_FAILED_EVENT: {
                        LOG.debug("Task failed event for " + taskAttemptIdString);
                        LlapTaskUmbilicalExternalClient.this.registeredTasks.remove(taskAttemptIdString);
                        continue block7;
                    }
                    case TASK_STATUS_UPDATE_EVENT: {
                        LOG.debug("Task update event for " + taskAttemptIdString);
                        continue block7;
                    }
                }
                LOG.warn("Unhandled event type " + eventType);
            }
            try {
                if (LlapTaskUmbilicalExternalClient.this.responder != null) {
                    LlapTaskUmbilicalExternalClient.this.responder.heartbeat(request);
                }
            }
            catch (Exception err) {
                LOG.error("Error during responder execution", (Throwable)err);
            }
            return response;
        }

        @Override
        public void nodeHeartbeat(Text hostname, Text uniqueId, int port) throws IOException {
            LlapTaskUmbilicalExternalClient.this.updateHeartbeatInfo(hostname.toString(), uniqueId.toString(), port);
        }

        @Override
        public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException {
            String taskAttemptIdString = taskAttemptId.toString();
            LOG.error("Task killed - " + taskAttemptIdString);
            LlapTaskUmbilicalExternalClient.this.registeredTasks.remove(taskAttemptIdString);
            try {
                if (LlapTaskUmbilicalExternalClient.this.responder != null) {
                    LlapTaskUmbilicalExternalClient.this.responder.taskKilled(taskAttemptId);
                }
            }
            catch (Exception err) {
                LOG.error("Error during responder execution", (Throwable)err);
            }
        }

        public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
            return 0L;
        }

        public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash) throws IOException {
            return ProtocolSignature.getProtocolSignature((VersionedProtocol)this, (String)protocol, (long)clientVersion, (int)clientMethodsHash);
        }
    }

    public static interface LlapTaskUmbilicalExternalResponder {
        public void submissionFailed(String var1, Throwable var2);

        public void heartbeat(TezHeartbeatRequest var1);

        public void taskKilled(TezTaskAttemptID var1);

        public void heartbeatTimeout(String var1);
    }

    private class HeartbeatCheckTask
    implements Runnable {
        private HeartbeatCheckTask() {
        }

        @Override
        public void run() {
            long currentTime = System.currentTimeMillis();
            ArrayList<String> timedOutTasks = new ArrayList<String>();
            for (String key : LlapTaskUmbilicalExternalClient.this.pendingEvents.keySet()) {
                PendingEventData pendingEventData = (PendingEventData)LlapTaskUmbilicalExternalClient.this.pendingEvents.get(key);
                if (pendingEventData == null || currentTime - pendingEventData.heartbeatInfo.lastHeartbeat.get() < LlapTaskUmbilicalExternalClient.this.connectionTimeout) continue;
                timedOutTasks.add(key);
            }
            for (String timedOutTask : timedOutTasks) {
                LOG.info("Pending taskAttemptId " + timedOutTask + " timed out");
                LlapTaskUmbilicalExternalClient.this.responder.heartbeatTimeout(timedOutTask);
                LlapTaskUmbilicalExternalClient.this.pendingEvents.remove(timedOutTask);
            }
            timedOutTasks.clear();
            for (String key : LlapTaskUmbilicalExternalClient.this.registeredTasks.keySet()) {
                TaskHeartbeatInfo heartbeatInfo = (TaskHeartbeatInfo)LlapTaskUmbilicalExternalClient.this.registeredTasks.get(key);
                if (heartbeatInfo == null || currentTime - heartbeatInfo.lastHeartbeat.get() < LlapTaskUmbilicalExternalClient.this.connectionTimeout) continue;
                timedOutTasks.add(key);
            }
            for (String timedOutTask : timedOutTasks) {
                LOG.info("Running taskAttemptId " + timedOutTask + " timed out");
                LlapTaskUmbilicalExternalClient.this.responder.heartbeatTimeout(timedOutTask);
                LlapTaskUmbilicalExternalClient.this.registeredTasks.remove(timedOutTask);
            }
        }
    }

    private static class PendingEventData {
        final TaskHeartbeatInfo heartbeatInfo;
        final List<TezEvent> tezEvents;

        public PendingEventData(TaskHeartbeatInfo heartbeatInfo, List<TezEvent> tezEvents) {
            this.heartbeatInfo = heartbeatInfo;
            this.tezEvents = tezEvents;
        }
    }

    private static class TaskHeartbeatInfo {
        final String taskAttemptId;
        final String hostname;
        String uniqueNodeId;
        final int port;
        final AtomicLong lastHeartbeat = new AtomicLong();

        public TaskHeartbeatInfo(String taskAttemptId, String hostname, int port) {
            this.taskAttemptId = taskAttemptId;
            this.hostname = hostname;
            this.port = port;
            this.lastHeartbeat.set(System.currentTimeMillis());
        }
    }
}

