package org.apache.twill.internal;

import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.reflect.TypeToken;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import org.apache.twill.api.Command;
import org.apache.twill.api.ResourceReport;
import org.apache.twill.api.RunId;
import org.apache.twill.api.TwillController;
import org.apache.twill.api.TwillRunResources;
import org.apache.twill.api.logging.LogEntry;
import org.apache.twill.api.logging.LogHandler;
import org.apache.twill.api.logging.LogThrowable;
import org.apache.twill.common.Cancellable;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.apache.twill.discovery.ServiceDiscovered;
import org.apache.twill.discovery.ZKDiscoveryService;
import org.apache.twill.internal.json.LogEntryDecoder;
import org.apache.twill.internal.json.LogThrowableCodec;
import org.apache.twill.internal.json.StackTraceElementCodec;
import org.apache.twill.internal.kafka.client.ZKKafkaClientService;
import org.apache.twill.internal.state.SystemMessages;
import org.apache.twill.kafka.client.FetchedMessage;
import org.apache.twill.kafka.client.KafkaClientService;
import org.apache.twill.kafka.client.KafkaConsumer;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.twill.zookeeper.ZKClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/twill/internal/AbstractTwillController.class */
public abstract class AbstractTwillController extends AbstractZKServiceController implements TwillController {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractTwillController.class);
    private static final Gson GSON = new Gson();
    private final Queue<LogHandler> logHandlers;
    private final KafkaClientService kafkaClient;
    private final DiscoveryServiceClient discoveryServiceClient;
    private volatile Cancellable logCancellable;

    /* loaded from: input_file:org/apache/twill/internal/AbstractTwillController$LogMessageCallback.class */
    private static final class LogMessageCallback implements KafkaConsumer.MessageCallback {
        private static final Gson GSON = new GsonBuilder().registerTypeAdapter(LogEntry.class, new LogEntryDecoder()).registerTypeAdapter(LogThrowable.class, new LogThrowableCodec()).registerTypeAdapter(StackTraceElement.class, new StackTraceElementCodec()).create();
        private final Iterable<LogHandler> logHandlers;

        private LogMessageCallback(Iterable<LogHandler> iterable) {
            this.logHandlers = iterable;
        }

        @Override // org.apache.twill.kafka.client.KafkaConsumer.MessageCallback
        public void onReceived(Iterator<FetchedMessage> it2) {
            while (it2.hasNext()) {
                String charBuffer = Charsets.UTF_8.decode(it2.next().getPayload()).toString();
                try {
                    LogEntry logEntry = (LogEntry) GSON.fromJson(charBuffer, LogEntry.class);
                    if (logEntry != null) {
                        invokeHandlers(logEntry);
                    }
                } catch (Exception e) {
                    AbstractTwillController.LOG.error("Failed to decode log entry {}", charBuffer, e);
                }
            }
        }

        @Override // org.apache.twill.kafka.client.KafkaConsumer.MessageCallback
        public void finished() {
        }

        private void invokeHandlers(LogEntry logEntry) {
            for (LogHandler logHandler : this.logHandlers) {
                try {
                    logHandler.onLog(logEntry);
                } catch (Throwable th) {
                    AbstractTwillController.LOG.warn("Exception while calling LogHandler {}", logHandler, th);
                }
            }
        }
    }

    public AbstractTwillController(RunId runId, ZKClient zKClient, Iterable<LogHandler> iterable) {
        super(runId, zKClient);
        this.logHandlers = new ConcurrentLinkedQueue();
        this.kafkaClient = new ZKKafkaClientService(ZKClients.namespace(zKClient, "/" + runId.getId() + "/kafka"));
        this.discoveryServiceClient = new ZKDiscoveryService(zKClient);
        Iterables.addAll(this.logHandlers, iterable);
    }

    @Override // org.apache.twill.internal.AbstractZKServiceController
    protected synchronized void doStartUp() {
        if (this.logHandlers.isEmpty()) {
            return;
        }
        this.kafkaClient.startAndWait();
        this.logCancellable = this.kafkaClient.getConsumer().prepare().addFromBeginning(Constants.LOG_TOPIC, 0).consume(new LogMessageCallback(this.logHandlers));
    }

    @Override // org.apache.twill.internal.AbstractZKServiceController
    protected void doShutDown() {
        if (this.logCancellable != null) {
            this.logCancellable.cancel();
        }
        this.kafkaClient.stopAndWait();
    }

    @Override // org.apache.twill.api.TwillController
    public final synchronized void addLogHandler(LogHandler logHandler) {
        this.logHandlers.add(logHandler);
        if (this.logHandlers.size() == 1) {
            this.kafkaClient.startAndWait();
            this.logCancellable = this.kafkaClient.getConsumer().prepare().addFromBeginning(Constants.LOG_TOPIC, 0).consume(new LogMessageCallback(this.logHandlers));
        }
    }

    @Override // org.apache.twill.api.TwillController
    public final ServiceDiscovered discoverService(String str) {
        return this.discoveryServiceClient.discover(str);
    }

    @Override // org.apache.twill.api.TwillController
    public final ListenableFuture<Integer> changeInstances(String str, int i) {
        return sendMessage(SystemMessages.setInstances(str, i), Integer.valueOf(i));
    }

    @Override // org.apache.twill.api.TwillController
    public final ListenableFuture<String> restartAllInstances(String str) {
        Command build = Command.Builder.of(Constants.RESTART_ALL_RUNNABLE_INSTANCES).build();
        return sendMessage(SystemMessages.updateRunnableInstances(build, str), build.getCommand());
    }

    @Override // org.apache.twill.api.TwillController
    public final ListenableFuture<Set<String>> restartInstances(Map<String, ? extends Set<Integer>> map) {
        return sendMessage(SystemMessages.updateRunnablesInstances(Command.Builder.of(Constants.RESTART_RUNNABLES_INSTANCES).addOptions(Maps.transformEntries(map, new Maps.EntryTransformer<String, Set<Integer>, String>() { // from class: org.apache.twill.internal.AbstractTwillController.1
            @Override // com.google.common.collect.Maps.EntryTransformer
            public String transformEntry(String str, Set<Integer> set) {
                AbstractTwillController.this.validateInstanceIds(str, set);
                return AbstractTwillController.GSON.toJson(set, new TypeToken<Set<Integer>>() { // from class: org.apache.twill.internal.AbstractTwillController.1.1
                }.getType());
            }
        })).build()), map.keySet());
    }

    @Override // org.apache.twill.api.TwillController
    public ListenableFuture<String> restartInstances(final String str, int i, int... iArr) {
        LinkedHashSet newLinkedHashSet = Sets.newLinkedHashSet();
        newLinkedHashSet.add(Integer.valueOf(i));
        for (int i2 : iArr) {
            newLinkedHashSet.add(Integer.valueOf(i2));
        }
        return Futures.transform(restartInstances((Map<String, ? extends Set<Integer>>) ImmutableMap.of(str, newLinkedHashSet)), new Function<Set<String>, String>() { // from class: org.apache.twill.internal.AbstractTwillController.2
            @Override // com.google.common.base.Function
            public String apply(Set<String> set) {
                return str;
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void validateInstanceIds(String str, Set<Integer> set) {
        ResourceReport resourceReport = getResourceReport();
        if (resourceReport == null) {
            throw new IllegalStateException("Unable to get resource report since application has not started.");
        }
        Collection<TwillRunResources> runnableResources = resourceReport.getRunnableResources(str);
        if (runnableResources == null) {
            throw new RuntimeException("Unable to verify run resources for runnable " + str);
        }
        HashSet newHashSet = Sets.newHashSet();
        Iterator<TwillRunResources> it2 = runnableResources.iterator();
        while (it2.hasNext()) {
            newHashSet.add(Integer.valueOf(it2.next().getInstanceId()));
        }
        LOG.info("Existing instance ids: {}", newHashSet);
        Iterator<Integer> it3 = set.iterator();
        while (it3.hasNext()) {
            int intValue = it3.next().intValue();
            if (!newHashSet.contains(Integer.valueOf(intValue))) {
                throw new IllegalArgumentException("Unable to find instance id " + intValue + " for " + str);
            }
        }
    }

    @Override // org.apache.twill.api.TwillController
    public /* bridge */ /* synthetic */ Future restartInstances(Map map) {
        return restartInstances((Map<String, ? extends Set<Integer>>) map);
    }
}
