package org.apache.twill.internal;

import com.google.common.base.Charsets;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Service;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import org.apache.twill.api.RunId;
import org.apache.twill.common.ServiceListenerAdapter;
import org.apache.twill.common.Threads;
import org.apache.twill.internal.json.StackTraceElementCodec;
import org.apache.twill.internal.json.StateNodeCodec;
import org.apache.twill.internal.state.Message;
import org.apache.twill.internal.state.MessageCallback;
import org.apache.twill.internal.state.MessageCodec;
import org.apache.twill.internal.state.StateNode;
import org.apache.twill.internal.state.SystemMessages;
import org.apache.twill.zookeeper.NodeChildren;
import org.apache.twill.zookeeper.NodeData;
import org.apache.twill.zookeeper.OperationFuture;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.twill.zookeeper.ZKOperations;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/twill/internal/ZKServiceDecorator.class */
public final class ZKServiceDecorator extends AbstractService {
    private static final Logger LOG = LoggerFactory.getLogger(ZKServiceDecorator.class);
    private final ZKClient zkClient;
    private final RunId id;
    private final Supplier<? extends JsonElement> liveNodeData;
    private final Service decoratedService;
    private final MessageCallbackCaller messageCallback;
    private ExecutorService callbackExecutor;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/twill/internal/ZKServiceDecorator$DecoratedServiceListener.class */
    public final class DecoratedServiceListener implements Service.Listener {
        private volatile boolean zkFailure;

        private DecoratedServiceListener() {
            this.zkFailure = false;
        }

        public void starting() {
            ZKServiceDecorator.LOG.info("Starting: " + ZKServiceDecorator.this.id);
            saveState(Service.State.STARTING);
        }

        public void running() {
            ZKServiceDecorator.LOG.info("Running: " + ZKServiceDecorator.this.id);
            ZKServiceDecorator.this.notifyStarted();
            ZKServiceDecorator.this.watchMessages();
            saveState(Service.State.RUNNING);
        }

        public void stopping(Service.State state) {
            ZKServiceDecorator.LOG.info("Stopping: " + ZKServiceDecorator.this.id);
            saveState(Service.State.STOPPING);
        }

        public void terminated(Service.State state) {
            ZKServiceDecorator.LOG.info("Terminated: " + state + " " + ZKServiceDecorator.this.id);
            if (this.zkFailure) {
                return;
            }
            ImmutableList of = ImmutableList.of(ZKServiceDecorator.this.removeLiveNode(), ZKServiceDecorator.this.removeServiceNode());
            final ListenableFuture allAsList = Futures.allAsList(of);
            Futures.successfulAsList(of).addListener(new Runnable() { // from class: org.apache.twill.internal.ZKServiceDecorator.DecoratedServiceListener.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        allAsList.get();
                        ZKServiceDecorator.LOG.info("Service and state node removed");
                        ZKServiceDecorator.this.notifyStopped();
                    } catch (Exception e) {
                        ZKServiceDecorator.LOG.warn("Failed to remove ZK nodes.", e);
                        ZKServiceDecorator.this.notifyFailed(e);
                    }
                }
            }, Threads.SAME_THREAD_EXECUTOR);
        }

        public void failed(Service.State state, final Throwable th) {
            ZKServiceDecorator.LOG.info("Failed: {} {}.", new Object[]{state, ZKServiceDecorator.this.id, th});
            if (this.zkFailure) {
                return;
            }
            Futures.successfulAsList(ImmutableList.of(ZKServiceDecorator.this.removeLiveNode(), ZKServiceDecorator.this.removeServiceNode())).addListener(new Runnable() { // from class: org.apache.twill.internal.ZKServiceDecorator.DecoratedServiceListener.2
                @Override // java.lang.Runnable
                public void run() {
                    ZKServiceDecorator.LOG.info("Service and state node removed");
                    ZKServiceDecorator.this.notifyFailed(th);
                }
            }, Threads.SAME_THREAD_EXECUTOR);
        }

        private void saveState(Service.State state) {
            if (this.zkFailure) {
                return;
            }
            stopOnFailure(ZKServiceDecorator.this.zkClient.setData(ZKServiceDecorator.this.getZKPath("state"), ZKServiceDecorator.this.encodeStateNode(new StateNode(state))));
        }

        private <V> void stopOnFailure(final OperationFuture<V> operationFuture) {
            operationFuture.addListener(new Runnable() { // from class: org.apache.twill.internal.ZKServiceDecorator.DecoratedServiceListener.3
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        operationFuture.get();
                    } catch (Exception e) {
                        ZKServiceDecorator.LOG.error("ZK operation failed", e);
                        DecoratedServiceListener.this.zkFailure = true;
                        ZKServiceDecorator.this.decoratedService.stop().addListener(new Runnable() { // from class: org.apache.twill.internal.ZKServiceDecorator.DecoratedServiceListener.3.1
                            @Override // java.lang.Runnable
                            public void run() {
                                ZKServiceDecorator.this.notifyFailed(e);
                            }
                        }, Threads.SAME_THREAD_EXECUTOR);
                    }
                }
            }, Threads.SAME_THREAD_EXECUTOR);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/twill/internal/ZKServiceDecorator$MessageCallbackCaller.class */
    public static final class MessageCallbackCaller {
        private final MessageCallback callback;
        private final ZKClient zkClient;

        private MessageCallbackCaller(ZKClient zKClient) {
            this((MessageCallback) null, zKClient);
        }

        private MessageCallbackCaller(MessageCallback messageCallback, ZKClient zKClient) {
            this.callback = messageCallback;
            this.zkClient = zKClient;
        }

        public void onReceived(Executor executor, final String str, final int i, final String str2, final Message message) {
            if (this.callback != null) {
                executor.execute(new Runnable() { // from class: org.apache.twill.internal.ZKServiceDecorator.MessageCallbackCaller.1
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            try {
                                MessageCallbackCaller.this.callback.onReceived(str2, message).get();
                                ZKServiceDecorator.listenFailure(MessageCallbackCaller.this.zkClient.delete(str, i));
                            } catch (Throwable th) {
                                ZKServiceDecorator.LOG.error("Exception when processing message: {}, {}, {}", new Object[]{str2, message, str, th});
                                ZKServiceDecorator.listenFailure(MessageCallbackCaller.this.zkClient.delete(str, i));
                            }
                        } catch (Throwable th2) {
                            ZKServiceDecorator.listenFailure(MessageCallbackCaller.this.zkClient.delete(str, i));
                            throw th2;
                        }
                    }
                });
                return;
            }
            if (ZKServiceDecorator.LOG.isDebugEnabled()) {
                ZKServiceDecorator.LOG.debug("Ignoring incoming message from " + str + ": " + message);
            }
            ZKServiceDecorator.listenFailure(this.zkClient.delete(str, i));
        }
    }

    public ZKServiceDecorator(ZKClient zKClient, RunId runId, Supplier<? extends JsonElement> supplier, Service service) {
        this(zKClient, runId, supplier, service, null);
    }

    public ZKServiceDecorator(ZKClient zKClient, RunId runId, Supplier<? extends JsonElement> supplier, Service service, @Nullable Runnable runnable) {
        this.zkClient = zKClient;
        this.id = runId;
        this.liveNodeData = supplier;
        this.decoratedService = service;
        if (service instanceof MessageCallback) {
            this.messageCallback = new MessageCallbackCaller((MessageCallback) service, zKClient);
        } else {
            this.messageCallback = new MessageCallbackCaller(zKClient);
        }
        if (runnable != null) {
            addFinalizer(runnable);
        }
    }

    protected void doStart() {
        this.callbackExecutor = Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("message-callback"));
        Futures.addCallback(createLiveNode(), new FutureCallback<String>() { // from class: org.apache.twill.internal.ZKServiceDecorator.1
            public void onSuccess(String str) {
                final ListenableFuture allAsList = Futures.allAsList(new ListenableFuture[]{ZKOperations.ignoreError(ZKServiceDecorator.this.zkClient.create(ZKServiceDecorator.this.getZKPath("messages"), (byte[]) null, CreateMode.PERSISTENT), KeeperException.NodeExistsException.class, (Object) null), ZKServiceDecorator.this.zkClient.create(ZKServiceDecorator.this.getZKPath("state"), ZKServiceDecorator.this.encodeStateNode(new StateNode(Service.State.STARTING)), CreateMode.PERSISTENT)});
                allAsList.addListener(new Runnable() { // from class: org.apache.twill.internal.ZKServiceDecorator.1.1
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            allAsList.get();
                            ZKServiceDecorator.this.decoratedService.addListener(ZKServiceDecorator.this.createListener(), Threads.SAME_THREAD_EXECUTOR);
                            ZKServiceDecorator.this.decoratedService.start();
                        } catch (Exception e) {
                            ZKServiceDecorator.this.notifyFailed(e);
                        }
                    }
                }, Threads.SAME_THREAD_EXECUTOR);
            }

            public void onFailure(Throwable th) {
                ZKServiceDecorator.this.notifyFailed(th);
            }
        });
    }

    protected void doStop() {
        this.decoratedService.stop();
        this.callbackExecutor.shutdownNow();
    }

    private void addFinalizer(final Runnable runnable) {
        addListener(new ServiceListenerAdapter() { // from class: org.apache.twill.internal.ZKServiceDecorator.2
            public void terminated(Service.State state) {
                try {
                    runnable.run();
                } catch (Throwable th) {
                    ZKServiceDecorator.LOG.warn("Exception when running finalizer.", th);
                }
            }

            public void failed(Service.State state, Throwable th) {
                try {
                    runnable.run();
                } catch (Throwable th2) {
                    ZKServiceDecorator.LOG.warn("Exception when running finalizer.", th2);
                }
            }
        }, Threads.SAME_THREAD_EXECUTOR);
    }

    private OperationFuture<String> createLiveNode() {
        String liveNodePath = getLiveNodePath();
        LOG.info("Create live node {}{}", this.zkClient.getConnectString(), liveNodePath);
        JsonObject jsonObject = new JsonObject();
        jsonObject.add("data", (JsonElement) this.liveNodeData.get());
        return ZKOperations.ignoreError(this.zkClient.create(liveNodePath, encodeJson(jsonObject), CreateMode.EPHEMERAL), KeeperException.NodeExistsException.class, liveNodePath);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public OperationFuture<String> removeLiveNode() {
        String liveNodePath = getLiveNodePath();
        LOG.info("Remove live node {}{}", this.zkClient.getConnectString(), liveNodePath);
        return ZKOperations.ignoreError(this.zkClient.delete(liveNodePath), KeeperException.NoNodeException.class, liveNodePath);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public OperationFuture<String> removeServiceNode() {
        String format = String.format("/%s", this.id.getId());
        LOG.info("Remove service node {}{}", this.zkClient.getConnectString(), format);
        return ZKOperations.recursiveDelete(this.zkClient, format);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void watchMessages() {
        final String zKPath = getZKPath("messages");
        Futures.addCallback(this.zkClient.getChildren(zKPath, new Watcher() { // from class: org.apache.twill.internal.ZKServiceDecorator.3
            public void process(WatchedEvent watchedEvent) {
                if (watchedEvent.getType() == Watcher.Event.EventType.NodeChildrenChanged && ZKServiceDecorator.this.decoratedService.isRunning()) {
                    ZKServiceDecorator.this.watchMessages();
                }
            }
        }), new FutureCallback<NodeChildren>() { // from class: org.apache.twill.internal.ZKServiceDecorator.4
            public void onSuccess(NodeChildren nodeChildren) {
                ArrayList<String> newArrayList = Lists.newArrayList(nodeChildren.getChildren());
                Collections.sort(newArrayList);
                for (String str : newArrayList) {
                    ZKServiceDecorator.this.processMessage(zKPath + "/" + str, str);
                }
            }

            public void onFailure(Throwable th) {
                ZKServiceDecorator.LOG.error("Failed to watch messages.", th);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void processMessage(final String str, final String str2) {
        Futures.addCallback(this.zkClient.getData(str), new FutureCallback<NodeData>() { // from class: org.apache.twill.internal.ZKServiceDecorator.5
            public void onSuccess(NodeData nodeData) {
                Message decode = MessageCodec.decode(nodeData.getData());
                if (decode == null) {
                    ZKServiceDecorator.LOG.error("Failed to decode message for " + str2 + " in " + str);
                    ZKServiceDecorator.listenFailure(ZKServiceDecorator.this.zkClient.delete(str, nodeData.getStat().getVersion()));
                    return;
                }
                if (ZKServiceDecorator.LOG.isDebugEnabled()) {
                    ZKServiceDecorator.LOG.debug("Message received from " + str + ": " + new String(MessageCodec.encode(decode), Charsets.UTF_8));
                }
                if (ZKServiceDecorator.this.handleStopMessage(decode, ZKServiceDecorator.this.getDeleteSupplier(str, nodeData.getStat().getVersion()))) {
                    return;
                }
                ZKServiceDecorator.this.messageCallback.onReceived(ZKServiceDecorator.this.callbackExecutor, str, nodeData.getStat().getVersion(), str2, decode);
            }

            public void onFailure(Throwable th) {
                ZKServiceDecorator.LOG.error("Failed to fetch message content.", th);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public <V> boolean handleStopMessage(Message message, final Supplier<OperationFuture<V>> supplier) {
        if (message.getType() != Message.Type.SYSTEM || !SystemMessages.STOP_COMMAND.equals(message.getCommand())) {
            return false;
        }
        this.callbackExecutor.execute(new Runnable() { // from class: org.apache.twill.internal.ZKServiceDecorator.6
            @Override // java.lang.Runnable
            public void run() {
                ZKServiceDecorator.this.decoratedService.stop().addListener(new Runnable() { // from class: org.apache.twill.internal.ZKServiceDecorator.6.1
                    @Override // java.lang.Runnable
                    public void run() {
                        ZKServiceDecorator.this.stopServiceOnComplete((ListenableFuture) supplier.get(), ZKServiceDecorator.this);
                    }
                }, MoreExecutors.sameThreadExecutor());
            }
        });
        return true;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Supplier<OperationFuture<String>> getDeleteSupplier(final String str, final int i) {
        return new Supplier<OperationFuture<String>>() { // from class: org.apache.twill.internal.ZKServiceDecorator.7
            /* renamed from: get, reason: merged with bridge method [inline-methods] */
            public OperationFuture<String> m9get() {
                return ZKServiceDecorator.this.zkClient.delete(str, i);
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Service.Listener createListener() {
        return new DecoratedServiceListener();
    }

    private <V> byte[] encode(V v, Class<? extends V> cls) {
        return new GsonBuilder().registerTypeAdapter(StateNode.class, new StateNodeCodec()).registerTypeAdapter(StackTraceElement.class, new StackTraceElementCodec()).create().toJson(v, cls).getBytes(Charsets.UTF_8);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public byte[] encodeStateNode(StateNode stateNode) {
        return encode(stateNode, StateNode.class);
    }

    private <V extends JsonElement> byte[] encodeJson(V v) {
        return new Gson().toJson(v).getBytes(Charsets.UTF_8);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String getZKPath(String str) {
        return String.format("/%s/%s", this.id, str);
    }

    private String getLiveNodePath() {
        return "/instances/" + this.id;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <V> OperationFuture<V> listenFailure(final OperationFuture<V> operationFuture) {
        operationFuture.addListener(new Runnable() { // from class: org.apache.twill.internal.ZKServiceDecorator.8
            @Override // java.lang.Runnable
            public void run() {
                try {
                    if (!operationFuture.isCancelled()) {
                        operationFuture.get();
                    }
                } catch (Exception e) {
                    ZKServiceDecorator.LOG.error("Operation execution failed for " + operationFuture.getRequestPath(), e);
                }
            }
        }, Threads.SAME_THREAD_EXECUTOR);
        return operationFuture;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public <V> ListenableFuture<Service.State> stopServiceOnComplete(ListenableFuture<V> listenableFuture, final Service service) {
        return Futures.transform(listenableFuture, new AsyncFunction<V, Service.State>() { // from class: org.apache.twill.internal.ZKServiceDecorator.9
            public ListenableFuture<Service.State> apply(V v) throws Exception {
                return service.stop();
            }
        });
    }
}
