package org.apache.streams.hdfs;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Queue;
import java.util.zip.GZIPOutputStream;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.converter.LineReadWriteUtil;
import org.apache.streams.core.DatumStatus;
import org.apache.streams.core.DatumStatusCountable;
import org.apache.streams.core.DatumStatusCounter;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistWriter;
import org.apache.streams.hdfs.HdfsWriterConfiguration;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streams/hdfs/WebHdfsPersistWriter.class */
public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Closeable, DatumStatusCountable {
    public static final String STREAMS_ID = "WebHdfsPersistWriter";
    private static final Logger LOGGER = LoggerFactory.getLogger(WebHdfsPersistWriter.class);
    private FileSystem client;
    private Path path;
    private int linesPerFile;
    private int totalRecordsWritten;
    private final List<Path> writtenFiles;
    private int fileLineCounter;
    private OutputStreamWriter currentWriter;
    private static final int BYTES_IN_MB = 1048576;
    private static final int BYTES_BEFORE_FLUSH = 67108864;
    private volatile int totalByteCount;
    private volatile int byteCount;
    public boolean terminate;
    protected volatile Queue<StreamsDatum> persistQueue;
    private ObjectMapper mapper;
    private LineReadWriteUtil lineWriterUtil;
    protected HdfsWriterConfiguration hdfsConfiguration;

    public WebHdfsPersistWriter() {
        this((HdfsWriterConfiguration) new ComponentConfigurator(HdfsWriterConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().getConfig("hdfs")));
    }

    public WebHdfsPersistWriter(HdfsWriterConfiguration hdfsWriterConfiguration) {
        this.totalRecordsWritten = 0;
        this.writtenFiles = new ArrayList();
        this.fileLineCounter = 0;
        this.currentWriter = null;
        this.totalByteCount = 0;
        this.byteCount = 0;
        this.terminate = false;
        this.hdfsConfiguration = hdfsWriterConfiguration;
        this.linesPerFile = hdfsWriterConfiguration.getLinesPerFile().intValue();
    }

    public URI getURI() throws URISyntaxException {
        StringBuilder sb = new StringBuilder();
        sb.append(this.hdfsConfiguration.getScheme());
        sb.append("://");
        if (StringUtils.isNotEmpty(this.hdfsConfiguration.getHost())) {
            sb.append(this.hdfsConfiguration.getHost() + ":" + this.hdfsConfiguration.getPort());
        } else {
            sb.append("/");
        }
        return new URI(sb.toString());
    }

    public boolean isConnected() {
        return this.client != null;
    }

    public final synchronized FileSystem getFileSystem() {
        if (!isConnected()) {
            connectToWebHDFS();
        }
        return this.client;
    }

    private synchronized void connectToWebHDFS() {
        try {
            LOGGER.info("User : {}", this.hdfsConfiguration.getUser());
            UserGroupInformation createRemoteUser = UserGroupInformation.createRemoteUser(this.hdfsConfiguration.getUser());
            createRemoteUser.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.SIMPLE);
            createRemoteUser.doAs(new PrivilegedExceptionAction<Void>() { // from class: org.apache.streams.hdfs.WebHdfsPersistWriter.1
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.security.PrivilegedExceptionAction
                public Void run() throws Exception {
                    Configuration configuration = new Configuration();
                    configuration.set("hadoop.security.authentication", "kerberos");
                    WebHdfsPersistWriter.LOGGER.info("WebURI : {}", WebHdfsPersistWriter.this.getURI().toString());
                    WebHdfsPersistWriter.this.client = FileSystem.get(WebHdfsPersistWriter.this.getURI(), configuration);
                    WebHdfsPersistWriter.LOGGER.info("Connected to WebHDFS");
                    return null;
                }
            });
        } catch (Exception e) {
            LOGGER.error("There was an error connecting to WebHDFS, please check your settings and try again", e);
            throw new RuntimeException(e);
        }
    }

    public String getId() {
        return STREAMS_ID;
    }

    public void write(StreamsDatum streamsDatum) {
        synchronized (this) {
            if (this.currentWriter == null || this.fileLineCounter > this.linesPerFile) {
                resetFile();
            }
            String convertResultToString = this.lineWriterUtil.convertResultToString(streamsDatum);
            writeInternal(convertResultToString);
            if (!convertResultToString.endsWith(this.hdfsConfiguration.getLineDelimiter())) {
                writeInternal(this.hdfsConfiguration.getLineDelimiter());
            }
            int length = convertResultToString.getBytes().length;
            this.totalRecordsWritten++;
            this.totalByteCount += length;
            this.byteCount += length;
            if (this.byteCount > BYTES_BEFORE_FLUSH) {
                try {
                    flush();
                } catch (IOException e) {
                    LOGGER.warn("Error flushing to HDFS. Creating a new file and continuing execution.  WARNING: There could be data loss.", e);
                }
            }
            this.fileLineCounter++;
        }
    }

    private void writeInternal(String str) {
        try {
            this.currentWriter.write(str);
        } catch (IOException e) {
            LOGGER.warn("Error writing to HDFS.  Attempting to try a new file", e);
            try {
                resetFile();
                this.currentWriter.write(str);
            } catch (Exception e2) {
                LOGGER.warn("Failed to write even after creating a new file.  Attempting to reconnect", e2);
                try {
                    connectToWebHDFS();
                    resetFile();
                    this.currentWriter.write(str);
                } catch (Exception e3) {
                    LOGGER.error("Failed to write to HDFS after reconnecting client. Terminating writer.", e3);
                    throw new RuntimeException(e3);
                }
            }
        }
    }

    @Override // java.io.Flushable
    public void flush() throws IOException {
        if (this.currentWriter == null || this.byteCount <= BYTES_BEFORE_FLUSH) {
            return;
        }
        this.currentWriter.flush();
        this.byteCount = 0;
    }

    private synchronized void resetFile() {
        if (this.fileLineCounter != 0 || this.currentWriter == null) {
            Path suffix = this.path.suffix("/" + this.hdfsConfiguration.getWriterFilePrefix() + "-" + new Date().getTime());
            Path suffix2 = this.hdfsConfiguration.getCompression().equals(HdfsWriterConfiguration.Compression.GZIP) ? suffix.suffix(".gz") : suffix.suffix(".tsv");
            try {
                if (this.currentWriter != null) {
                    flush();
                    close();
                }
                this.fileLineCounter = 0;
                if (this.client.exists(suffix2)) {
                    throw new RuntimeException("Unable to create file: " + suffix2);
                }
                if (this.hdfsConfiguration.getCompression().equals(HdfsWriterConfiguration.Compression.GZIP)) {
                    this.currentWriter = new OutputStreamWriter(new GZIPOutputStream(this.client.create(suffix2)));
                } else {
                    this.currentWriter = new OutputStreamWriter(this.client.create(suffix2));
                }
                this.writtenFiles.add(suffix2);
                LOGGER.info("File Created: {}", suffix2);
            } catch (Exception e) {
                LOGGER.error("COULD NOT CreateFile: {}", suffix2);
                LOGGER.error(e.getMessage());
                throw new RuntimeException(e);
            }
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() throws IOException {
        if (this.currentWriter != null) {
            this.currentWriter.flush();
            this.currentWriter.close();
            this.currentWriter = null;
            LOGGER.info("File Closed");
        }
    }

    public void prepare(Object obj) {
        this.mapper = StreamsJacksonMapper.getInstance();
        this.lineWriterUtil = LineReadWriteUtil.getInstance(this.hdfsConfiguration);
        connectToWebHDFS();
        this.path = new Path(this.hdfsConfiguration.getPath() + "/" + this.hdfsConfiguration.getWriterPath());
    }

    public void cleanUp() {
        try {
            flush();
        } catch (IOException e) {
            LOGGER.error("Error flushing on cleanup", e);
        }
        try {
            close();
        } catch (IOException e2) {
            LOGGER.error("Error closing on cleanup", e2);
        }
    }

    public DatumStatusCounter getDatumStatusCounter() {
        DatumStatusCounter datumStatusCounter = new DatumStatusCounter();
        datumStatusCounter.incrementAttempt(this.totalRecordsWritten);
        datumStatusCounter.incrementStatus(DatumStatus.SUCCESS, this.totalRecordsWritten);
        return datumStatusCounter;
    }
}
