package com.datatorrent.stram.util;

import com.datatorrent.stram.client.StramAgent;
import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/util/FSPartFileCollection.class */
public class FSPartFileCollection {
    private transient FileSystem fs;
    private transient FSDataOutputStream partOutStr;
    private transient FSDataOutputStream indexOutStr;
    private transient FSDataOutputStream metaOs;
    private transient String localBasePath;
    public static final String INDEX_FILE = "index.txt";
    public static final String META_FILE = "meta.txt";
    protected String hdfsFile;
    private static final Logger logger = LoggerFactory.getLogger(FSPartFileCollection.class);
    protected int bytesPerPartFile = 1048576;
    protected long millisPerPartFile = StramAgent.SecurityInfo.DEFAULT_EXPIRY_INTERVAL;
    protected int fileParts = 0;
    protected int partFileItemCount = 0;
    protected int partFileBytes = 0;
    protected long currentPartFileTimeStamp = 0;
    protected String basePath = LogicalPlanConfiguration.KEY_SEPARATOR;
    private boolean isLocalMode = false;
    private boolean syncRequested = false;

    public void setBytesPerPartFile(int i) {
        this.bytesPerPartFile = i;
    }

    public void setMillisPerPartFile(long j) {
        this.millisPerPartFile = j;
    }

    public void setLocalMode(boolean z) {
        this.isLocalMode = z;
    }

    public void setBasePath(String str) {
        this.basePath = str;
    }

    public String getBasePath() {
        return this.basePath;
    }

    public void setup() throws IOException {
        if (this.basePath.startsWith("file:")) {
            this.isLocalMode = true;
            this.localBasePath = this.basePath.substring(5);
            new File(this.localBasePath).mkdirs();
        }
        this.fs = FileSystem.newInstance(new Path(this.basePath).toUri(), new Configuration());
        Path path = new Path(this.basePath, META_FILE);
        if (this.isLocalMode) {
            this.metaOs = new FSDataOutputStream(new FileOutputStream(this.localBasePath + "/" + META_FILE), (FileSystem.Statistics) null);
        } else {
            this.metaOs = this.fs.create(path);
        }
        Path path2 = new Path(this.basePath, INDEX_FILE);
        if (this.isLocalMode) {
            this.indexOutStr = new FSDataOutputStream(new FileOutputStream(this.localBasePath + "/" + INDEX_FILE), (FileSystem.Statistics) null);
        } else {
            this.indexOutStr = this.fs.create(path2);
        }
    }

    public void teardown() {
        logger.info("Closing hdfs part collection.");
        try {
            if (this.metaOs != null) {
                this.metaOs.close();
            }
            if (this.partOutStr != null) {
                logger.debug("Closing part file");
                this.partOutStr.close();
                if (this.indexOutStr != null) {
                    writeIndex();
                }
            }
            if (this.indexOutStr != null) {
                writeIndexEnd();
                this.indexOutStr.close();
            }
            this.fs.close();
        } catch (IOException e) {
            logger.error(e.toString());
        }
    }

    private void openNewPartFile() throws IOException {
        this.hdfsFile = "part" + this.fileParts + ".txt";
        Path path = new Path(this.basePath, this.hdfsFile);
        logger.debug("Opening new part file: {}", this.hdfsFile);
        if (this.isLocalMode) {
            this.partOutStr = new FSDataOutputStream(new FileOutputStream(this.localBasePath + "/" + this.hdfsFile), (FileSystem.Statistics) null);
        } else {
            this.partOutStr = this.fs.create(path);
        }
        this.fileParts++;
        this.currentPartFileTimeStamp = System.currentTimeMillis();
        this.partFileItemCount = 0;
        this.partFileBytes = 0;
    }

    public void writeMetaData(byte[] bArr) throws IOException {
        this.metaOs.write(bArr);
        this.metaOs.hflush();
    }

    public void writeDataItem(byte[] bArr, boolean z) throws IOException {
        if (this.partOutStr == null) {
            openNewPartFile();
        }
        this.partOutStr.write(bArr);
        this.partFileBytes += bArr.length;
        if (z) {
            this.partFileItemCount++;
        }
    }

    public void requestSync() {
        this.syncRequested = true;
    }

    public boolean isReadyTurnoverPartFile() {
        try {
            if (this.syncRequested || this.partOutStr.getPos() > this.bytesPerPartFile || this.currentPartFileTimeStamp + this.millisPerPartFile < System.currentTimeMillis()) {
                if (this.partOutStr.getPos() > 0) {
                    return true;
                }
            }
            return false;
        } catch (IOException e) {
            return true;
        }
    }

    public boolean flushData() throws IOException {
        if (this.partOutStr == null) {
            return false;
        }
        this.partOutStr.hflush();
        if (!isReadyTurnoverPartFile()) {
            return false;
        }
        turnover();
        return true;
    }

    private void turnover() throws IOException {
        this.partOutStr.close();
        this.partOutStr = null;
        writeIndex();
        this.syncRequested = false;
    }

    private void writeIndex() {
        if (this.partFileBytes <= 0) {
            return;
        }
        try {
            String latestIndexLine = getLatestIndexLine();
            resetIndexExtraInfo();
            this.indexOutStr.write(latestIndexLine.getBytes());
            this.indexOutStr.hflush();
            this.indexOutStr.hsync();
        } catch (IOException e) {
            logger.error(e.toString());
        }
    }

    public String getLatestIndexLine() {
        String indexExtraInfo = getIndexExtraInfo();
        String str = "F:" + this.hdfsFile + ":" + this.currentPartFileTimeStamp + "-" + System.currentTimeMillis() + ":" + this.partFileItemCount;
        if (indexExtraInfo != null) {
            str = str + ":T:" + indexExtraInfo;
        }
        return str + "\n";
    }

    private void writeIndexEnd() {
        try {
            this.indexOutStr.write("E\n".getBytes());
            this.indexOutStr.hflush();
            this.indexOutStr.hsync();
        } catch (IOException e) {
            logger.error(e.toString());
        }
    }

    protected String getIndexExtraInfo() {
        return null;
    }

    protected void resetIndexExtraInfo() {
    }
}
