package org.apache.gobblin.service.modules.orchestration;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import com.google.gson.reflect.TypeToken;
import com.typesafe.config.Config;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.runtime.spec_serde.GsonSerDe;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
import org.apache.gobblin.service.modules.spec.JobExecutionPlanListDeserializer;
import org.apache.gobblin.service.modules.spec.JobExecutionPlanListSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link DagStateStore} that checkpoints each {@link Dag} as a file in a local directory.
 *
 * <p>Each DAG is stored in a file named {@code <dagId>.dag} under the directory configured by
 * {@value #DAG_STATESTORE_DIR}. The file content is the serialized list of the
 * {@link JobExecutionPlan}s held by the DAG's nodes.
 *
 * <p>The mutating methods ({@link #writeCheckpoint(Dag)} and both {@code cleanUp} overloads) are
 * {@code synchronized} on this instance; the read methods are not.
 */
@Alpha
public class FSDagStateStore implements DagStateStore {
    private static final Logger log = LoggerFactory.getLogger(FSDagStateStore.class);
    public static final String DAG_FILE_EXTENSION = ".dag";
    static final String DAG_STATESTORE_DIR = "gobblin.service.dagManager.dagStateStoreDir";
    private final String dagCheckpointDir;
    private final GsonSerDe<List<JobExecutionPlan>> serDe;

    /**
     * Creates the state store, creating the checkpoint directory if it does not exist yet.
     *
     * @param config configuration that must contain {@value #DAG_STATESTORE_DIR}
     * @param map topology specs keyed by URI, handed to the {@link JobExecutionPlanListDeserializer}
     * @throws IOException if the checkpoint directory does not exist and cannot be created, or if
     *     the configured path exists but is not a directory
     */
    public FSDagStateStore(Config config, Map<URI, TopologySpec> map) throws IOException {
        this.dagCheckpointDir = config.getString(DAG_STATESTORE_DIR);
        File checkpointDir = new File(this.dagCheckpointDir);
        // mkdirs() returns false when the directory already exists, so re-check isDirectory()
        // afterwards to tolerate a concurrent creator; a plain file at this path is rejected.
        if (!checkpointDir.isDirectory() && !checkpointDir.mkdirs() && !checkpointDir.isDirectory()) {
            throw new IOException("Could not create dag state store dir - " + this.dagCheckpointDir);
        }
        this.serDe = new GsonSerDe<>(new TypeToken<List<JobExecutionPlan>>() {
        }.getType(), new JobExecutionPlanListSerializer(), new JobExecutionPlanListDeserializer(map));
    }

    /**
     * Writes the serialized DAG to its checkpoint file.
     *
     * <p>The content is first written to a {@code .tmp} sibling and then moved onto the final
     * name, so a partially written checkpoint is never visible under the {@code .dag} name.
     *
     * @param dag the DAG to checkpoint
     * @throws IOException if writing or moving the file fails
     */
    @Override
    public synchronized void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException {
        String fileName = DagManagerUtils.generateDagId(dag) + DAG_FILE_EXTENSION;
        String serializedDag = serializeDag(dag);
        File tmpCheckpointFile = new File(this.dagCheckpointDir, fileName + ".tmp");
        File checkpointFile = new File(this.dagCheckpointDir, fileName);
        Files.write(serializedDag, tmpCheckpointFile, Charsets.UTF_8);
        Files.move(tmpCheckpointFile, checkpointFile);
    }

    /**
     * Deletes the checkpoint file of the given DAG.
     *
     * @param dag the DAG whose checkpoint should be removed
     */
    @Override
    public synchronized void cleanUp(Dag<JobExecutionPlan> dag) {
        cleanUp(DagManagerUtils.generateDagId(dag));
    }

    /**
     * Deletes the checkpoint file for the given DAG id. A failed delete is logged, not thrown.
     *
     * @param str the DAG id (file name without the {@value #DAG_FILE_EXTENSION} extension)
     */
    @Override
    public synchronized void cleanUp(String str) {
        File checkpointFile = new File(this.dagCheckpointDir, str + DAG_FILE_EXTENSION);
        if (!checkpointFile.delete()) {
            log.error("Could not delete checkpoint file: {}", checkpointFile.getName());
        }
    }

    /**
     * Loads every checkpointed DAG found in the checkpoint directory.
     *
     * @return the deserialized DAGs, one per {@code .dag} file; empty if there are none
     * @throws IOException if the directory cannot be listed or a checkpoint file cannot be read
     */
    @Override
    public List<Dag<JobExecutionPlan>> getDags() throws IOException {
        File[] checkpointFiles = listDagFiles();
        List<Dag<JobExecutionPlan>> dags = new ArrayList<>(checkpointFiles.length);
        for (File checkpointFile : checkpointFiles) {
            dags.add(getDag(checkpointFile));
        }
        return dags;
    }

    /**
     * Loads the DAG checkpointed under the given id.
     *
     * @param str the DAG id (file name without the {@value #DAG_FILE_EXTENSION} extension)
     * @return the deserialized DAG, or {@code null} if no checkpoint file exists for that id
     * @throws IOException if the checkpoint file cannot be read
     */
    @Override
    public Dag<JobExecutionPlan> getDag(String str) throws IOException {
        File checkpointFile = new File(this.dagCheckpointDir, str + DAG_FILE_EXTENSION);
        if (!checkpointFile.exists()) {
            return null;
        }
        return getDag(checkpointFile);
    }

    /**
     * Reads the given file as UTF-8 and deserializes it into a DAG.
     *
     * @param file the checkpoint file to read
     * @return the deserialized DAG
     * @throws IOException if the file cannot be read
     */
    @VisibleForTesting
    public Dag<JobExecutionPlan> getDag(File file) throws IOException {
        return deserializeDag(Files.toString(file, Charsets.UTF_8));
    }

    /**
     * Returns the ids of all checkpointed DAGs, derived from the {@code .dag} file names.
     *
     * @return the DAG ids; empty if there are no checkpoint files
     * @throws IllegalStateException if the checkpoint directory cannot be listed
     */
    @Override
    public Set<String> getDagIds() {
        File[] checkpointFiles;
        try {
            checkpointFiles = listDagFiles();
        } catch (IOException e) {
            // This method declares no checked exception, so surface the failure unchecked
            // (with its cause) rather than as an anonymous NullPointerException.
            throw new IllegalStateException(e.getMessage(), e);
        }
        Set<String> dagIds = new HashSet<>();
        for (File checkpointFile : checkpointFiles) {
            dagIds.add(StringUtils.removeEnd(checkpointFile.getName(), DAG_FILE_EXTENSION));
        }
        return dagIds;
    }

    /**
     * Lists the files in the checkpoint directory whose names end with
     * {@value #DAG_FILE_EXTENSION}.
     *
     * @return the matching files; never {@code null}
     * @throws IOException if the directory listing is unavailable ({@link File#listFiles} returned
     *     {@code null}, e.g. the path is not a directory or an I/O error occurred)
     */
    private File[] listDagFiles() throws IOException {
        File[] checkpointFiles = new File(this.dagCheckpointDir)
            .listFiles((dir, name) -> name.endsWith(DAG_FILE_EXTENSION));
        if (checkpointFiles == null) {
            throw new IOException("Could not list dag state store dir - " + this.dagCheckpointDir);
        }
        return checkpointFiles;
    }

    /** Serializes the values of all nodes of the DAG using the configured serDe. */
    private String serializeDag(Dag<JobExecutionPlan> dag) {
        List<JobExecutionPlan> jobExecutionPlans = dag.getNodes().stream()
            .map(node -> node.getValue())
            .collect(Collectors.toList());
        return this.serDe.serialize(jobExecutionPlans);
    }

    /** Deserializes a list of job execution plans and rebuilds the DAG from it. */
    private Dag<JobExecutionPlan> deserializeDag(String str) {
        List<JobExecutionPlan> jobExecutionPlans = this.serDe.deserialize(str);
        return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
    }
}
