package org.apache.samza.rest.proxy.task;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigFactory;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.container.LocalityManager;
import org.apache.samza.container.grouper.task.TaskAssignmentManager;
import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.rest.model.Task;
import org.apache.samza.rest.proxy.installation.InstallationFinder;
import org.apache.samza.rest.proxy.job.JobInstance;
import org.apache.samza.util.CoordinatorStreamUtil;
import org.apache.samza.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;

/* loaded from: input_file:org/apache/samza/rest/proxy/task/SamzaTaskProxy.class */
/**
 * {@link TaskProxy} implementation that builds the list of {@link Task}s of a job from the
 * data read through that job's {@link CoordinatorStreamSystemConsumer}.
 */
public class SamzaTaskProxy implements TaskProxy {
    private static final Logger LOG = LoggerFactory.getLogger(SamzaTaskProxy.class);
    private static final MetricsRegistryMap METRICS_REGISTRY = new MetricsRegistryMap();

    /**
     * Key read from each container's locality entry; its value is passed as the first
     * {@link Task} constructor argument (presumably the host the container runs on).
     */
    private static final String HOST_KEY = "host";

    private final TaskResourceConfig taskResourceConfig;
    private final InstallationFinder installFinder;

    /**
     * @param taskResourceConfig supplies the class name of the job config factory
     * @param installationFinder used to validate job instances and to locate their config files
     */
    public SamzaTaskProxy(TaskResourceConfig taskResourceConfig, InstallationFinder installationFinder) {
        this.taskResourceConfig = taskResourceConfig;
        this.installFinder = installationFinder;
    }

    /**
     * Reads the tasks of {@code jobInstance} through a freshly initialized coordinator stream
     * consumer. The consumer is stopped before this method returns, whether it completes
     * normally or with an exception.
     *
     * @param jobInstance the job whose tasks are requested
     * @return the tasks built from the job's task assignment
     * @throws IllegalArgumentException if the installation finder does not report
     *         {@code jobInstance} as installed
     */
    @Override
    public List<Task> getTasks(JobInstance jobInstance) throws IOException, InterruptedException {
        // Guava formats the "%s" template lazily, i.e. only when the check fails.
        Preconditions.checkArgument(this.installFinder.isInstalled(jobInstance), "Invalid job instance : %s", jobInstance);
        CoordinatorStreamSystemConsumer coordinatorStreamSystemConsumer = null;
        try {
            coordinatorStreamSystemConsumer = initializeCoordinatorStreamConsumer(jobInstance);
            return readTasksFromCoordinatorStream(coordinatorStreamSystemConsumer);
        } finally {
            if (coordinatorStreamSystemConsumer != null) {
                coordinatorStreamSystemConsumer.stop();
            }
        }
    }

    /**
     * Creates a coordinator stream consumer for {@code jobInstance}, then registers, starts and
     * bootstraps it, in that order.
     *
     * <p>If any of those steps fails, the consumer is stopped before the failure is rethrown,
     * because the caller never receives a reference it could use to stop it.
     *
     * @param jobInstance the job whose coordinator stream should be consumed
     * @return the bootstrapped consumer; the caller is responsible for stopping it
     */
    protected CoordinatorStreamSystemConsumer initializeCoordinatorStreamConsumer(JobInstance jobInstance) {
        Config coordinatorSystemConfig = getCoordinatorSystemConfig(jobInstance);
        LOG.debug("Using config: {} to create coordinatorStream consumer.", coordinatorSystemConfig);
        CoordinatorStreamSystemConsumer consumer = new CoordinatorStreamSystemConsumer(coordinatorSystemConfig, METRICS_REGISTRY);
        try {
            LOG.debug("Registering coordinator system stream consumer.");
            consumer.register();
            LOG.debug("Starting coordinator system stream consumer.");
            consumer.start();
            LOG.debug("Bootstrapping coordinator system stream consumer.");
            consumer.bootstrap();
            return consumer;
        } catch (RuntimeException e) {
            try {
                consumer.stop();
            } catch (RuntimeException stopFailure) {
                // Keep the initialization failure as the primary exception.
                e.addSuppressed(stopFailure);
            }
            throw e;
        }
    }

    /**
     * Builds the coordinator stream config for {@code jobInstance} from the job's config file,
     * combined with a map holding the job id and job name of {@code jobInstance}.
     *
     * @param jobInstance the job to build the config for
     * @return the config produced by {@code CoordinatorStreamUtil.buildCoordinatorStreamConfig}
     * @throws SamzaException wrapping any exception raised while building the config
     */
    private Config getCoordinatorSystemConfig(JobInstance jobInstance) {
        try {
            ConfigFactory configFactory = Util.getObj(this.taskResourceConfig.getJobConfigFactory(), ConfigFactory.class);
            URI configFileUri = new URI(String.format("file://%s", this.installFinder.getAllInstalledJobs().get(jobInstance).getConfigFilePath()));
            Config jobConfig = configFactory.getConfig(configFileUri);
            Map<String, String> jobIdentity = ImmutableMap.of(JobConfig.JOB_ID(), jobInstance.getJobId(), JobConfig.JOB_NAME(), jobInstance.getJobName());
            return CoordinatorStreamUtil.buildCoordinatorStreamConfig(new MapConfig(ImmutableList.of(jobConfig, jobIdentity)));
        } catch (Exception e) {
            LOG.error(String.format("Failed to get coordinator stream config for job : %s", jobInstance), e);
            throw new SamzaException(e);
        }
    }

    /**
     * Builds one {@link Task} per entry of the task assignment read with the consumer's config.
     *
     * <p>Each task carries the host found in the locality entry of its container, or
     * {@code null} when no locality entry exists for that container, together with the store
     * names read from the config and an empty list as the fourth constructor argument.
     *
     * @param coordinatorStreamSystemConsumer consumer whose config is used for all reads
     * @return the tasks, one per task assignment entry
     */
    protected List<Task> readTasksFromCoordinatorStream(CoordinatorStreamSystemConsumer coordinatorStreamSystemConsumer) {
        Config config = coordinatorStreamSystemConsumer.getConfig();
        Map<String, Map<String, String>> containerLocality = new LocalityManager(config, new MetricsRegistryMap()).readContainerLocality();
        Map<String, String> taskAssignment = new TaskAssignmentManager(config, new MetricsRegistryMap()).readTaskAssignment();
        List<String> storeNames = JavaConverters.seqAsJavaListConverter(new StorageConfig(config).getStoreNames()).asJava();
        return taskAssignment.entrySet().stream()
            .map(entry -> toTask(entry.getKey(), entry.getValue(), containerLocality, storeNames))
            .collect(Collectors.toList());
    }

    /** Builds a single {@link Task}, tolerating a missing locality entry for its container. */
    private static Task toTask(String taskName, String containerId, Map<String, Map<String, String>> containerLocality, List<String> storeNames) {
        Map<String, String> locality = containerLocality.get(containerId);
        if (locality == null) {
            LOG.debug("No locality entry found for container: {} of task: {}.", containerId, taskName);
        }
        String host = locality == null ? null : locality.get(HOST_KEY);
        return new Task(host, taskName, containerId, new ArrayList<>(), storeNames);
    }
}
