package co.cask.cdap.data.stream;

import co.cask.cdap.data.file.FileReader;
import co.cask.cdap.data.file.PositionReporter;
import co.cask.cdap.data.file.ReadFilter;
import co.cask.cdap.data2.transaction.stream.StreamConfig;
import com.google.common.base.Function;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.primitives.Longs;
import it.unimi.dsi.fastutil.PriorityQueue;
import it.unimi.dsi.fastutil.objects.ObjectHeapPriorityQueue;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.NotThreadSafe;

@NotThreadSafe
/* loaded from: input_file:co/cask/cdap/data/stream/MultiLiveStreamFileReader.class */
public final class MultiLiveStreamFileReader implements FileReader<StreamEventOffset, Iterable<StreamFileOffset>> {
    private final PriorityQueue<StreamEventSource> eventSources;
    private final Set<StreamEventSource> emptySources;
    private final Set<StreamEventSource> allSources = Sets.newTreeSet();
    private final Iterable<StreamFileOffset> offsetsView;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/data/stream/MultiLiveStreamFileReader$StreamEventSource.class */
    public static final class StreamEventSource implements Comparable<StreamEventSource>, Closeable, PositionReporter<StreamFileOffset> {
        private final FileReader<PositionStreamEvent, StreamFileOffset> reader;
        private final List<PositionStreamEvent> events;
        private StreamFileOffset currentOffset;
        private StreamFileOffset nextOffset;

        private StreamEventSource(StreamConfig streamConfig, StreamFileOffset streamFileOffset) {
            this.reader = new LiveStreamFileReader(streamConfig, streamFileOffset);
            this.events = Lists.newArrayListWithCapacity(1);
            this.currentOffset = new StreamFileOffset(streamFileOffset);
            this.nextOffset = streamFileOffset;
        }

        void initialize() throws IOException {
            this.reader.initialize();
            this.currentOffset = this.reader.getPosition();
        }

        void read(Collection<? super StreamEventOffset> collection) throws IOException, InterruptedException {
            PositionStreamEvent positionStreamEvent = this.events.get(0);
            collection.add(new StreamEventOffset(positionStreamEvent, new StreamFileOffset(this.nextOffset.getEventLocation(), positionStreamEvent.getStart(), this.nextOffset.getGeneration())));
            this.events.clear();
            this.currentOffset = this.nextOffset;
        }

        int prepare(ReadFilter readFilter) throws IOException, InterruptedException {
            if (!this.events.isEmpty()) {
                return 1;
            }
            int read = this.reader.read(this.events, 1, 0L, TimeUnit.MILLISECONDS, readFilter);
            this.nextOffset = this.reader.getPosition();
            return read;
        }

        @Override // java.lang.Comparable
        public int compareTo(StreamEventSource streamEventSource) {
            if (this == streamEventSource) {
                return 0;
            }
            int compare = Longs.compare(this.events.isEmpty() ? 0L : this.events.get(0).getTimestamp(), streamEventSource.events.isEmpty() ? 0L : streamEventSource.events.get(0).getTimestamp());
            return compare != 0 ? compare : getPosition().getEventLocation().toURI().compareTo(streamEventSource.getPosition().getEventLocation().toURI());
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            this.reader.close();
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // co.cask.cdap.data.file.PositionReporter
        public StreamFileOffset getPosition() {
            return this.currentOffset;
        }
    }

    public MultiLiveStreamFileReader(StreamConfig streamConfig, Iterable<? extends StreamFileOffset> iterable) {
        Iterator<? extends StreamFileOffset> it = iterable.iterator();
        while (it.hasNext()) {
            this.allSources.add(new StreamEventSource(streamConfig, it.next()));
        }
        this.eventSources = new ObjectHeapPriorityQueue(this.allSources.size());
        this.emptySources = Sets.newHashSet(this.allSources);
        this.offsetsView = Iterables.transform(this.allSources, new Function<StreamEventSource, StreamFileOffset>() { // from class: co.cask.cdap.data.stream.MultiLiveStreamFileReader.1
            public StreamFileOffset apply(StreamEventSource streamEventSource) {
                return streamEventSource.getPosition();
            }
        });
    }

    @Override // co.cask.cdap.data.file.FileReader
    public void initialize() throws IOException {
        Iterator<StreamEventSource> it = this.allSources.iterator();
        while (it.hasNext()) {
            it.next().initialize();
        }
    }

    @Override // co.cask.cdap.data.file.FileReader
    public int read(Collection<? super StreamEventOffset> collection, int i, long j, TimeUnit timeUnit) throws IOException, InterruptedException {
        return read(collection, i, j, timeUnit, ReadFilter.ALWAYS_ACCEPT);
    }

    @Override // co.cask.cdap.data.file.FileReader
    public int read(Collection<? super StreamEventOffset> collection, int i, long j, TimeUnit timeUnit, ReadFilter readFilter) throws IOException, InterruptedException {
        int i2 = 0;
        Stopwatch stopwatch = new Stopwatch();
        stopwatch.start();
        while (i2 < i && (!this.emptySources.isEmpty() || !this.eventSources.isEmpty())) {
            if (!this.emptySources.isEmpty()) {
                prepareEmptySources(readFilter);
            }
            i2 += read(collection, readFilter);
            if (this.eventSources.isEmpty() && stopwatch.elapsedTime(timeUnit) > j) {
                break;
            }
        }
        if (i2 == 0 && this.emptySources.isEmpty() && this.eventSources.isEmpty()) {
            return -1;
        }
        return i2;
    }

    @Override // co.cask.cdap.data.file.PositionReporter
    public Iterable<StreamFileOffset> getPosition() {
        return this.offsetsView;
    }

    private void prepareEmptySources(ReadFilter readFilter) throws IOException, InterruptedException {
        Iterator<StreamEventSource> it = this.emptySources.iterator();
        while (it.hasNext()) {
            StreamEventSource next = it.next();
            int prepare = next.prepare(readFilter);
            if (prepare != 0) {
                it.remove();
                if (prepare > 0) {
                    this.eventSources.enqueue(next);
                }
            }
        }
    }

    private int read(Collection<? super StreamEventOffset> collection, ReadFilter readFilter) throws IOException, InterruptedException {
        if (this.eventSources.isEmpty()) {
            return 0;
        }
        StreamEventSource streamEventSource = (StreamEventSource) this.eventSources.first();
        streamEventSource.read(collection);
        int prepare = streamEventSource.prepare(readFilter);
        if (prepare > 0) {
            this.eventSources.changed();
            return 1;
        }
        if (prepare > 0) {
            return 1;
        }
        this.eventSources.dequeue();
        if (prepare != 0) {
            return 1;
        }
        this.emptySources.add(streamEventSource);
        return 1;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        Iterator<StreamEventSource> it = this.allSources.iterator();
        while (it.hasNext()) {
            it.next().close();
        }
        this.emptySources.clear();
        this.eventSources.clear();
    }
}
