package co.cask.cdap.data.stream;

import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.namespace.NamespacedLocationFactory;
import co.cask.cdap.data2.transaction.stream.StreamAdmin;
import co.cask.cdap.proto.Id;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import org.apache.twill.filesystem.Location;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/data/stream/StreamFileJanitor.class */
/**
 * Deletes stream partition directories that are no longer needed.
 *
 * <p>{@link #cleanAll()} walks every location returned by the {@link NamespacedLocationFactory},
 * appends the configured stream base directory ({@code stream.base.dir}) to it and, when that
 * directory exists, runs {@link #clean(Location, long, long)} on each of its children.
 */
public final class StreamFileJanitor {
    private static final Logger LOG = LoggerFactory.getLogger(StreamFileJanitor.class);

    /**
     * Character a directory name must contain, at an index greater than zero, for the directory
     * to be treated as a partition directory.
     */
    private static final char PARTITION_DIR_MARKER = '.';

    private final StreamAdmin streamAdmin;
    private final NamespacedLocationFactory namespacedLocationFactory;
    private final String streamBaseDirPath;

    /**
     * Creates a janitor.
     *
     * @param cConfiguration configuration from which the {@code stream.base.dir} value is read
     * @param streamAdmin used to check whether a stream exists and to look up its TTL
     * @param namespacedLocationFactory supplies the locations under which stream directories are searched
     */
    @Inject
    public StreamFileJanitor(CConfiguration cConfiguration, StreamAdmin streamAdmin, NamespacedLocationFactory namespacedLocationFactory) {
        this.streamAdmin = streamAdmin;
        this.streamBaseDirPath = cConfiguration.get("stream.base.dir");
        this.namespacedLocationFactory = namespacedLocationFactory;
    }

    /**
     * Cleans every child location of the stream base directory in every listed location.
     *
     * <p>The TTL passed to {@link #clean(Location, long, long)} is the one from the stream's
     * configuration when {@link StreamAdmin} reports that the stream exists, and {@code 0}
     * otherwise. The current time passed is {@link System#currentTimeMillis()}, read once per
     * child location.
     *
     * @throws IOException if listing, checking or deleting a location fails, or if the stream
     *     existence check fails (see {@link #isStreamExists(Id.Stream)})
     */
    public void cleanAll() throws IOException {
        // Only the values are used; the wildcards keep the assignment valid whatever key type
        // the factory declares, without falling back to a raw Map.
        Map<?, ? extends Location> namespaceLocations = this.namespacedLocationFactory.list();
        for (Location namespaceLocation : namespaceLocations.values()) {
            Location streamBaseLocation = namespaceLocation.append(this.streamBaseDirPath);
            if (!streamBaseLocation.exists()) {
                continue;
            }
            for (Location streamLocation : streamBaseLocation.list()) {
                Id.Stream streamId = StreamUtils.getStreamIdFromLocation(streamLocation);
                // With a TTL of 0 the expiry threshold equals the current time.
                long ttl = 0L;
                if (isStreamExists(streamId)) {
                    ttl = this.streamAdmin.getConfig(streamId).getTTL();
                }
                clean(streamLocation, ttl, System.currentTimeMillis());
            }
        }
    }

    /**
     * Cleans a single stream location.
     *
     * <p>For every generation below the one reported by {@code StreamUtils.getGeneration}, the
     * generation location is deleted recursively. The exception is a generation location equal
     * to {@code streamLocation} itself, where only the partition directories directly inside it
     * are deleted. For the reported generation, partition directories whose end time is strictly
     * less than {@code currentTime - ttl} are deleted.
     *
     * @param streamLocation location of the stream to clean
     * @param ttl amount subtracted from {@code currentTime} to obtain the expiry threshold;
     *     presumably in the same unit as {@code currentTime} (TODO confirm)
     * @param currentTime reference time; {@link #cleanAll()} passes {@link System#currentTimeMillis()}
     * @throws IOException if listing or deleting a location fails
     */
    @VisibleForTesting
    void clean(Location streamLocation, long ttl, long currentTime) throws IOException {
        LOG.debug("Cleanup stream file in {}", streamLocation);
        int currentGeneration = StreamUtils.getGeneration(streamLocation);

        // Generations older than the current one are removed regardless of TTL.
        for (int generation = 0; generation < currentGeneration; generation++) {
            Location generationLocation = StreamUtils.createGenerationLocation(streamLocation, generation);
            if (generationLocation.equals(streamLocation)) {
                // This generation shares the stream's own directory, so the directory itself
                // must be kept; only the partition directories inside it are removed.
                for (Location child : generationLocation.list()) {
                    if (isPartitionDirectory(child)) {
                        child.delete(true);
                    }
                }
            } else {
                generationLocation.delete(true);
            }
        }

        // In the current generation, only partitions that ended before the threshold are removed.
        long expireTime = currentTime - ttl;
        Location currentGenerationLocation = StreamUtils.createGenerationLocation(streamLocation, currentGeneration);
        for (Location child : currentGenerationLocation.list()) {
            if (isPartitionDirectory(child) && StreamUtils.getPartitionEndTime(child.getName()) < expireTime) {
                child.delete(true);
            }
        }
    }

    /**
     * Tells whether the location is a directory whose name contains
     * {@link #PARTITION_DIR_MARKER} at an index greater than zero.
     */
    private boolean isPartitionDirectory(Location location) throws IOException {
        return location.isDirectory() && location.getName().indexOf(PARTITION_DIR_MARKER) > 0;
    }

    /**
     * Asks the {@link StreamAdmin} whether the stream exists.
     *
     * @throws IOException wrapping any exception thrown by the existence check
     */
    private boolean isStreamExists(Id.Stream stream) throws IOException {
        try {
            return this.streamAdmin.exists(stream);
        } catch (Exception e) {
            throw new IOException(e);
        }
    }
}
