package akka.stream.alpakka.file.impl;

import akka.NotUsed;
import akka.annotation.InternalApi;
import akka.japi.Pair;
import akka.stream.Attributes;
import akka.stream.Outlet;
import akka.stream.Shape;
import akka.stream.SourceShape;
import akka.stream.alpakka.file.DirectoryChange;
import akka.stream.javadsl.Source;
import akka.stream.stage.AbstractOutHandler;
import akka.stream.stage.GraphStage;
import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.TimerGraphStageLogic;
import com.sun.nio.file.SensitivityWatchEventModifier;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.BiFunction;
import scala.concurrent.duration.FiniteDuration;

@InternalApi
/* loaded from: input_file:akka/stream/alpakka/file/impl/DirectoryChangesSource.class */
public final class DirectoryChangesSource<T> extends GraphStage<SourceShape<T>> {
    // Attributes returned by initialAttributes(); names the stage "DirectoryChangesSource".
    private static final Attributes DEFAULT_ATTRIBUTES = Attributes.name("DirectoryChangesSource");
    // Directory that is registered with the watch service; event paths are resolved against it.
    private final Path directoryPath;
    // Delay used when scheduling the next poll after a poll produced no events.
    private final FiniteDuration pollInterval;
    // The stage is failed once the number of buffered elements exceeds this value.
    private final int maxBufferSize;
    // Converts a (resolved path, change kind) pair into the element type emitted downstream.
    private final BiFunction<Path, DirectoryChange, T> combiner;
    // The single outlet of this source.
    public final Outlet<T> out = Outlet.create("DirectoryChangesSource.out");
    // Source shape wrapping the outlet above.
    private final SourceShape<T> shape = SourceShape.of(this.out);

    /* renamed from: akka.stream.alpakka.file.impl.DirectoryChangesSource$1, reason: invalid class name */
    /* loaded from: input_file:akka/stream/alpakka/file/impl/DirectoryChangesSource$1.class */
    class AnonymousClass1 extends TimerGraphStageLogic {
        private final Queue<T> buffer;
        private final WatchService service;
        private final WatchKey watchKey;

        AnonymousClass1(Shape shape) throws IOException {
            super(shape);
            this.buffer = new ArrayDeque();
            this.service = DirectoryChangesSource.this.directoryPath.getFileSystem().newWatchService();
            this.watchKey = DirectoryChangesSource.this.directoryPath.register(this.service, new WatchEvent.Kind[]{StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.OVERFLOW}, SensitivityWatchEventModifier.HIGH);
            setHandler(DirectoryChangesSource.this.out, new AbstractOutHandler() { // from class: akka.stream.alpakka.file.impl.DirectoryChangesSource.1.1
                public void onPull() throws Exception {
                    if (!AnonymousClass1.this.buffer.isEmpty()) {
                        AnonymousClass1.this.pushHead();
                        return;
                    }
                    AnonymousClass1.this.doPoll();
                    if (AnonymousClass1.this.buffer.isEmpty()) {
                        AnonymousClass1.this.schedulePoll();
                    } else {
                        AnonymousClass1.this.pushHead();
                    }
                }
            });
        }

        public void onTimer(Object obj) {
            if (isClosed(DirectoryChangesSource.this.out)) {
                return;
            }
            doPoll();
            if (this.buffer.isEmpty()) {
                schedulePoll();
            } else {
                pushHead();
            }
        }

        public void postStop() {
            try {
                if (this.watchKey.isValid()) {
                    this.watchKey.cancel();
                }
                this.service.close();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        private void pushHead() {
            T poll = this.buffer.poll();
            if (poll != null) {
                push(DirectoryChangesSource.this.out, poll);
            }
        }

        private void schedulePoll() {
            scheduleOnce("poll", DirectoryChangesSource.this.pollInterval);
        }

        private void doPoll() {
            try {
                for (WatchEvent<?> watchEvent : this.watchKey.pollEvents()) {
                    WatchEvent.Kind<?> kind = watchEvent.kind();
                    if (StandardWatchEventKinds.OVERFLOW.equals(kind)) {
                        failStage(new RuntimeException("Overflow from watch service: '" + DirectoryChangesSource.this.directoryPath + "'"));
                    } else {
                        Path path = (Path) watchEvent.context();
                        this.buffer.add(DirectoryChangesSource.this.combiner.apply(DirectoryChangesSource.this.directoryPath.resolve(path), kindToChange(kind)));
                        if (this.buffer.size() > DirectoryChangesSource.this.maxBufferSize) {
                            failStage(new RuntimeException("Max event buffer size " + DirectoryChangesSource.this.maxBufferSize + " reached for " + path));
                        }
                    }
                }
            } finally {
                if (!this.watchKey.reset()) {
                    completeStage();
                }
            }
        }

        private DirectoryChange kindToChange(WatchEvent.Kind<?> kind) {
            DirectoryChange directoryChange;
            if (kind.equals(StandardWatchEventKinds.ENTRY_CREATE)) {
                directoryChange = DirectoryChange.Creation;
            } else if (kind.equals(StandardWatchEventKinds.ENTRY_DELETE)) {
                directoryChange = DirectoryChange.Deletion;
            } else {
                if (!kind.equals(StandardWatchEventKinds.ENTRY_MODIFY)) {
                    throw new RuntimeException("Unexpected kind of event gotten from watch service for path '" + DirectoryChangesSource.this.directoryPath + "': " + kind);
                }
                directoryChange = DirectoryChange.Modification;
            }
            return directoryChange;
        }
    }

    /**
     * Creates the stage; nothing is opened or validated until {@code createLogic} runs.
     *
     * @param path directory to watch
     * @param finiteDuration delay before re-polling when a poll produced no events
     * @param i maximum number of buffered elements before the stage is failed
     * @param biFunction converts a path and its change kind into the emitted element
     */
    public DirectoryChangesSource(Path path, FiniteDuration finiteDuration, int i, BiFunction<Path, DirectoryChange, T> biFunction) {
        // What to emit and how much to buffer.
        combiner = biFunction;
        maxBufferSize = i;
        // Where to look and how often.
        directoryPath = path;
        pollInterval = finiteDuration;
    }

    /* renamed from: shape, reason: merged with bridge method [inline-methods] */
    /**
     * Returns the source shape of this stage. Kept under its decompiler-assigned name so that
     * existing references to it continue to resolve.
     *
     * @return the shape wrapping {@code out}
     */
    public SourceShape<T> m5shape() {
        return this.shape;
    }

    /**
     * Returns the source shape of this stage under the method's original name (see the rename
     * note above), which is the accessor the graph stage base class is expected to require.
     *
     * @return the shape wrapping {@code out}
     */
    @Override
    public SourceShape<T> shape() {
        return this.shape;
    }

    /**
     * Supplies the default attributes of this stage.
     *
     * @return the shared attributes constant carrying the stage name
     */
    @Override
    public Attributes initialAttributes() {
        final Attributes defaults = DEFAULT_ATTRIBUTES;
        return defaults;
    }

    /**
     * Validates the configured path and builds the polling stage logic for it.
     *
     * @param attributes attributes supplied at materialization; not consulted here
     * @return a new stage logic watching the directory
     * @throws IllegalArgumentException if the path does not exist or is not a directory
     * @throws IOException if the stage logic cannot set up its watch service
     */
    @Override
    public GraphStageLogic createLogic(Attributes attributes) throws IOException {
        final Path dir = this.directoryPath;
        if (!Files.exists(dir)) {
            throw new IllegalArgumentException("The path: '" + dir + "' does not exist");
        }
        if (!Files.isDirectory(dir)) {
            throw new IllegalArgumentException("The path '" + dir + "' is not a directory");
        }
        return new AnonymousClass1(this.shape);
    }

    /**
     * Describes this stage by its watched directory.
     *
     * @return {@code DirectoryChangesSource(<directory path>)}
     */
    @Override
    public String toString() {
        final String watched = String.valueOf(this.directoryPath);
        return "DirectoryChangesSource(" + watched + ")";
    }

    /**
     * Creates a source that emits each detected change as a pair of the resolved path and the
     * kind of change.
     *
     * @param path directory to watch
     * @param finiteDuration delay before re-polling when a poll produced no events
     * @param i maximum number of buffered elements before the stage is failed
     * @return a source of (path, change) pairs
     */
    public static Source<Pair<Path, DirectoryChange>, NotUsed> create(Path path, FiniteDuration finiteDuration, int i) {
        // Explicit type arguments instead of a raw instantiation, so the combiner and the
        // resulting Source are checked by the compiler rather than converted unchecked.
        return Source.fromGraph(
                new DirectoryChangesSource<Pair<Path, DirectoryChange>>(
                        path,
                        finiteDuration,
                        i,
                        (changedPath, change) -> Pair.apply(changedPath, change)));
    }
}
