/*
 * Decompiled with CFR 0.152.
 */
package org.apache.streams.hdfs;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Queue;
import java.util.zip.GZIPOutputStream;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.converter.LineReadWriteConfiguration;
import org.apache.streams.converter.LineReadWriteUtil;
import org.apache.streams.core.DatumStatus;
import org.apache.streams.core.DatumStatusCountable;
import org.apache.streams.core.DatumStatusCounter;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistWriter;
import org.apache.streams.hdfs.HdfsWriterConfiguration;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Persist writer that serializes each {@link StreamsDatum} to a line of text and
 * appends it to a file obtained from a Hadoop {@link FileSystem}.
 *
 * <p>Output is split across files: a new file is started once the current one
 * holds {@code linesPerFile} lines. Files are created under
 * {@code <path>/<writerPath>/} and named {@code <writerFilePrefix>-<epochMillis>}
 * with a {@code .gz} suffix when GZIP compression is configured and {@code .tsv}
 * otherwise. Text is always encoded as UTF-8.
 *
 * <p>{@link #prepare(Object)} must be called before {@link #write(StreamsDatum)};
 * it initializes the line serializer, the file system client and the target path.
 * Writes are serialized on this instance's monitor.
 */
public class WebHdfsPersistWriter
implements StreamsPersistWriter,
Flushable,
Closeable,
DatumStatusCountable {
    public static final String STREAMS_ID = "WebHdfsPersistWriter";
    private static final Logger LOGGER = LoggerFactory.getLogger(WebHdfsPersistWriter.class);

    /** Number of bytes in one mebibyte. */
    private static final int BYTES_IN_MB = 0x100000;
    /** Number of written bytes after which {@link #write(StreamsDatum)} requests a flush (64 MiB). */
    private static final int BYTES_BEFORE_FLUSH = 64 * BYTES_IN_MB;

    /** File system client; {@code null} until a connection has been established. */
    private FileSystem client;
    /** Directory under which output files are created; set by {@link #prepare(Object)}. */
    private Path path;
    /** Maximum number of lines written to a single file before rolling over. */
    private int linesPerFile;
    /** Number of datums written over the lifetime of this writer. */
    private int totalRecordsWritten = 0;
    /** Every file this writer has created, in creation order. */
    private final List<Path> writtenFiles = new ArrayList<Path>();
    /** Number of lines written to the current file. */
    private int fileLineCounter = 0;
    /** Writer for the current file, or {@code null} when no file is open. */
    private OutputStreamWriter currentWriter = null;
    /** Total line bytes written over the lifetime of this writer (long, so it cannot wrap at 2 GiB). */
    private volatile long totalByteCount = 0;
    /** Line bytes written since the last flush or close. */
    private volatile int byteCount = 0;
    public boolean terminate = false;
    protected volatile Queue<StreamsDatum> persistQueue;
    private ObjectMapper mapper;
    private LineReadWriteUtil lineWriterUtil;
    protected HdfsWriterConfiguration hdfsConfiguration;

    /**
     * Creates a writer using the configuration detected by
     * {@link ComponentConfigurator} for {@link HdfsWriterConfiguration}.
     */
    public WebHdfsPersistWriter() {
        this((HdfsWriterConfiguration)new ComponentConfigurator(HdfsWriterConfiguration.class).detectConfiguration());
    }

    /**
     * Creates a writer using the supplied configuration.
     *
     * @param hdfsConfiguration writer configuration; its {@code linesPerFile} value is read immediately
     */
    public WebHdfsPersistWriter(HdfsWriterConfiguration hdfsConfiguration) {
        this.hdfsConfiguration = hdfsConfiguration;
        this.linesPerFile = hdfsConfiguration.getLinesPerFile().intValue();
    }

    /**
     * Builds the file system URI from the configuration.
     *
     * @return {@code <scheme>://<host>:<port>} when a host is configured, otherwise {@code <scheme>:///}
     * @throws URISyntaxException if the assembled string is not a valid URI
     */
    public URI getURI() throws URISyntaxException {
        StringBuilder uriBuilder = new StringBuilder();
        uriBuilder.append(this.hdfsConfiguration.getScheme());
        uriBuilder.append("://");
        if (StringUtils.isNotEmpty(this.hdfsConfiguration.getHost())) {
            uriBuilder.append(this.hdfsConfiguration.getHost() + ":" + this.hdfsConfiguration.getPort());
        } else {
            uriBuilder.append("/");
        }
        return new URI(uriBuilder.toString());
    }

    /**
     * Reports whether a file system client has been created.
     *
     * @return {@code true} if the client reference is non-null
     */
    public boolean isConnected() {
        return this.client != null;
    }

    /**
     * Returns the file system client, connecting first if no client exists yet.
     *
     * @return the file system client
     * @throws RuntimeException if the connection attempt fails
     */
    public final synchronized FileSystem getFileSystem() {
        if (!this.isConnected()) {
            this.connectToWebHDFS();
        }
        return this.client;
    }

    /**
     * Creates the file system client while acting as the configured remote user.
     *
     * @throws RuntimeException wrapping any failure raised while connecting
     */
    private synchronized void connectToWebHDFS() {
        try {
            LOGGER.info("User : {}", this.hdfsConfiguration.getUser());
            UserGroupInformation ugi = UserGroupInformation.createRemoteUser(this.hdfsConfiguration.getUser());
            ugi.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.SIMPLE);
            ugi.doAs(new PrivilegedExceptionAction<Void>() {

                @Override
                public Void run() throws Exception {
                    Configuration conf = new Configuration();
                    // NOTE(review): the UGI above is set to SIMPLE while this property says
                    // "kerberos" - confirm which authentication mode is actually intended.
                    conf.set("hadoop.security.authentication", "kerberos");
                    LOGGER.info("WebURI : {}", WebHdfsPersistWriter.this.getURI().toString());
                    WebHdfsPersistWriter.this.client = FileSystem.get(WebHdfsPersistWriter.this.getURI(), conf);
                    LOGGER.info("Connected to WebHDFS");
                    return null;
                }
            });
        }
        catch (Exception ex) {
            LOGGER.error("There was an error connecting to WebHDFS, please check your settings and try again", ex);
            throw new RuntimeException(ex);
        }
    }

    /**
     * Returns the identifier of this component.
     *
     * @return {@link #STREAMS_ID}
     */
    public String getId() {
        return STREAMS_ID;
    }

    /**
     * Serializes the datum to a line and appends it to the current file, starting
     * a new file first when none is open or the current one already holds
     * {@code linesPerFile} lines. The configured line delimiter is appended when
     * the serialized line does not already end with it.
     *
     * <p>A flush is requested once more than {@link #BYTES_BEFORE_FLUSH} line bytes
     * have accumulated; a failed flush is logged and does not abort the write.
     *
     * @param streamsDatum the datum to persist
     * @throws RuntimeException if a file cannot be opened or the line cannot be
     *         written even after the retries in {@code writeInternal}
     */
    public void write(StreamsDatum streamsDatum) {
        synchronized (this) {
            // ">=" (not ">") so that a file never receives more than linesPerFile lines.
            if (this.currentWriter == null || this.fileLineCounter >= this.linesPerFile) {
                this.resetFile();
            }
            String line = this.lineWriterUtil.convertResultToString(streamsDatum);
            this.writeInternal(line);
            String delimiter = this.hdfsConfiguration.getLineDelimiter();
            if (!line.endsWith(delimiter)) {
                this.writeInternal(delimiter);
            }
            // Measure in the same charset the file writer uses so the count matches the output.
            int bytesInLine = line.getBytes(StandardCharsets.UTF_8).length;
            ++this.totalRecordsWritten;
            this.totalByteCount += bytesInLine;
            this.byteCount += bytesInLine;
            if (this.byteCount > BYTES_BEFORE_FLUSH) {
                try {
                    this.flush();
                }
                catch (IOException ex) {
                    LOGGER.warn("Error flushing to HDFS. Creating a new file and continuing execution.  WARNING: There could be data loss.", ex);
                }
            }
            ++this.fileLineCounter;
        }
    }

    /**
     * Writes text to the current file, escalating on failure: first retry after
     * {@link #resetFile()}, then retry after reconnecting the client and resetting
     * the file again.
     *
     * @param line text to write
     * @throws RuntimeException if the write still fails after reconnecting
     */
    private void writeInternal(String line) {
        try {
            this.currentWriter.write(line);
        }
        catch (IOException ex) {
            LOGGER.warn("Error writing to HDFS.  Attempting to try a new file", ex);
            try {
                this.resetFile();
                this.currentWriter.write(line);
            }
            catch (Exception e2) {
                LOGGER.warn("Failed to write even after creating a new file.  Attempting to reconnect", e2);
                try {
                    this.connectToWebHDFS();
                    this.resetFile();
                    this.currentWriter.write(line);
                }
                catch (Exception e3) {
                    LOGGER.error("Failed to write to HDFS after reconnecting client. Terminating writer.", e3);
                    throw new RuntimeException(e3);
                }
            }
        }
    }

    /**
     * Flushes the current file writer, but only when more than
     * {@link #BYTES_BEFORE_FLUSH} line bytes have been written since the last
     * flush; otherwise this is a no-op. The pending byte counter is reset after a
     * successful flush.
     *
     * @throws IOException if the underlying writer fails to flush
     */
    @Override
    public void flush() throws IOException {
        if (this.currentWriter != null && this.byteCount > BYTES_BEFORE_FLUSH) {
            this.currentWriter.flush();
            this.byteCount = 0;
        }
    }

    /**
     * Closes the current file, if any, and opens a new uniquely named one.
     *
     * <p>Does nothing when a file is open and no line has been written to it yet.
     *
     * @throws RuntimeException if the previous file cannot be closed, the target
     *         file name already exists, or the new file cannot be created
     */
    private synchronized void resetFile() {
        // An open file with no lines written yet is still fresh; do not replace it.
        if (this.fileLineCounter == 0 && this.currentWriter != null) {
            return;
        }
        boolean gzip = this.hdfsConfiguration.getCompression().equals(HdfsWriterConfiguration.Compression.GZIP);
        Path filePath = this.path.suffix("/" + this.hdfsConfiguration.getWriterFilePrefix() + "-" + new Date().getTime());
        filePath = gzip ? filePath.suffix(".gz") : filePath.suffix(".tsv");
        try {
            if (this.currentWriter != null) {
                // close() flushes unconditionally and always drops the writer reference,
                // so a failure here does not leave a broken writer in place for the next attempt.
                this.close();
            }
            this.fileLineCounter = 0;
            if (this.client.exists(filePath)) {
                throw new RuntimeException("Unable to create file: " + filePath);
            }
            OutputStream rawStream = this.client.create(filePath);
            OutputStream target = gzip ? new GZIPOutputStream(rawStream) : rawStream;
            // Explicit charset: the platform default would make file contents host-dependent.
            this.currentWriter = new OutputStreamWriter(target, StandardCharsets.UTF_8);
            this.writtenFiles.add(filePath);
            LOGGER.info("File Created: {}", filePath);
        }
        catch (Exception ex) {
            // Passing the exception as the last argument logs its message and stack trace.
            LOGGER.error("COULD NOT CreateFile: {}", filePath, ex);
            throw new RuntimeException(ex);
        }
    }

    /**
     * Flushes and closes the current file writer, if one is open.
     *
     * <p>The writer reference is cleared and the pending byte counter reset even
     * when flushing or closing fails, so that a later write can open a fresh file
     * instead of reusing a broken writer.
     *
     * @throws IOException if flushing or closing the underlying writer fails
     */
    @Override
    public synchronized void close() throws IOException {
        if (this.currentWriter == null) {
            return;
        }
        // try-with-resources closes the writer even if flush() throws.
        try (OutputStreamWriter writer = this.currentWriter) {
            writer.flush();
        }
        finally {
            this.currentWriter = null;
            this.byteCount = 0;
        }
        LOGGER.info("File Closed");
    }

    /**
     * Initializes the JSON mapper, the line serializer, the file system client and
     * the output directory ({@code <path>/<writerPath>}).
     *
     * @param configurationObject not used by this implementation
     * @throws RuntimeException if the file system connection cannot be established
     */
    public void prepare(Object configurationObject) {
        this.mapper = StreamsJacksonMapper.getInstance();
        this.lineWriterUtil = LineReadWriteUtil.getInstance((LineReadWriteConfiguration)this.hdfsConfiguration);
        this.connectToWebHDFS();
        this.path = new Path(this.hdfsConfiguration.getPath() + "/" + this.hdfsConfiguration.getWriterPath());
    }

    /**
     * Flushes and closes the current file. Failures in either step are logged and
     * not propagated.
     */
    public void cleanUp() {
        try {
            this.flush();
        }
        catch (IOException ex) {
            LOGGER.error("Error flushing on cleanup", ex);
        }
        try {
            this.close();
        }
        catch (IOException ex) {
            LOGGER.error("Error closing on cleanup", ex);
        }
    }

    /**
     * Builds a new counter snapshot in which every datum written so far is
     * counted as both an attempt and a success.
     *
     * @return a freshly created counter
     */
    public DatumStatusCounter getDatumStatusCounter() {
        DatumStatusCounter counters = new DatumStatusCounter();
        counters.incrementAttempt(this.totalRecordsWritten);
        counters.incrementStatus(DatumStatus.SUCCESS, this.totalRecordsWritten);
        return counters;
    }
}

