package org.apache.twill.internal;

import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.twill.api.RunId;
import org.apache.twill.common.Threads;
import org.apache.twill.internal.state.Message;
import org.apache.twill.internal.state.MessageCallback;
import org.apache.twill.internal.state.MessageCodec;
import org.apache.twill.internal.state.SystemMessages;
import org.apache.twill.zookeeper.NodeChildren;
import org.apache.twill.zookeeper.NodeData;
import org.apache.twill.zookeeper.OperationFuture;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.twill.zookeeper.ZKOperations;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/twill/internal/AbstractTwillService.class */
/**
 * Base class for services that run on a dedicated execution thread and coordinate through ZooKeeper.
 *
 * <p>On start-up the service creates an ephemeral "live" node at {@code /instances/<runId>} and a
 * persistent {@code /<runId>/messages} node, then watches the latter for children. Each child is decoded
 * into a {@link Message}; a system STOP command stops the service, every other message is handed to
 * {@link #onReceived(String, Message)}. In both cases the message node is deleted afterwards.
 *
 * <p>On shutdown the service node {@code /<runId>} (recursively) and the live node are removed.
 *
 * <p>Subclasses customise behaviour through {@link #doStart()}, {@link #doRun()}, {@link #doStop()},
 * {@link #getLiveNodeData()} and {@link #onReceived(String, Message)}.
 */
public abstract class AbstractTwillService extends AbstractExecutionThreadService implements MessageCallback {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractTwillService.class);
    private static final Gson GSON = new Gson();

    /** Upper bound, in seconds, on how long shutdown waits for the ZooKeeper nodes to be removed. */
    private static final long CLEANUP_TIMEOUT_SECONDS = 5L;

    protected final ZKClient zkClient;
    protected final RunId runId;

    /** Single-threaded executor on which fetched message contents are processed; created in startUp(). */
    private ExecutorService messageCallbackExecutor;

    /**
     * Creates the service.
     *
     * <p>NOTE: the decompiler reports that this constructor was originally declared {@code protected}.
     *
     * @param zKClient client used for every ZooKeeper operation of this service
     * @param runId    run id used to derive the ZooKeeper paths of this service
     */
    public AbstractTwillService(ZKClient zKClient, RunId runId) {
        this.zkClient = zKClient;
        this.runId = runId;
    }

    /**
     * Hook invoked during start-up, after the live node and the messages node have been created and
     * before the messages node is watched. The default implementation does nothing.
     *
     * @throws Exception if the subclass fails to start
     */
    protected void doStart() throws Exception {
    }

    /**
     * Hook containing the main body of the service; invoked from {@link #run()}.
     * The default implementation does nothing.
     *
     * @throws Exception if the subclass fails while running
     */
    protected void doRun() throws Exception {
    }

    /**
     * Hook invoked during shutdown, before the ZooKeeper nodes of this service are removed.
     * The default implementation does nothing.
     *
     * @throws Exception if the subclass fails to stop
     */
    protected void doStop() throws Exception {
    }

    /**
     * Returns the object to be serialized as the {@code "data"} property of the live node content.
     *
     * @return the live node payload, or {@code null} (the default) to write an empty JSON object
     */
    protected Object getLiveNodeData() {
        return null;
    }

    /**
     * Default message handler: logs the message and reports immediate success.
     *
     * @param messageId name of the ZooKeeper child node that carried the message
     * @param message   the decoded message
     * @return an already-completed future holding {@code messageId}
     */
    @Override // org.apache.twill.internal.state.MessageCallback
    public ListenableFuture<String> onReceived(String messageId, Message message) {
        LOG.info("Message received: {}", message);
        return Futures.immediateCheckedFuture(messageId);
    }

    /**
     * Creates the message executor and the ZooKeeper nodes, registers a connection watcher that recreates
     * the live node after a session expiration, runs {@link #doStart()} and starts watching for messages.
     *
     * @throws Exception if node creation or {@link #doStart()} fails
     */
    @Override // com.google.common.util.concurrent.AbstractExecutionThreadService
    protected final void startUp() throws Exception {
        // One worker thread with an unbounded queue; submissions made after shutdownNow() are silently
        // dropped by the DiscardPolicy instead of raising RejectedExecutionException.
        this.messageCallbackExecutor = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(),
            Threads.createDaemonThreadFactory("message-callback"),
            new ThreadPoolExecutor.DiscardPolicy());

        createLiveNode().get();

        // The messages node may already exist; that is not an error.
        ZKOperations.ignoreError(this.zkClient.create(getZKPath("messages"), null, CreateMode.PERSISTENT), KeeperException.NodeExistsException.class, null).get();

        this.zkClient.addConnectionWatcher(new Watcher() { // from class: org.apache.twill.internal.AbstractTwillService.1
            /** True between a session expiration and the following successful reconnect. */
            private boolean expired = false;

            @Override // org.apache.zookeeper.Watcher
            public void process(WatchedEvent watchedEvent) {
                if (watchedEvent.getState() == Watcher.Event.KeeperState.Expired) {
                    LOG.warn("ZK Session expired for service {} with runId {}.", getServiceName(), runId.getId());
                    this.expired = true;
                } else if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected && this.expired) {
                    LOG.info("Reconnected after expiration for service {} with runId {}", getServiceName(), runId.getId());
                    this.expired = false;
                    // The live node is created as EPHEMERAL, so it has to be recreated for the new session.
                    logIfFailed(createLiveNode());
                }
            }
        });

        doStart();
        watchMessages();
    }

    /**
     * Delegates to {@link #doRun()}.
     *
     * @throws Exception if {@link #doRun()} fails
     */
    @Override // com.google.common.util.concurrent.AbstractExecutionThreadService
    protected final void run() throws Exception {
        doRun();
    }

    /**
     * Stops message processing, runs {@link #doStop()} and then removes the service node and the live
     * node, waiting at most {@value #CLEANUP_TIMEOUT_SECONDS} seconds for the removal.
     *
     * <p>The removal is attempted exactly once, whether or not {@link #doStop()} throws.
     *
     * @throws Exception if {@link #doStop()} fails or the node removal does not finish in time
     */
    @Override // com.google.common.util.concurrent.AbstractExecutionThreadService
    protected final void shutDown() throws Exception {
        this.messageCallbackExecutor.shutdownNow();
        try {
            doStop();
        } finally {
            // successfulAsList never fails because of an individual delete failure, so only a timeout
            // or an interruption can surface from get().
            Futures.successfulAsList(ImmutableList.of(removeServiceNode(), removeLiveNode())).get(CLEANUP_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            LOG.info("Service {} with runId {} shutdown completed", getServiceName(), this.runId.getId());
        }
    }

    /**
     * Creates the ephemeral live node whose content is a JSON object, carrying a {@code "data"} property
     * when {@link #getLiveNodeData()} is non-null. An already existing node is not treated as an error.
     *
     * <p>NOTE: the decompiler reports that this method was originally declared {@code private}.
     *
     * @return future of the create operation
     */
    public OperationFuture<String> createLiveNode() {
        String liveNodePath = getLiveNodePath();
        LOG.info("Create live node {}{}", this.zkClient.getConnectString(), liveNodePath);

        JsonObject content = new JsonObject();
        Object liveNodeData = getLiveNodeData();
        if (liveNodeData != null) {
            content.add("data", GSON.toJsonTree(liveNodeData));
        }
        return ZKOperations.ignoreError(this.zkClient.create(liveNodePath, toJson(content), CreateMode.EPHEMERAL), KeeperException.NodeExistsException.class, liveNodePath);
    }

    /**
     * Deletes the live node; a node that is already gone is not treated as an error.
     *
     * @return future of the delete operation
     */
    private OperationFuture<String> removeLiveNode() {
        String liveNodePath = getLiveNodePath();
        LOG.info("Remove live node {}{}", this.zkClient.getConnectString(), liveNodePath);
        return ZKOperations.ignoreError(this.zkClient.delete(liveNodePath), KeeperException.NoNodeException.class, liveNodePath);
    }

    /**
     * Recursively deletes the service node {@code /<runId>}.
     *
     * @return future of the recursive delete operation
     */
    private OperationFuture<String> removeServiceNode() {
        String serviceNodePath = String.format("/%s", this.runId.getId());
        LOG.info("Remove service node {}{}", this.zkClient.getConnectString(), serviceNodePath);
        return ZKOperations.recursiveDelete(this.zkClient, serviceNodePath);
    }

    /**
     * Lists the children of the messages node and processes each of them in sorted name order. The
     * watcher passed along re-invokes this method when the children change while the service is running.
     *
     * <p>NOTE: the decompiler reports that this method was originally declared {@code private}.
     */
    public void watchMessages() {
        final String messagesPath = getZKPath("messages");
        Futures.addCallback(this.zkClient.getChildren(messagesPath, new Watcher() { // from class: org.apache.twill.internal.AbstractTwillService.2
            @Override // org.apache.zookeeper.Watcher
            public void process(WatchedEvent watchedEvent) {
                // Re-list (and thereby re-watch) only for child changes and only while running.
                if (watchedEvent.getType() == Watcher.Event.EventType.NodeChildrenChanged && isRunning()) {
                    watchMessages();
                }
            }
        }), new FutureCallback<NodeChildren>() { // from class: org.apache.twill.internal.AbstractTwillService.3
            @Override // com.google.common.util.concurrent.FutureCallback
            public void onSuccess(NodeChildren nodeChildren) {
                ArrayList<String> messageNodes = Lists.newArrayList(nodeChildren.getChildren());
                Collections.sort(messageNodes);
                for (String messageNode : messageNodes) {
                    processMessage(messagesPath + "/" + messageNode, messageNode);
                }
            }

            @Override // com.google.common.util.concurrent.FutureCallback
            public void onFailure(Throwable cause) {
                LOG.error("Failed to watch messages.", cause);
            }
        }, Threads.SAME_THREAD_EXECUTOR);
    }

    /**
     * Fetches the content of one message node and dispatches it on the message callback executor.
     * Content that cannot be decoded is logged and the node is deleted.
     *
     * <p>NOTE: the decompiler reports that this method was originally declared {@code private}.
     *
     * @param path      full ZooKeeper path of the message node
     * @param messageId name of the message node (last path segment)
     */
    public void processMessage(final String path, final String messageId) {
        Futures.addCallback(this.zkClient.getData(path), new FutureCallback<NodeData>() { // from class: org.apache.twill.internal.AbstractTwillService.4
            @Override // com.google.common.util.concurrent.FutureCallback
            public void onSuccess(NodeData nodeData) {
                // The remover deletes the node using the version that was just read.
                Runnable messageRemover = createMessageRemover(path, nodeData.getStat().getVersion());
                Message message = MessageCodec.decode(nodeData.getData());
                if (message == null) {
                    LOG.error("Failed to decode message for {} in {}", messageId, path);
                    messageRemover.run();
                    return;
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Message received from {}: {}", path, new String(MessageCodec.encode(message), Charsets.UTF_8));
                }
                if (handleStopMessage(message, messageRemover)) {
                    return;
                }
                handleMessage(messageId, message, messageRemover);
            }

            @Override // com.google.common.util.concurrent.FutureCallback
            public void onFailure(Throwable cause) {
                LOG.error("Failed to fetch message content from {}", path, cause);
            }
        }, this.messageCallbackExecutor);
    }

    /**
     * Handles the system STOP command by stopping this service; the message node is removed once the
     * stop has completed, whether it succeeded or failed.
     *
     * <p>NOTE: the decompiler reports that this method was originally declared {@code private}.
     *
     * @param message  the decoded message
     * @param runnable action that removes the message node
     * @return {@code true} if the message was the system STOP command, {@code false} otherwise
     */
    public boolean handleStopMessage(Message message, final Runnable runnable) {
        if (message.getType() != Message.Type.SYSTEM || !SystemMessages.STOP_COMMAND.equals(message.getCommand())) {
            return false;
        }
        Futures.addCallback(stop(), new FutureCallback<Service.State>() { // from class: org.apache.twill.internal.AbstractTwillService.5
            @Override // com.google.common.util.concurrent.FutureCallback
            public void onSuccess(Service.State state) {
                runnable.run();
            }

            @Override // com.google.common.util.concurrent.FutureCallback
            public void onFailure(Throwable cause) {
                LOG.error("Stop service failed upon STOP command", cause);
                runnable.run();
            }
        }, Threads.SAME_THREAD_EXECUTOR);
        return true;
    }

    /**
     * Passes a message to {@link #onReceived(String, Message)} and removes the message node once the
     * returned future completes, whether it succeeded or failed.
     *
     * <p>NOTE: the decompiler reports that this method was originally declared {@code private}.
     *
     * @param messageId name of the message node
     * @param message   the decoded message
     * @param runnable  action that removes the message node
     */
    public void handleMessage(String messageId, final Message message, final Runnable runnable) {
        Futures.addCallback(onReceived(messageId, message), new FutureCallback<String>() { // from class: org.apache.twill.internal.AbstractTwillService.6
            @Override // com.google.common.util.concurrent.FutureCallback
            public void onSuccess(String result) {
                runnable.run();
            }

            @Override // com.google.common.util.concurrent.FutureCallback
            public void onFailure(Throwable cause) {
                LOG.error("Failed to handle message {}", message, cause);
                runnable.run();
            }
        }, Threads.SAME_THREAD_EXECUTOR);
    }

    /**
     * Builds an action that deletes a message node, logging the failure if the delete does not succeed.
     *
     * <p>NOTE: the decompiler reports that this method was originally declared {@code private}.
     *
     * @param path    full ZooKeeper path of the message node
     * @param version node version passed to the delete operation
     * @return a runnable performing the delete when run
     */
    public Runnable createMessageRemover(final String path, final int version) {
        return new Runnable() { // from class: org.apache.twill.internal.AbstractTwillService.7
            @Override // java.lang.Runnable
            public void run() {
                logIfFailed(zkClient.delete(path, version));
            }
        };
    }

    /**
     * Attaches a callback that logs an error if the given future fails; success is ignored.
     *
     * <p>NOTE: the decompiler reports that this method was originally declared {@code private}.
     *
     * @param listenableFuture the operation to observe
     * @param <T>              result type of the operation
     */
    public <T> void logIfFailed(ListenableFuture<T> listenableFuture) {
        Futures.addCallback(listenableFuture, new FutureCallback<T>() { // from class: org.apache.twill.internal.AbstractTwillService.8
            @Override // com.google.common.util.concurrent.FutureCallback
            public void onSuccess(T result) {
                // Nothing to do on success.
            }

            @Override // com.google.common.util.concurrent.FutureCallback
            public void onFailure(Throwable cause) {
                // Explicit array: the trailing Throwable is picked up by SLF4J as the exception to log.
                LOG.error("Operation failed for service {} with runId {}", new Object[]{getServiceName(), runId, cause});
            }
        }, Threads.SAME_THREAD_EXECUTOR);
    }

    /**
     * Builds the path {@code /<runId>/<child>}.
     *
     * @param child name of the node below the service node
     * @return the ZooKeeper path of that node
     */
    private String getZKPath(String child) {
        return String.format("/%s/%s", this.runId.getId(), child);
    }

    /**
     * @return the ZooKeeper path of the live node, {@code /instances/<runId>}
     */
    private String getLiveNodePath() {
        return "/instances/" + this.runId.getId();
    }

    /**
     * Serializes an object to JSON with Gson.
     *
     * @param value the object to serialize
     * @param <T>   type of the object
     * @return the UTF-8 encoded JSON text
     */
    private <T> byte[] toJson(T value) {
        return GSON.toJson(value).getBytes(Charsets.UTF_8);
    }
}
