package co.cask.cdap.internal.provision;

import co.cask.cdap.api.Transactional;
import co.cask.cdap.api.Transactionals;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.macro.InvalidMacroException;
import co.cask.cdap.api.metrics.MetricsContext;
import co.cask.cdap.api.plugin.Requirements;
import co.cask.cdap.api.security.store.SecureStore;
import co.cask.cdap.app.runtime.ProgramOptions;
import co.cask.cdap.app.runtime.ProgramStateWriter;
import co.cask.cdap.common.NotFoundException;
import co.cask.cdap.common.async.KeyedExecutor;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.logging.LogSamplers;
import co.cask.cdap.common.logging.Loggers;
import co.cask.cdap.common.logging.LoggingContextAccessor;
import co.cask.cdap.common.service.Retries;
import co.cask.cdap.common.service.RetryStrategy;
import co.cask.cdap.common.transaction.MultiThreadTransactionAware;
import co.cask.cdap.data.dataset.SystemDatasetInstantiator;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.dataset2.MultiThreadDatasetCache;
import co.cask.cdap.data2.transaction.TransactionSystemClientAdapter;
import co.cask.cdap.data2.transaction.Transactions;
import co.cask.cdap.internal.app.runtime.ProgramOptionConstants;
import co.cask.cdap.internal.app.runtime.SystemArguments;
import co.cask.cdap.internal.app.runtime.plugin.MacroParser;
import co.cask.cdap.internal.app.spark.SparkCompatReader;
import co.cask.cdap.internal.pipeline.PluginRequirement;
import co.cask.cdap.internal.provision.ProvisioningOp;
import co.cask.cdap.internal.provision.task.DeprovisionTask;
import co.cask.cdap.internal.provision.task.ProvisionTask;
import co.cask.cdap.logging.context.LoggingContextHelper;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.ProgramRunId;
import co.cask.cdap.proto.provisioner.ProvisionerDetail;
import co.cask.cdap.runtime.spi.SparkCompat;
import co.cask.cdap.runtime.spi.provisioner.Capabilities;
import co.cask.cdap.runtime.spi.provisioner.Cluster;
import co.cask.cdap.runtime.spi.provisioner.ClusterStatus;
import co.cask.cdap.runtime.spi.provisioner.Provisioner;
import co.cask.cdap.runtime.spi.provisioner.ProvisionerContext;
import co.cask.cdap.runtime.spi.provisioner.ProvisionerSpecification;
import co.cask.cdap.runtime.spi.provisioner.RetryableProvisionException;
import co.cask.cdap.runtime.spi.ssh.SSHContext;
import co.cask.cdap.runtime.spi.ssh.SSHKeyPair;
import co.cask.cdap.security.spi.authentication.SecurityRequestContext;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Service;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import com.google.inject.Inject;
import java.io.IOException;
import java.lang.reflect.Type;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.tephra.RetryStrategies;
import org.apache.tephra.TransactionSystemClient;
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
import org.apache.twill.filesystem.Location;
import org.apache.twill.filesystem.LocationFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/provision/ProvisioningService.class */
public class ProvisioningService extends AbstractIdleService {
    private static final Logger LOG = LoggerFactory.getLogger(ProvisioningService.class);
    private static final Logger SAMPLING_LOG = Loggers.sampling(LOG, LogSamplers.onceEvery(20));
    private static final Gson GSON = new Gson();
    private static final Type PLUGIN_REQUIREMENT_SET_TYPE = new TypeToken<Set<PluginRequirement>>() { // from class: co.cask.cdap.internal.provision.ProvisioningService.1
    }.getType();
    private final CConfiguration cConf;
    private final AtomicReference<ProvisionerInfo> provisionerInfo = new AtomicReference<>(new ProvisionerInfo(new HashMap(), new HashMap()));
    private final ProvisionerProvider provisionerProvider;
    private final ProvisionerConfigProvider provisionerConfigProvider;
    private final ProvisionerNotifier provisionerNotifier;
    private final LocationFactory locationFactory;
    private final DatasetFramework datasetFramework;
    private final Transactional transactional;
    private final SparkCompat sparkCompat;
    private final SecureStore secureStore;
    private final Consumer<ProgramRunId> taskStateCleanup;
    private final ProgramStateWriter programStateWriter;
    private KeyedExecutor<ProvisioningTaskKey> taskExecutor;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/internal/provision/ProvisioningService$ProvisionerInfo.class */
    public static class ProvisionerInfo {
        private final Map<String, Provisioner> provisioners;
        private final Map<String, ProvisionerDetail> details;

        private ProvisionerInfo(Map<String, Provisioner> map, Map<String, ProvisionerDetail> map2) {
            this.provisioners = Collections.unmodifiableMap(map);
            this.details = Collections.unmodifiableMap(map2);
        }
    }

    @Inject
    ProvisioningService(CConfiguration cConfiguration, ProvisionerProvider provisionerProvider, ProvisionerConfigProvider provisionerConfigProvider, ProvisionerNotifier provisionerNotifier, LocationFactory locationFactory, DatasetFramework datasetFramework, TransactionSystemClient transactionSystemClient, SecureStore secureStore, ProgramStateWriter programStateWriter) {
        this.cConf = cConfiguration;
        this.provisionerProvider = provisionerProvider;
        this.provisionerConfigProvider = provisionerConfigProvider;
        this.provisionerNotifier = provisionerNotifier;
        this.locationFactory = locationFactory;
        this.datasetFramework = datasetFramework;
        this.transactional = Transactions.createTransactionalWithRetry(Transactions.createTransactional(new MultiThreadDatasetCache(new SystemDatasetInstantiator(datasetFramework), new TransactionSystemClientAdapter(transactionSystemClient), NamespaceId.SYSTEM, Collections.emptyMap(), (MetricsContext) null, (Map) null, new MultiThreadTransactionAware[0])), RetryStrategies.retryOnConflict(20, 100L));
        this.sparkCompat = SparkCompatReader.get(cConfiguration);
        this.secureStore = secureStore;
        this.programStateWriter = programStateWriter;
        this.taskStateCleanup = programRunId -> {
            Transactionals.execute(this.transactional, datasetContext -> {
                ProvisionerDataset.get(datasetContext, datasetFramework).deleteTaskInfo(programRunId);
            });
        };
    }

    protected void startUp() throws Exception {
        LOG.info("Starting {}", getClass().getSimpleName());
        initializeProvisioners();
        this.taskExecutor = new KeyedExecutor<>(Executors.newCachedThreadPool(Threads.createDaemonThreadFactory("provisioning-service-%d")));
        resumeTasks(this.taskStateCleanup);
    }

    protected void shutDown() throws Exception {
        LOG.info("Stopping {}", getClass().getSimpleName());
        try {
            this.taskExecutor.shutdownNow();
            this.taskExecutor.awaitTermination(5L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
        }
        LOG.info("Stopped {}", getClass().getSimpleName());
    }

    public ClusterStatus getClusterStatus(ProgramRunId programRunId, ProgramOptions programOptions, Cluster cluster, String str) throws Exception {
        Map<String, String> asMap = programOptions.getArguments().asMap();
        Provisioner provisioner = (Provisioner) this.provisionerInfo.get().provisioners.get(SystemArguments.getProfileProvisioner(asMap));
        if (provisioner == null) {
            return ClusterStatus.NOT_EXISTS;
        }
        try {
            ProvisionerContext createContext = createContext(programRunId, str, SystemArguments.getProfileProperties(asMap), new DefaultSSHContext(null, null));
            Retries.Callable callable = () -> {
                return provisioner.getClusterStatus(createContext, cluster);
            };
            RetryStrategy exponentialDelay = co.cask.cdap.common.service.RetryStrategies.exponentialDelay(1L, 5L, TimeUnit.SECONDS);
            Class<RetryableProvisionException> cls = RetryableProvisionException.class;
            RetryableProvisionException.class.getClass();
            return (ClusterStatus) Retries.callWithRetries(callable, exponentialDelay, (v1) -> {
                return r2.isInstance(v1);
            });
        } catch (InvalidMacroException e) {
            runWithProgramLogging(programRunId, asMap, () -> {
                LOG.error("Could not evaluate macros while checking cluster status.", e);
            });
            return ClusterStatus.NOT_EXISTS;
        }
    }

    @VisibleForTesting
    void resumeTasks(Consumer<ProgramRunId> consumer) {
        Runnable createDeprovisionTask;
        for (ProvisioningTaskInfo provisioningTaskInfo : getInProgressTasks()) {
            Service.State state = state();
            if (state != Service.State.STARTING && state != Service.State.RUNNING) {
                return;
            }
            if (!this.taskExecutor.getFuture(provisioningTaskInfo.getTaskKey()).isPresent()) {
                ProgramRunId programRunId = provisioningTaskInfo.getProgramRunId();
                ProvisioningOp provisioningOp = provisioningTaskInfo.getProvisioningOp();
                String provisionerName = provisioningTaskInfo.getProvisionerName();
                Provisioner provisioner = (Provisioner) this.provisionerInfo.get().provisioners.get(provisionerName);
                if (provisioner == null) {
                    LOG.error("Could not provision cluster for program run {} because provisioner {} no longer exists.", programRunId, provisionerName);
                    this.provisionerNotifier.orphaned(programRunId);
                    Transactionals.execute(this.transactional, datasetContext -> {
                        ProvisionerDataset.get(datasetContext, this.datasetFramework).deleteTaskInfo(provisioningTaskInfo.getProgramRunId());
                    });
                } else {
                    switch (provisioningOp.getType()) {
                        case PROVISION:
                            createDeprovisionTask = createProvisionTask(provisioningTaskInfo, provisioner);
                            break;
                        case DEPROVISION:
                            createDeprovisionTask = createDeprovisionTask(provisioningTaskInfo, provisioner, consumer);
                            break;
                        default:
                            LOG.error("Skipping unknown provisioning task type {}", provisioningOp.getType());
                            continue;
                    }
                    if (provisioningOp.getStatus() != ProvisioningOp.Status.CANCELLED && provisioningOp.getStatus() != ProvisioningOp.Status.CREATED) {
                        LOG.info("Resuming provisioning task for run {} of type {} in state {}.", new Object[]{provisioningTaskInfo.getProgramRunId(), provisioningTaskInfo.getProvisioningOp().getType(), provisioningTaskInfo.getProvisioningOp().getStatus()});
                    }
                    createDeprovisionTask.run();
                }
            }
        }
    }

    public Optional<ProvisioningTaskInfo> cancelProvisionTask(ProgramRunId programRunId) {
        ProvisioningTaskKey provisioningTaskKey = new ProvisioningTaskKey(programRunId, ProvisioningOp.Type.PROVISION);
        ProgramStateWriter programStateWriter = this.programStateWriter;
        programStateWriter.getClass();
        return cancelTask(provisioningTaskKey, programStateWriter::killed);
    }

    Optional<ProvisioningTaskInfo> cancelDeprovisionTask(ProgramRunId programRunId) {
        ProvisioningTaskKey provisioningTaskKey = new ProvisioningTaskKey(programRunId, ProvisioningOp.Type.DEPROVISION);
        ProvisionerNotifier provisionerNotifier = this.provisionerNotifier;
        provisionerNotifier.getClass();
        return cancelTask(provisioningTaskKey, provisionerNotifier::orphaned);
    }

    public Runnable provision(ProvisionRequest provisionRequest, DatasetContext datasetContext) {
        ProgramRunId programRunId = provisionRequest.getProgramRunId();
        ProgramOptions programOptions = provisionRequest.getProgramOptions();
        Map<String, String> asMap = programOptions.getArguments().asMap();
        String profileProvisioner = SystemArguments.getProfileProvisioner(asMap);
        Provisioner provisioner = (Provisioner) this.provisionerInfo.get().provisioners.get(profileProvisioner);
        if (provisioner == null) {
            runWithProgramLogging(programRunId, asMap, () -> {
                LOG.error("Could not provision cluster for the run because provisioner {} does not exist.", profileProvisioner);
            });
            this.programStateWriter.error(programRunId, new IllegalStateException("Provisioner does not exist."));
            this.provisionerNotifier.deprovisioned(programRunId);
            return () -> {
            };
        }
        Set<PluginRequirement> set = (Set) GSON.fromJson(asMap.get(ProgramOptionConstants.PLUGIN_REQUIREMENTS), PLUGIN_REQUIREMENT_SET_TYPE);
        if (set != null) {
            Set<PluginRequirement> unfulfilledRequirements = getUnfulfilledRequirements(provisioner.getCapabilities(), set);
            if (!unfulfilledRequirements.isEmpty()) {
                runWithProgramLogging(programRunId, asMap, () -> {
                    LOG.error(String.format("'%s' cannot be run using profile '%s' because the profile does not met all plugin requirements. Following requirements were not meet by the listed plugins: '%s'", programRunId.getProgram(), profileProvisioner, groupByRequirement(unfulfilledRequirements)));
                });
                this.programStateWriter.error(programRunId, new IllegalArgumentException("Provisioner does not meet all the requirements for the program to run."));
                this.provisionerNotifier.deprovisioned(programRunId);
                return () -> {
                };
            }
        }
        ProvisioningTaskInfo provisioningTaskInfo = new ProvisioningTaskInfo(programRunId, provisionRequest.getProgramDescriptor(), programOptions, SystemArguments.getProfileProperties(asMap), profileProvisioner, provisionRequest.getUser(), new ProvisioningOp(ProvisioningOp.Type.PROVISION, ProvisioningOp.Status.REQUESTING_CREATE), createKeysDirectory(programRunId).toURI(), null);
        ProvisionerDataset.get(datasetContext, this.datasetFramework).putTaskInfo(provisioningTaskInfo);
        return createProvisionTask(provisioningTaskInfo, provisioner);
    }

    public Runnable deprovision(ProgramRunId programRunId, DatasetContext datasetContext) {
        return deprovision(programRunId, datasetContext, this.taskStateCleanup);
    }

    @VisibleForTesting
    Runnable deprovision(ProgramRunId programRunId, DatasetContext datasetContext, Consumer<ProgramRunId> consumer) {
        ProvisionerDataset provisionerDataset = ProvisionerDataset.get(datasetContext, this.datasetFramework);
        ProvisioningTaskInfo taskInfo = provisionerDataset.getTaskInfo(new ProvisioningTaskKey(programRunId, ProvisioningOp.Type.PROVISION));
        if (taskInfo == null) {
            runWithProgramLogging(programRunId, Collections.emptyMap(), () -> {
                LOG.error("No task state found while deprovisioning the cluster. The cluster will be marked as orphaned.");
            });
            this.programStateWriter.error(programRunId, new IllegalStateException("No task state found while deprovisioning the cluster."));
            this.provisionerNotifier.orphaned(programRunId);
            return () -> {
            };
        }
        if (taskInfo.getCluster() == null) {
            this.provisionerNotifier.deprovisioned(programRunId);
            return () -> {
                consumer.accept(programRunId);
            };
        }
        Provisioner provisioner = (Provisioner) this.provisionerInfo.get().provisioners.get(taskInfo.getProvisionerName());
        if (provisioner != null) {
            ProvisioningTaskInfo provisioningTaskInfo = new ProvisioningTaskInfo(taskInfo, new ProvisioningOp(ProvisioningOp.Type.DEPROVISION, ProvisioningOp.Status.REQUESTING_DELETE), taskInfo.getCluster());
            provisionerDataset.putTaskInfo(provisioningTaskInfo);
            return createDeprovisionTask(provisioningTaskInfo, provisioner, consumer);
        }
        runWithProgramLogging(programRunId, taskInfo.getProgramOptions().getArguments().asMap(), () -> {
            LOG.error("Could not deprovision the cluster because provisioner {} does not exist. The cluster will be marked as orphaned.", taskInfo.getProvisionerName());
        });
        this.programStateWriter.error(programRunId, new IllegalStateException("Provisioner not found while deprovisioning the cluster."));
        this.provisionerNotifier.orphaned(programRunId);
        return () -> {
            consumer.accept(taskInfo.getProgramRunId());
        };
    }

    @VisibleForTesting
    Set<PluginRequirement> getUnfulfilledRequirements(Capabilities capabilities, Set<PluginRequirement> set) {
        HashSet hashSet = new HashSet();
        Set set2 = (Set) capabilities.getDatasetTypes().stream().map((v0) -> {
            return v0.toLowerCase();
        }).collect(Collectors.toSet());
        for (PluginRequirement pluginRequirement : set) {
            Sets.SetView difference = Sets.difference(pluginRequirement.getRequirements().getDatasetTypes(), set2);
            if (!difference.isEmpty()) {
                hashSet.add(new PluginRequirement(pluginRequirement.getName(), pluginRequirement.getType(), new Requirements(difference.immutableCopy())));
            }
        }
        return hashSet;
    }

    @VisibleForTesting
    Map<String, Set<String>> groupByRequirement(Set<PluginRequirement> set) {
        HashMap hashMap = new HashMap();
        for (PluginRequirement pluginRequirement : set) {
            for (String str : pluginRequirement.getRequirements().getDatasetTypes()) {
                Set set2 = (Set) hashMap.getOrDefault(str, new HashSet());
                set2.add(pluginRequirement.getType() + ":" + pluginRequirement.getName());
                hashMap.put(str, set2);
            }
        }
        return hashMap;
    }

    private void initializeProvisioners() {
        Map<String, Provisioner> loadProvisioners = this.provisionerProvider.loadProvisioners();
        Map<String, ProvisionerConfig> loadProvisionerConfigs = this.provisionerConfigProvider.loadProvisionerConfigs(loadProvisioners.keySet());
        LOG.debug("Provisioners = {}", loadProvisioners);
        HashMap hashMap = new HashMap(loadProvisioners.size());
        for (Map.Entry<String, Provisioner> entry : loadProvisioners.entrySet()) {
            String key = entry.getKey();
            Provisioner value = entry.getValue();
            try {
                value.initialize(new DefaultSystemProvisionerContext(this.cConf, key));
                ProvisionerSpecification spec = value.getSpec();
                ProvisionerConfig orDefault = loadProvisionerConfigs.getOrDefault(key, new ProvisionerConfig(new ArrayList(), null, false));
                hashMap.put(key, new ProvisionerDetail(spec.getName(), spec.getLabel(), spec.getDescription(), orDefault.getConfigurationGroups(), orDefault.getIcon(), orDefault.isBeta()));
            } catch (RuntimeException e) {
                LOG.warn("Error initializing the {} provisioner. It will not be available for use.", key, e);
                loadProvisioners.remove(key);
            }
        }
        this.provisionerInfo.set(new ProvisionerInfo(loadProvisioners, hashMap));
    }

    public Collection<ProvisionerDetail> getProvisionerDetails() {
        return this.provisionerInfo.get().details.values();
    }

    @Nullable
    public ProvisionerDetail getProvisionerDetail(String str) {
        return (ProvisionerDetail) this.provisionerInfo.get().details.get(str);
    }

    public void validateProperties(String str, Map<String, String> map) throws NotFoundException {
        Provisioner provisioner = (Provisioner) this.provisionerInfo.get().provisioners.get(str);
        if (provisioner == null) {
            throw new NotFoundException(String.format("Provisioner '%s' does not exist", str));
        }
        provisioner.validateProperties(map);
    }

    private Runnable createProvisionTask(ProvisioningTaskInfo provisioningTaskInfo, Provisioner provisioner) {
        ProgramRunId programRunId = provisioningTaskInfo.getProgramRunId();
        try {
            ProvisionTask provisionTask = new ProvisionTask(provisioningTaskInfo, this.transactional, this.datasetFramework, provisioner, createContext(programRunId, provisioningTaskInfo.getUser(), provisioningTaskInfo.getProvisionerProperties(), new DefaultSSHContext(this.locationFactory.create(provisioningTaskInfo.getSecureKeysDir()), createSSHKeyPair(provisioningTaskInfo))), this.provisionerNotifier, this.programStateWriter, 300);
            Runnable runnable = () -> {
                runWithProgramLogging(programRunId, provisioningTaskInfo.getProgramOptions().getArguments().asMap(), () -> {
                    try {
                        provisionTask.execute();
                    } catch (InterruptedException e) {
                        LOG.debug("Provision task for program run {} interrupted.", provisioningTaskInfo.getProgramRunId());
                    } catch (Exception e2) {
                        LOG.info("Provision task for program run {} failed.", provisioningTaskInfo.getProgramRunId(), e2);
                    }
                });
            };
            ProvisioningTaskKey provisioningTaskKey = new ProvisioningTaskKey(programRunId, ProvisioningOp.Type.PROVISION);
            return () -> {
                this.taskExecutor.submit(provisioningTaskKey, runnable);
            };
        } catch (IOException e) {
            runWithProgramLogging(provisioningTaskInfo.getProgramRunId(), provisioningTaskInfo.getProgramOptions().getArguments().asMap(), () -> {
                LOG.error("Failed to load ssh key. The run will be marked as failed.", e);
            });
            this.provisionerNotifier.deprovisioning(provisioningTaskInfo.getProgramRunId());
            return () -> {
            };
        } catch (InvalidMacroException e2) {
            runWithProgramLogging(provisioningTaskInfo.getProgramRunId(), provisioningTaskInfo.getProgramOptions().getArguments().asMap(), () -> {
                LOG.error("Could not evaluate macros while provisoning. The run will be marked as failed.", e2);
            });
            this.provisionerNotifier.deprovisioning(provisioningTaskInfo.getProgramRunId());
            return () -> {
            };
        }
    }

    private Runnable createDeprovisionTask(ProvisioningTaskInfo provisioningTaskInfo, Provisioner provisioner, Consumer<ProgramRunId> consumer) {
        Map<String, String> provisionerProperties = provisioningTaskInfo.getProvisionerProperties();
        SSHKeyPair sSHKeyPair = null;
        try {
            sSHKeyPair = createSSHKeyPair(provisioningTaskInfo);
        } catch (IOException e) {
            LOG.warn("Failed to load ssh key. No SSH key will be available for the deprovision task", e);
        }
        try {
            DeprovisionTask deprovisionTask = new DeprovisionTask(provisioningTaskInfo, this.transactional, this.datasetFramework, 300, provisioner, createContext(provisioningTaskInfo.getProgramRunId(), provisioningTaskInfo.getUser(), provisionerProperties, new DefaultSSHContext(null, sSHKeyPair)), this.provisionerNotifier, this.locationFactory);
            Runnable runnable = () -> {
                runWithProgramLogging(provisioningTaskInfo.getProgramRunId(), provisioningTaskInfo.getProgramOptions().getArguments().asMap(), () -> {
                    try {
                        deprovisionTask.execute();
                        consumer.accept(provisioningTaskInfo.getProgramRunId());
                    } catch (InterruptedException e2) {
                        LOG.debug("Deprovision task for program run {} interrupted.", provisioningTaskInfo.getProgramRunId());
                    } catch (Exception e3) {
                        LOG.info("Deprovision task for program run {} failed.", provisioningTaskInfo.getProgramRunId(), e3);
                        consumer.accept(provisioningTaskInfo.getProgramRunId());
                    }
                });
            };
            ProvisioningTaskKey provisioningTaskKey = new ProvisioningTaskKey(provisioningTaskInfo.getProgramRunId(), ProvisioningOp.Type.DEPROVISION);
            return () -> {
                this.taskExecutor.submit(provisioningTaskKey, runnable);
            };
        } catch (InvalidMacroException e2) {
            runWithProgramLogging(provisioningTaskInfo.getProgramRunId(), provisioningTaskInfo.getProgramOptions().getArguments().asMap(), () -> {
                LOG.error("Could not evaluate macros while deprovisoning. The cluster will be marked as orphaned.", e2);
            });
            this.provisionerNotifier.orphaned(provisioningTaskInfo.getProgramRunId());
            return () -> {
            };
        }
    }

    private List<ProvisioningTaskInfo> getInProgressTasks() {
        return (List) Retries.callWithRetries(() -> {
            return (List) Transactionals.execute(this.transactional, datasetContext -> {
                return ProvisionerDataset.get(datasetContext, this.datasetFramework).listTaskInfo();
            });
        }, co.cask.cdap.common.service.RetryStrategies.fixDelay(6L, TimeUnit.SECONDS), th -> {
            Service.State state = state();
            if ((state != Service.State.STARTING && state != Service.State.RUNNING) || (th instanceof InterruptedException)) {
                return false;
            }
            Throwable rootCause = Throwables.getRootCause(th);
            if ((rootCause instanceof SocketTimeoutException) || (rootCause instanceof ConnectException)) {
                return true;
            }
            SAMPLING_LOG.warn("Error scanning for in-progress provisioner tasks. Tasks that were in progress during the last CDAP shutdown will not be resumed until this succeeds. ", th);
            return true;
        });
    }

    @Nullable
    private SSHKeyPair createSSHKeyPair(ProvisioningTaskInfo provisioningTaskInfo) throws IOException {
        String str = (String) Optional.ofNullable(provisioningTaskInfo.getCluster()).map((v0) -> {
            return v0.getProperties();
        }).map(map -> {
            return (String) map.get("ssh.user");
        }).orElse(null);
        if (str == null) {
            return null;
        }
        Location create = this.locationFactory.create(provisioningTaskInfo.getSecureKeysDir());
        Location append = create.append("id_rsa.pub");
        Location append2 = create.append("id_rsa");
        if (append.exists() && append2.exists()) {
            return new LocationBasedSSHKeyPair(create, str);
        }
        return null;
    }

    private Location createKeysDirectory(ProgramRunId programRunId) {
        Location create = this.locationFactory.create(String.format("provisioner/keys/%s.%s.%s.%s.%s", programRunId.getNamespace(), programRunId.getApplication(), programRunId.getType().name().toLowerCase(), programRunId.getProgram(), programRunId.getRun()));
        try {
            create.mkdirs();
            return create;
        } catch (IOException e) {
            throw new RuntimeException("Failed to create directory " + create, e);
        }
    }

    private ProvisionerContext createContext(ProgramRunId programRunId, String str, Map<String, String> map, SSHContext sSHContext) {
        return new DefaultProvisionerContext(programRunId, evaluateMacros(this.secureStore, str, programRunId.getNamespace(), map), this.sparkCompat, sSHContext);
    }

    @VisibleForTesting
    static Map<String, String> evaluateMacros(SecureStore secureStore, String str, String str2, Map<String, String> map) {
        MacroParser build = MacroParser.builder(new ProvisionerMacroEvaluator(str2, secureStore)).disableEscaping().disableLookups().whitelistFunctions(ProvisionerMacroEvaluator.SECURE_FUNCTION).build();
        HashMap hashMap = new HashMap();
        String userId = SecurityRequestContext.getUserId();
        try {
            SecurityRequestContext.setUserId(str);
            for (Map.Entry<String, String> entry : map.entrySet()) {
                hashMap.put(entry.getKey(), build.parse(entry.getValue()));
            }
            return hashMap;
        } finally {
            SecurityRequestContext.setUserId(userId);
        }
    }

    private void runWithProgramLogging(ProgramRunId programRunId, Map<String, String> map, Runnable runnable) {
        Cancellable loggingContext = LoggingContextAccessor.setLoggingContext(LoggingContextHelper.getLoggingContextWithRunId(programRunId, map));
        try {
            runnable.run();
            loggingContext.cancel();
        } catch (Throwable th) {
            loggingContext.cancel();
            throw th;
        }
    }

    private Optional<ProvisioningTaskInfo> cancelTask(ProvisioningTaskKey provisioningTaskKey, Consumer<ProgramRunId> consumer) {
        Optional future = this.taskExecutor.getFuture(provisioningTaskKey);
        if (!future.isPresent()) {
            return Optional.empty();
        }
        Future future2 = (Future) future.get();
        if (future2.isDone()) {
            return Optional.empty();
        }
        LOG.trace("Cancelling {} task for program run {}.", provisioningTaskKey.getType(), provisioningTaskKey.getProgramRunId());
        if (!future2.cancel(true)) {
            return Optional.empty();
        }
        LOG.debug("Cancelled {} task for program run {}.", provisioningTaskKey.getType(), provisioningTaskKey.getProgramRunId());
        ProvisioningTaskInfo provisioningTaskInfo = (ProvisioningTaskInfo) Transactionals.execute(this.transactional, datasetContext -> {
            ProvisionerDataset provisionerDataset = ProvisionerDataset.get(datasetContext, this.datasetFramework);
            ProvisioningTaskInfo taskInfo = provisionerDataset.getTaskInfo(provisioningTaskKey);
            if (taskInfo == null) {
                return null;
            }
            provisionerDataset.putTaskInfo(new ProvisioningTaskInfo(taskInfo, new ProvisioningOp(taskInfo.getProvisioningOp().getType(), ProvisioningOp.Status.CANCELLED), taskInfo.getCluster()));
            LOG.trace("Recorded cancelled state for {} task for program run {}.", provisioningTaskKey.getType(), provisioningTaskKey.getProgramRunId());
            return taskInfo;
        });
        if (provisioningTaskInfo == null) {
            return Optional.empty();
        }
        consumer.accept(provisioningTaskInfo.getProgramRunId());
        return Optional.of(provisioningTaskInfo);
    }
}
