package com.datatorrent.lib.io.block;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.Partitioner;
import com.datatorrent.lib.counters.BasicCounters;
import com.datatorrent.lib.io.block.AbstractBlockReader;
import com.datatorrent.lib.io.block.BlockMetadata;
import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
import com.datatorrent.netlet.util.Slice;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.mutable.MutableLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/lib/io/block/BlockWriter.class */
public class BlockWriter extends AbstractFileOutputOperator<AbstractBlockReader.ReaderRecord<Slice>> implements Partitioner<BlockWriter> {
    public static final String DEFAULT_BLOCKS_DIR = "blocks";
    private static final Logger LOG = LoggerFactory.getLogger(BlockWriter.class);
    private String blocksDirectory = DEFAULT_BLOCKS_DIR;
    public final transient DefaultInputPort<BlockMetadata.FileBlockMetadata> blockMetadataInput = new DefaultInputPort<BlockMetadata.FileBlockMetadata>() { // from class: com.datatorrent.lib.io.block.BlockWriter.1
        public void process(BlockMetadata.FileBlockMetadata fileBlockMetadata) {
            BlockWriter.this.blockMetadatas.add(fileBlockMetadata);
            BlockWriter.LOG.debug("received blockId {} for file {} ", Long.valueOf(fileBlockMetadata.getBlockId()), fileBlockMetadata.getFilePath());
        }
    };
    public final transient DefaultOutputPort<BlockMetadata.FileBlockMetadata> blockMetadataOutput = new DefaultOutputPort<>();
    private transient List<BlockMetadata.FileBlockMetadata> blockMetadatas = Lists.newArrayList();

    public BlockWriter() {
        this.filePath = "";
    }

    @Override // com.datatorrent.lib.io.fs.AbstractFileOutputOperator
    public void setup(Context.OperatorContext operatorContext) {
        this.filePath = ((String) operatorContext.getValue(Context.DAGContext.APPLICATION_PATH)) + "/" + this.blocksDirectory;
        super.setup(operatorContext);
    }

    @Override // com.datatorrent.lib.io.fs.AbstractFileOutputOperator
    public void endWindow() {
        super.endWindow();
        this.streamsCache.asMap().clear();
        this.endOffsets.clear();
        for (BlockMetadata.FileBlockMetadata fileBlockMetadata : this.blockMetadatas) {
            try {
                finalizeFile(Long.toString(fileBlockMetadata.getBlockId()));
                this.blockMetadataOutput.emit(fileBlockMetadata);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        this.blockMetadatas.clear();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.datatorrent.lib.io.fs.AbstractFileOutputOperator
    public String getFileName(AbstractBlockReader.ReaderRecord<Slice> readerRecord) {
        return Long.toString(readerRecord.getBlockId());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.datatorrent.lib.io.fs.AbstractFileOutputOperator
    public byte[] getBytesForTuple(AbstractBlockReader.ReaderRecord<Slice> readerRecord) {
        return readerRecord.getRecord().buffer;
    }

    public Collection<Partitioner.Partition<BlockWriter>> definePartitions(Collection<Partitioner.Partition<BlockWriter>> collection, Partitioner.PartitioningContext partitioningContext) {
        if (partitioningContext.getParallelPartitionCount() == 0) {
            return collection;
        }
        if (partitioningContext.getParallelPartitionCount() == collection.size()) {
            LOG.debug("no change is partition count: " + collection.size());
            return collection;
        }
        ArrayList newArrayList = Lists.newArrayList();
        LOG.debug("block writer parallel partition count {}", Integer.valueOf(partitioningContext.getParallelPartitionCount()));
        int parallelPartitionCount = partitioningContext.getParallelPartitionCount() - collection.size();
        if (parallelPartitionCount >= 0) {
            BlockWriter blockWriter = (BlockWriter) collection.iterator().next().getPartitionedInstance();
            while (true) {
                int i = parallelPartitionCount;
                parallelPartitionCount--;
                if (i <= 0) {
                    break;
                }
                collection.add(new DefaultPartition(blockWriter));
            }
        } else {
            Iterator<Partitioner.Partition<BlockWriter>> it = collection.iterator();
            while (true) {
                int i2 = parallelPartitionCount;
                parallelPartitionCount++;
                if (i2 >= 0) {
                    break;
                }
                newArrayList.add(((BlockWriter) it.next().getPartitionedInstance()).fileCounters);
                it.remove();
            }
        }
        BlockWriter blockWriter2 = (BlockWriter) collection.iterator().next().getPartitionedInstance();
        Iterator it2 = newArrayList.iterator();
        while (it2.hasNext()) {
            addCounters(blockWriter2.fileCounters, (BasicCounters) it2.next());
        }
        LOG.debug("Block writers {}", Integer.valueOf(collection.size()));
        return collection;
    }

    protected void addCounters(BasicCounters<MutableLong> basicCounters, BasicCounters<MutableLong> basicCounters2) {
        for (AbstractFileOutputOperator.Counters counters : AbstractFileOutputOperator.Counters.values()) {
            MutableLong counter = basicCounters.getCounter(counters);
            if (counter == null) {
                counter = new MutableLong();
                basicCounters.setCounter(counters, counter);
            }
            MutableLong counter2 = basicCounters2.getCounter(counters);
            if (counter2 != null) {
                counter.add(counter2.longValue());
            }
        }
    }

    public String getBlocksDirectory() {
        return this.blocksDirectory;
    }

    public void setBlocksDirectory(String str) {
        this.blocksDirectory = str;
    }

    public void partitioned(Map<Integer, Partitioner.Partition<BlockWriter>> map) {
    }
}
