package com.twitter.scalding.commons.tap;

import cascading.flow.FlowProcess;
import cascading.scheme.Scheme;
import cascading.tap.hadoop.Hfs;
import com.twitter.scalding.commons.datastores.VersionedStore;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;

/* loaded from: input_file:com/twitter/scalding/commons/tap/VersionedTap.class */
public class VersionedTap extends Hfs {
    public Long version;
    private int versionsToKeep;
    public TapMode mode;
    private String newVersionPath;

    /* loaded from: input_file:com/twitter/scalding/commons/tap/VersionedTap$TapMode.class */
    public enum TapMode {
        SOURCE,
        SINK
    }

    public VersionedTap(String str, Scheme<JobConf, RecordReader, OutputCollector, ?, ?> scheme, TapMode tapMode) throws IOException {
        super(scheme, str);
        this.version = null;
        this.versionsToKeep = 3;
        this.mode = tapMode;
    }

    public VersionedTap setVersion(long j) {
        this.version = Long.valueOf(j);
        return this;
    }

    public VersionedTap setVersionsToKeep(int i) {
        this.versionsToKeep = i;
        return this;
    }

    public int getVersionsToKeep() {
        return this.versionsToKeep;
    }

    public String getOutputDirectory() {
        return getPath().toString();
    }

    public VersionedStore getStore(JobConf jobConf) throws IOException {
        return new VersionedStore(getPath().getFileSystem(jobConf), getOutputDirectory());
    }

    public String getSourcePath(JobConf jobConf) {
        try {
            VersionedStore store = getStore(jobConf);
            String versionPath = this.version != null ? store.versionPath(this.version.longValue()) : store.mostRecentVersionPath();
            if (versionPath == null) {
                throw new RuntimeException("Could not find valid source path for VersionTap with root: " + store.getRoot());
            }
            return versionPath;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public String getSinkPath(JobConf jobConf) {
        try {
            VersionedStore store = getStore(jobConf);
            String createVersion = this.version == null ? store.createVersion() : store.createVersion(this.version.longValue());
            if (createVersion == null) {
                throw new RuntimeException("Could not find valid sink path for VersionTap with root: " + store.getRoot());
            }
            return createVersion;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void sourceConfInit(FlowProcess<JobConf> flowProcess, JobConf jobConf) {
        super.sourceConfInit(flowProcess, jobConf);
        FileInputFormat.setInputPaths(jobConf, getSourcePath(jobConf));
    }

    public void sinkConfInit(FlowProcess<JobConf> flowProcess, JobConf jobConf) {
        super.sinkConfInit(flowProcess, jobConf);
        if (this.newVersionPath == null) {
            this.newVersionPath = getSinkPath(jobConf);
        }
        FileOutputFormat.setOutputPath(jobConf, new Path(this.newVersionPath));
    }

    public boolean resourceExists(JobConf jobConf) throws IOException {
        return getStore(jobConf).mostRecentVersion() != null;
    }

    public boolean createResource(JobConf jobConf) throws IOException {
        throw new UnsupportedOperationException("Not supported yet.");
    }

    public boolean deleteResource(JobConf jobConf) throws IOException {
        throw new UnsupportedOperationException("Not supported yet.");
    }

    public String getIdentifier() {
        return getOutputDirectory() + "/" + (this.mode == TapMode.SINK ? "sink" : "source") + "/" + (this.version == null ? "LATEST" : this.version.toString());
    }

    public long getModifiedTime(JobConf jobConf) throws IOException {
        VersionedStore store = getStore(jobConf);
        if (this.mode == TapMode.SINK) {
            return 0L;
        }
        return store.mostRecentVersion().longValue();
    }

    public boolean commitResource(JobConf jobConf) throws IOException {
        VersionedStore store = getStore(jobConf);
        if (this.newVersionPath == null) {
            return true;
        }
        store.succeedVersion(this.newVersionPath);
        markSuccessfulOutputDir(new Path(this.newVersionPath), jobConf);
        this.newVersionPath = null;
        store.cleanup(getVersionsToKeep());
        return true;
    }

    private static void markSuccessfulOutputDir(Path path, JobConf jobConf) throws IOException {
        FileSystem fileSystem = path.getFileSystem(jobConf);
        if (fileSystem.exists(path)) {
            fileSystem.create(new Path(path, VersionedStore.HADOOP_SUCCESS_FLAG)).close();
        }
    }

    public boolean rollbackResource(JobConf jobConf) throws IOException {
        if (this.newVersionPath == null) {
            return true;
        }
        getStore(jobConf).failVersion(this.newVersionPath);
        this.newVersionPath = null;
        return true;
    }

    public /* bridge */ /* synthetic */ void sinkConfInit(FlowProcess flowProcess, Object obj) {
        sinkConfInit((FlowProcess<JobConf>) flowProcess, (JobConf) obj);
    }

    public /* bridge */ /* synthetic */ void sourceConfInit(FlowProcess flowProcess, Object obj) {
        sourceConfInit((FlowProcess<JobConf>) flowProcess, (JobConf) obj);
    }
}
