package co.cask.cdap.data.stream.service.upload;

import co.cask.cdap.api.flow.flowlet.StreamEvent;
import co.cask.cdap.common.io.Locations;
import co.cask.cdap.data.stream.StreamDataFileConstants;
import co.cask.cdap.data.stream.StreamDataFileWriter;
import co.cask.cdap.data.stream.service.ConcurrentStreamWriter;
import co.cask.cdap.data.stream.service.MutableStreamEvent;
import co.cask.cdap.data.stream.service.MutableStreamEventData;
import co.cask.cdap.data2.transaction.stream.StreamConfig;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.twill.filesystem.Location;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:co/cask/cdap/data/stream/service/upload/FileContentWriter.class */
/**
 * A {@link ContentWriter} that writes appended content as stream events into an event file
 * ({@code upload.dat}) with an accompanying index file ({@code upload.idx}) under a given
 * directory, and hands both files to the {@link ConcurrentStreamWriter} when closed.
 *
 * <p>The directory containing the two files is deleted when the writer is closed or cancelled.
 *
 * <p>NOTE(review): a single mutable event instance and an unsynchronized counter are shared by
 * all append calls, so this class looks intended for single-threaded use — confirm with callers.
 */
public final class FileContentWriter implements ContentWriter {
    private final StreamConfig streamConfig;
    private final ConcurrentStreamWriter streamWriter;
    // Reused for every appended event; see doAppend.
    private final MutableStreamEventData streamEventData = new MutableStreamEventData();
    private final MutableStreamEvent streamEvent = new MutableStreamEvent();
    private final Location eventFile;
    private final Location indexFile;
    private final StreamDataFileWriter writer;
    // Number of events appended so far; passed to the stream writer on close.
    private long eventCount;

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the target directory and opens a stream data file writer on
     * {@code upload.dat} / {@code upload.idx} inside it.
     *
     * @param streamConfig configuration of the target stream; supplies the index interval and,
     *     on close, the stream id
     * @param concurrentStreamWriter writer that receives the finished files on {@link #close()}
     * @param location directory in which the event and index files are created
     * @param map header entries; each key is prefixed with
     *     {@code StreamDataFileConstants.Property.Key.EVENT_HEADER_PREFIX} and stored as a
     *     file property
     * @throws IOException if creating the directory or the file writer fails
     */
    public FileContentWriter(StreamConfig streamConfig, ConcurrentStreamWriter concurrentStreamWriter, Location location, Map<String, String> map) throws IOException {
        this.streamConfig = streamConfig;
        this.streamWriter = concurrentStreamWriter;
        location.mkdirs();
        this.eventFile = location.append("upload.dat");
        this.indexFile = location.append("upload.idx");

        Map<String, String> fileProperties = createStreamFileProperties(map);
        // NOTE(review): presumably marks that all events in the file share the timestamp
        // recorded when the file is closed — confirm against StreamDataFileConstants.
        fileProperties.put(StreamDataFileConstants.Property.Key.UNI_TIMESTAMP, StreamDataFileConstants.Property.Value.CLOSE_TIMESTAMP);

        this.writer = new StreamDataFileWriter(
            Locations.newOutputSupplier(this.eventFile),
            Locations.newOutputSupplier(this.indexFile),
            streamConfig.getIndexInterval(),
            fileProperties);
    }

    /**
     * Builds the mutable file property map from the given headers by prefixing every key with
     * {@code StreamDataFileConstants.Property.Key.EVENT_HEADER_PREFIX}.
     *
     * @param headers header entries to convert
     * @return a new mutable map containing one prefixed entry per header
     */
    private Map<String, String> createStreamFileProperties(Map<String, String> headers) {
        Map<String, String> properties = Maps.newHashMap();
        for (Map.Entry<String, String> header : headers.entrySet()) {
            properties.put(StreamDataFileConstants.Property.Key.EVENT_HEADER_PREFIX + header.getKey(), header.getValue());
        }
        return properties;
    }

    /**
     * Appends one event whose body is the given buffer, stamped with the current system time.
     *
     * @param byteBuffer body of the event
     * @param z not used by this implementation
     * @throws IOException if writing the event fails
     */
    @Override // co.cask.cdap.data.stream.service.upload.ContentWriter
    public void append(ByteBuffer byteBuffer, boolean z) throws IOException {
        doAppend(byteBuffer, System.currentTimeMillis());
    }

    /**
     * Appends one event per buffer produced by the iterator. The timestamp is read once before
     * iterating, so every event of the batch carries the same timestamp.
     *
     * @param it event bodies to append
     * @param z not used by this implementation
     * @throws IOException if writing any event fails
     */
    @Override // co.cask.cdap.data.stream.service.upload.ContentWriter
    public void appendAll(Iterator<ByteBuffer> it, boolean z) throws IOException {
        long batchTimestamp = System.currentTimeMillis();
        while (it.hasNext()) {
            doAppend(it.next(), batchTimestamp);
        }
    }

    /**
     * Abandons the upload: closes the file writer, ignoring failures, and deletes the directory
     * holding the event and index files.
     */
    public void cancel() {
        Closeables.closeQuietly(this.writer);
        Locations.deleteQuietly(Locations.getParent(this.eventFile), true);
    }

    /**
     * Flushes the file writer and passes the event file, index file and event count to the
     * stream writer. The directory holding the files is deleted afterwards whether or not
     * those steps succeed.
     *
     * @throws IOException if flushing or appending the file to the stream fails
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            this.writer.flush();
            this.streamWriter.appendFile(this.streamConfig.getStreamId(), this.eventFile, this.indexFile, this.eventCount, this.writer);
        } finally {
            // Runs exactly once on both the success and the failure path.
            Locations.deleteQuietly(Locations.getParent(this.eventFile), true);
        }
    }

    /**
     * Writes a single event with the given body and timestamp and increments the event count.
     * The shared mutable event and event data instances are reused rather than allocated anew.
     *
     * @param body body of the event
     * @param timestamp timestamp to set on the event
     * @throws IOException if the file writer fails to append the event
     */
    private void doAppend(ByteBuffer body, long timestamp) throws IOException {
        this.writer.append((StreamEvent) this.streamEvent.set(this.streamEventData.setBody(body), timestamp));
        this.eventCount++;
    }
}
