package org.kitesdk.data.spi.filesystem;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.jboss.netty.handler.codec.http.multipart.DiskFileUpload;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.DatasetIOException;
import org.kitesdk.data.DatasetWriterException;
import org.kitesdk.data.Format;
import org.kitesdk.data.Formats;
import org.kitesdk.data.spi.AbstractDatasetWriter;
import org.kitesdk.data.spi.DescriptorUtil;
import org.kitesdk.data.spi.ReaderWriterState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:WEB-INF/lib/kite-data-core-0.17.0.jar:org/kitesdk/data/spi/filesystem/FileSystemWriter.class */
public class FileSystemWriter<E> extends AbstractDatasetWriter<E> {
    private static final Logger LOG = LoggerFactory.getLogger(FileSystemWriter.class);
    protected final FileSystem fs;
    private final Path directory;
    private final DatasetDescriptor descriptor;
    private FileAppender<E> appender;
    private Path tempPath;
    private Path finalPath;
    private ReaderWriterState state;
    private int count = 0;

    @VisibleForTesting
    final Configuration conf;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/kite-data-core-0.17.0.jar:org/kitesdk/data/spi/filesystem/FileSystemWriter$FileAppender.class */
    public interface FileAppender<E> extends Flushable, Closeable {
        void open() throws IOException;

        void append(E e) throws IOException;

        void sync() throws IOException;

        void cleanup() throws IOException;
    }

    /* loaded from: input_file:WEB-INF/lib/kite-data-core-0.17.0.jar:org/kitesdk/data/spi/filesystem/FileSystemWriter$KiteRecordWriter.class */
    private class KiteRecordWriter extends RecordWriter<E, Void> {
        private KiteRecordWriter() {
        }

        public void write(E e, Void r5) throws IOException, InterruptedException {
            FileSystemWriter.this.write(e);
        }

        public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            FileSystemWriter.this.close();
        }

        public /* bridge */ /* synthetic */ void write(Object obj, Object obj2) throws IOException, InterruptedException {
            write((KiteRecordWriter) obj, (Void) obj2);
        }
    }

    public FileSystemWriter(FileSystem fileSystem, Path path, DatasetDescriptor datasetDescriptor) {
        Preconditions.checkNotNull(fileSystem, "File system is not defined");
        Preconditions.checkNotNull(path, "Destination directory is not defined");
        Preconditions.checkNotNull(datasetDescriptor, "Descriptor is not defined");
        this.fs = fileSystem;
        this.directory = path;
        this.descriptor = datasetDescriptor;
        this.conf = new Configuration(fileSystem.getConf());
        this.state = ReaderWriterState.NEW;
        for (String str : datasetDescriptor.listProperties()) {
            this.conf.set(str, datasetDescriptor.getProperty(str));
        }
    }

    @Override // org.kitesdk.data.spi.InitializeAccessor
    public final void initialize() {
        Preconditions.checkState(this.state.equals(ReaderWriterState.NEW), "Unable to open a writer from state:%s", this.state);
        try {
            this.fs.mkdirs(this.directory);
            this.finalPath = new Path(this.directory, uniqueFilename(this.descriptor.getFormat()));
            this.tempPath = tempFilename(this.finalPath);
            this.appender = newAppender(this.tempPath);
            try {
                this.appender.open();
                this.count = 0;
                LOG.debug("Opened appender {} for {}", this.appender, this.finalPath);
                this.state = ReaderWriterState.OPEN;
            } catch (IOException e) {
                this.state = ReaderWriterState.ERROR;
                throw new DatasetIOException("Failed to open appender " + this.appender, e);
            }
        } catch (IOException e2) {
            this.state = ReaderWriterState.ERROR;
            throw new DatasetIOException("Failed to create path " + this.directory, e2);
        }
    }

    @Override // org.kitesdk.data.DatasetWriter
    public final void write(E e) {
        Preconditions.checkState(this.state.equals(ReaderWriterState.OPEN), "Attempt to write to a writer in state:%s", this.state);
        try {
            this.appender.append(e);
            this.count++;
        } catch (IOException e2) {
            this.state = ReaderWriterState.ERROR;
            throw new DatasetIOException("Failed to append " + e + " to " + this.appender, e2);
        }
    }

    @Override // org.kitesdk.data.DatasetWriter, java.io.Flushable
    public void flush() {
        Preconditions.checkState(this.state.equals(ReaderWriterState.OPEN), "Attempt to flush a writer in state:%s", this.state);
        try {
            this.appender.flush();
        } catch (IOException e) {
            this.state = ReaderWriterState.ERROR;
            throw new DatasetWriterException("Failed to flush appender " + this.appender);
        }
    }

    @Override // org.kitesdk.data.DatasetWriter
    public void sync() {
        Preconditions.checkState(this.state.equals(ReaderWriterState.OPEN), "Attempt to sync a writer in state:%s", this.state);
        try {
            this.appender.sync();
        } catch (IOException e) {
            this.state = ReaderWriterState.ERROR;
            throw new DatasetIOException("Failed to sync appender " + this.appender, e);
        }
    }

    @Override // org.kitesdk.data.DatasetWriter, java.io.Closeable, java.lang.AutoCloseable
    public final void close() {
        if (!this.state.equals(ReaderWriterState.OPEN)) {
            if (this.state.equals(ReaderWriterState.ERROR)) {
                this.state = ReaderWriterState.CLOSED;
                return;
            }
            return;
        }
        try {
            this.appender.close();
            if (this.count > 0) {
                try {
                    if (!this.fs.rename(this.tempPath, this.finalPath)) {
                        this.state = ReaderWriterState.ERROR;
                        throw new DatasetWriterException("Failed to move " + this.tempPath + " to " + this.finalPath);
                    }
                    LOG.debug("Committed {} for appender {} ({} entities)", new Object[]{this.finalPath, this.appender, Integer.valueOf(this.count)});
                } catch (IOException e) {
                    this.state = ReaderWriterState.ERROR;
                    throw new DatasetIOException("Failed to commit " + this.finalPath, e);
                }
            } else {
                try {
                    if (!this.fs.delete(this.tempPath, true)) {
                        this.state = ReaderWriterState.ERROR;
                        throw new DatasetWriterException("Failed to delete " + this.tempPath);
                    }
                    LOG.debug("Discarded {} ({} entities)", this.tempPath, Integer.valueOf(this.count));
                } catch (IOException e2) {
                    this.state = ReaderWriterState.ERROR;
                    throw new DatasetIOException("Failed to remove temporary file " + this.tempPath, e2);
                }
            }
            try {
                this.appender.cleanup();
                this.state = ReaderWriterState.CLOSED;
            } catch (IOException e3) {
                throw new DatasetIOException("Failed to clean up " + this.appender, e3);
            }
        } catch (IOException e4) {
            this.state = ReaderWriterState.ERROR;
            throw new DatasetIOException("Failed to close appender " + this.appender, e4);
        }
    }

    @Override // org.kitesdk.data.DatasetWriter
    public final boolean isOpen() {
        return this.state.equals(ReaderWriterState.OPEN);
    }

    private static String uniqueFilename(Format format) {
        return UUID.randomUUID() + "." + format.getExtension();
    }

    private static Path tempFilename(Path path) {
        return new Path(path.getParent(), "." + path.getName() + DiskFileUpload.postfix);
    }

    @VisibleForTesting
    <E> FileAppender<E> newAppender(Path path) {
        Format format = this.descriptor.getFormat();
        if (Formats.PARQUET.equals(format)) {
            return DescriptorUtil.isDisabled(FileSystemProperties.NON_DURABLE_PARQUET_PROP, this.descriptor) ? new DurableParquetAppender(this.fs, path, this.descriptor.getSchema(), this.conf, this.descriptor.getCompressionType()) : new ParquetAppender(this.fs, path, this.descriptor.getSchema(), this.conf, this.descriptor.getCompressionType());
        }
        if (Formats.AVRO.equals(format)) {
            return new AvroAppender(this.fs, path, this.descriptor.getSchema(), this.descriptor.getCompressionType());
        }
        if (Formats.CSV.equals(format) && DescriptorUtil.isEnabled("kite.allow.csv", this.descriptor)) {
            return new CSVAppender(this.fs, path, this.descriptor);
        }
        this.state = ReaderWriterState.ERROR;
        throw new DatasetWriterException("Unknown format " + this.descriptor);
    }

    public RecordWriter<E, Void> asRecordWriter() {
        return new KiteRecordWriter();
    }
}
