package co.cask.cdap.data.file;

import co.cask.cdap.proto.id.StreamId;
import co.cask.cdap.security.impersonation.Impersonator;
import com.google.common.base.Objects;
import com.google.common.base.Throwables;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Iterators;
import com.google.common.collect.Maps;
import com.google.common.collect.PeekingIterator;
import com.google.common.io.Closeables;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Callable;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link FileWriter} that routes every event to a delegate writer chosen by the
 * partition the event belongs to.
 *
 * <p>Delegates are created lazily through a {@link PartitionedFileWriterFactory}, invoked via
 * {@code Impersonator.doAs} for the stream this writer was constructed with, and are cached
 * per partition until they are closed with {@link #closePartitionWriter(Object)} or
 * {@link #close()}.
 *
 * <p>This class is not thread-safe.
 *
 * @param <T> type of event being written
 * @param <P> type of the partition key derived from an event
 */
@NotThreadSafe
public abstract class PartitionedFileWriter<T, P> implements FileWriter<T> {
    private static final Logger LOG = LoggerFactory.getLogger(PartitionedFileWriter.class);

    /** Stream passed to the impersonator when a partition writer is created. */
    protected final StreamId streamId;
    private final PartitionedFileWriterFactory<T, P> fileWriterFactory;
    private final Impersonator impersonator;

    /** Open delegate writers, keyed by partition. */
    private final Map<P, FileWriter<T>> writers = Maps.newHashMap();

    /** Partition of the most recently resolved writer; only meaningful while {@link #currentWriter} is set. */
    private P currentPartition;

    /** Writer for {@link #currentPartition}, or {@code null} if none is resolved (initially, or after it was closed). */
    private FileWriter<T> currentWriter;
    private boolean closed;

    /**
     * Iterator view over a shared {@link PeekingIterator} that yields events only while they
     * belong to the enclosing writer's current partition. The first event of a different
     * partition is left unconsumed in the shared iterator.
     */
    private final class AppendIterator extends AbstractIterator<T> {
        private final PeekingIterator<? extends T> events;

        AppendIterator(PeekingIterator<? extends T> peekingIterator) {
            this.events = peekingIterator;
        }

        @Override
        protected T computeNext() {
            if (!events.hasNext()) {
                return endOfData();
            }
            // Peek first: an event of another partition must stay in the shared iterator.
            T candidate = events.peek();
            if (!Objects.equal(currentPartition, getPartition(candidate))) {
                return endOfData();
            }
            events.next();
            return candidate;
        }
    }

    /**
     * Factory for the delegate writer of a single partition.
     *
     * @param <T> type of event being written
     * @param <P> type of the partition key
     */
    public interface PartitionedFileWriterFactory<T, P> {
        /**
         * Creates a writer for the given partition.
         *
         * @param p the partition to create a writer for
         * @return a new writer for that partition
         * @throws IOException if the writer cannot be created
         */
        FileWriter<T> create(P p) throws IOException;
    }

    /**
     * @param partitionedFileWriterFactory factory used to lazily create per-partition writers
     * @param streamId stream passed to the impersonator when creating writers
     * @param impersonator used to invoke the factory through {@code doAs}
     */
    public PartitionedFileWriter(PartitionedFileWriterFactory<T, P> partitionedFileWriterFactory, StreamId streamId, Impersonator impersonator) {
        this.fileWriterFactory = partitionedFileWriterFactory;
        this.streamId = streamId;
        this.impersonator = impersonator;
    }

    /**
     * Appends a single event to the writer of its partition. On any failure the error is
     * logged, this writer is closed quietly, and the failure is rethrown.
     *
     * @throws IOException if this writer is closed or the append fails with an IOException
     */
    @Override
    public void append(T t) throws IOException {
        if (closed) {
            throw new IOException("Attempts to write to a closed FileWriter.");
        }
        try {
            getWriter(t).append(t);
        } catch (Throwable th) {
            LOG.error("Exception on append.", th);
            Closeables.closeQuietly(this);
            Throwables.propagateIfInstanceOf(th, IOException.class);
            throw Throwables.propagate(th);
        }
    }

    /**
     * Appends all events, handing each run of consecutive same-partition events to that
     * partition's writer in a single {@code appendAll} call.
     *
     * @throws IOException if this writer is closed or a delegate fails
     */
    @Override
    public void appendAll(Iterator<? extends T> it) throws IOException {
        if (closed) {
            throw new IOException("Attempts to write to a closed FileWriter.");
        }
        PeekingIterator<? extends T> events = Iterators.peekingIterator(it);
        while (events.hasNext()) {
            // getWriter() switches the current partition to that of the next event; the
            // AppendIterator then stops at the first event of a different partition.
            getWriter(events.peek()).appendAll(new AppendIterator(events));
        }
    }

    /**
     * Flushes every open partition writer. All writers are attempted even if some fail.
     *
     * @throws IOException the first failure encountered; later failures are attached as suppressed
     */
    @Override
    public void flush() throws IOException {
        // Always walk the map instead of special-casing currentWriter: currentWriter can be
        // null or already closed while the map still holds a different open writer.
        IOException failure = null;
        for (FileWriter<T> writer : writers.values()) {
            try {
                writer.flush();
            } catch (IOException e) {
                failure = collect(failure, e);
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Closes every open partition writer and marks this writer as closed. All writers are
     * attempted even if some fail with an IOException; calling this again afterwards finds no
     * writers left to close.
     *
     * @throws IOException the first failure encountered; later failures are attached as suppressed
     */
    @Override
    public void close() throws IOException {
        try {
            IOException failure = null;
            Iterator<FileWriter<T>> it = writers.values().iterator();
            while (it.hasNext()) {
                FileWriter<T> writer = it.next();
                // Drop the writer before closing so it is never closed or flushed again.
                it.remove();
                try {
                    writer.close();
                } catch (IOException e) {
                    failure = collect(failure, e);
                }
            }
            if (failure != null) {
                throw failure;
            }
        } finally {
            closed = true;
            currentWriter = null;
        }
    }

    /**
     * Returns the writer for the partition of the given event, creating and caching it if
     * needed. {@link #partitionChanged} is invoked first when the partition differs from the
     * current one. The current partition/writer pair is updated only once the new writer is
     * available, so a failure never leaves a partition recorded without a writer.
     */
    private FileWriter<T> getWriter(T t) throws IOException {
        P partition = getPartition(t);
        boolean samePartition = Objects.equal(currentPartition, partition);
        if (samePartition && currentWriter != null) {
            return currentWriter;
        }
        if (!samePartition) {
            partitionChanged(currentPartition, partition);
        }
        FileWriter<T> writer = writers.get(partition);
        if (writer == null) {
            writer = createWriter(partition);
            writers.put(partition, writer);
        }
        currentPartition = partition;
        currentWriter = writer;
        return writer;
    }

    /**
     * Creates a writer for the partition by invoking the factory through the impersonator.
     * Failures that are not IOExceptions or unchecked are wrapped in an IOException.
     */
    @SuppressWarnings("unchecked") // The Callable's type argument fixes the element type of the result.
    private FileWriter<T> createWriter(final P partition) throws IOException {
        try {
            return (FileWriter<T>) impersonator.doAs(streamId, new Callable<FileWriter<T>>() {
                @Override
                public FileWriter<T> call() throws Exception {
                    return fileWriterFactory.create(partition);
                }
            });
        } catch (Exception e) {
            Throwables.propagateIfPossible(e, IOException.class);
            throw new IOException(e);
        }
    }

    /** Keeps the first failure as the one to throw and records any later one as suppressed. */
    private static IOException collect(IOException first, IOException next) {
        if (first == null) {
            return next;
        }
        if (first != next) { // addSuppressed rejects self-suppression
            first.addSuppressed(next);
        }
        return first;
    }

    /**
     * Closes and forgets the writer of the given partition, if one is open.
     *
     * @param p the partition whose writer should be closed
     * @throws IOException if closing the writer fails; the writer is forgotten regardless
     */
    public final void closePartitionWriter(P p) throws IOException {
        FileWriter<T> removed = writers.remove(p);
        if (removed == null) {
            return;
        }
        if (removed == currentWriter) {
            // Never hand out a closed writer; the next event re-resolves one.
            currentWriter = null;
        }
        removed.close();
    }

    /**
     * Hook invoked before switching writers when an event's partition differs from the
     * current one. The default implementation does nothing.
     *
     * @param p the previous partition; {@code null} if no event has been routed yet
     * @param p2 the partition of the event about to be written
     * @throws IOException if the subclass fails to handle the change
     */
    protected void partitionChanged(P p, P p2) throws IOException {
    }

    /**
     * Determines the partition an event belongs to.
     *
     * @param t the event
     * @return the partition key used to select the delegate writer
     */
    protected abstract P getPartition(T t);
}
