package org.apache.storm.hdfs.bolt;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.storm.Config;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.common.Partitioner;
import org.apache.storm.task.GeneralTopologyContext;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.MockTupleHelpers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

/* loaded from: input_file:org/apache/storm/hdfs/bolt/TestHdfsBolt.class */
public class TestHdfsBolt {
    private String hdfsURI;
    private DistributedFileSystem fs;
    private MiniDFSCluster hdfsCluster;
    private static final String testRoot = "/unittest";

    @Mock
    private OutputCollector collector;

    @Mock
    private TopologyContext topologyContext;
    Tuple tuple1 = generateTestTuple(1, "First Tuple", "SFO", "CA");
    Tuple tuple2 = generateTestTuple(1, "Second Tuple", "SJO", "CA");

    @Rule
    public ExpectedException thrown = ExpectedException.none();

    @Before
    public void setup() throws Exception {
        MockitoAnnotations.initMocks(this);
        Configuration configuration = new Configuration();
        configuration.set("fs.trash.interval", "10");
        configuration.setBoolean("dfs.permissions", true);
        File absoluteFile = new File("./target/hdfs/").getAbsoluteFile();
        FileUtil.fullyDelete(absoluteFile);
        configuration.set("hdfs.minidfs.basedir", absoluteFile.getAbsolutePath());
        this.hdfsCluster = new MiniDFSCluster.Builder(configuration).build();
        this.fs = this.hdfsCluster.getFileSystem();
        this.hdfsURI = "hdfs://localhost:" + this.hdfsCluster.getNameNodePort() + "/";
    }

    @After
    public void shutDown() throws IOException {
        this.fs.close();
        this.hdfsCluster.shutdown();
    }

    @Test
    public void testTwoTuplesTwoFiles() throws IOException {
        HdfsBolt makeHdfsBolt = makeHdfsBolt(this.hdfsURI, 1, 1.0E-5f);
        makeHdfsBolt.prepare(new Config(), this.topologyContext, this.collector);
        makeHdfsBolt.execute(this.tuple1);
        makeHdfsBolt.execute(this.tuple2);
        ((OutputCollector) Mockito.verify(this.collector)).ack(this.tuple1);
        ((OutputCollector) Mockito.verify(this.collector)).ack(this.tuple2);
        Assert.assertEquals(2L, countNonZeroLengthFiles(testRoot));
    }

    @Test
    public void testPartitionedOutput() throws IOException {
        HdfsBolt makeHdfsBolt = makeHdfsBolt(this.hdfsURI, 1, 1000.0f);
        Partitioner partitioner = new Partitioner() { // from class: org.apache.storm.hdfs.bolt.TestHdfsBolt.1
            public String getPartitionPath(Tuple tuple) {
                return "/" + tuple.getStringByField("city");
            }
        };
        makeHdfsBolt.prepare(new Config(), this.topologyContext, this.collector);
        makeHdfsBolt.withPartitioner(partitioner);
        makeHdfsBolt.execute(this.tuple1);
        makeHdfsBolt.execute(this.tuple2);
        ((OutputCollector) Mockito.verify(this.collector)).ack(this.tuple1);
        ((OutputCollector) Mockito.verify(this.collector)).ack(this.tuple2);
        Assert.assertEquals(1L, countNonZeroLengthFiles("/unittest/SFO"));
        Assert.assertEquals(1L, countNonZeroLengthFiles("/unittest/SJO"));
    }

    @Test
    public void testTwoTuplesOneFile() throws IOException {
        HdfsBolt makeHdfsBolt = makeHdfsBolt(this.hdfsURI, 2, 10000.0f);
        makeHdfsBolt.prepare(new Config(), this.topologyContext, this.collector);
        makeHdfsBolt.execute(this.tuple1);
        Mockito.verifyZeroInteractions(new Object[]{this.collector});
        makeHdfsBolt.execute(this.tuple2);
        ((OutputCollector) Mockito.verify(this.collector)).ack(this.tuple1);
        ((OutputCollector) Mockito.verify(this.collector)).ack(this.tuple2);
        Assert.assertEquals(1L, countNonZeroLengthFiles(testRoot));
    }

    @Test
    public void testFailedSync() throws IOException {
        HdfsBolt makeHdfsBolt = makeHdfsBolt(this.hdfsURI, 2, 10000.0f);
        makeHdfsBolt.prepare(new Config(), this.topologyContext, this.collector);
        makeHdfsBolt.execute(this.tuple1);
        this.fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
        this.thrown.expect(RuntimeException.class);
        makeHdfsBolt.execute(this.tuple1);
    }

    @Test
    public void testFailureFilecount() throws IOException, InterruptedException {
        HdfsBolt makeHdfsBolt = makeHdfsBolt(this.hdfsURI, 1, 1.0E-6f);
        makeHdfsBolt.prepare(new Config(), this.topologyContext, this.collector);
        makeHdfsBolt.execute(this.tuple1);
        this.fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
        try {
            makeHdfsBolt.execute(this.tuple2);
        } catch (RuntimeException e) {
        }
        try {
            makeHdfsBolt.execute(this.tuple2);
        } catch (RuntimeException e2) {
        }
        try {
            makeHdfsBolt.execute(this.tuple2);
        } catch (RuntimeException e3) {
        }
        Assert.assertEquals(1L, countNonZeroLengthFiles(testRoot));
        Assert.assertEquals(0L, countZeroLengthFiles(testRoot));
    }

    @Test
    public void testTickTuples() throws IOException {
        HdfsBolt makeHdfsBolt = makeHdfsBolt(this.hdfsURI, 10, 10000.0f);
        makeHdfsBolt.prepare(new Config(), this.topologyContext, this.collector);
        makeHdfsBolt.execute(this.tuple1);
        Assert.assertEquals(0L, countNonZeroLengthFiles(testRoot));
        makeHdfsBolt.execute(MockTupleHelpers.mockTickTuple());
        Assert.assertEquals(1L, countNonZeroLengthFiles(testRoot));
    }

    public void createBaseDirectory(FileSystem fileSystem, String str) throws IOException {
        fileSystem.mkdirs(new Path(str));
    }

    private HdfsBolt makeHdfsBolt(String str, int i, float f) {
        DelimitedRecordFormat withFieldDelimiter = new DelimitedRecordFormat().withFieldDelimiter("|");
        return new HdfsBolt().withFsUrl(str).withFileNameFormat(new DefaultFileNameFormat().withPath(testRoot)).withRecordFormat(withFieldDelimiter).withRotationPolicy(new FileSizeRotationPolicy(f, FileSizeRotationPolicy.Units.MB)).withSyncPolicy(new CountSyncPolicy(i));
    }

    private Tuple generateTestTuple(Object obj, Object obj2, Object obj3, Object obj4) {
        return new TupleImpl(new GeneralTopologyContext(new TopologyBuilder().createTopology(), new Config(), new HashMap(), new HashMap(), new HashMap(), "") { // from class: org.apache.storm.hdfs.bolt.TestHdfsBolt.2
            public Fields getComponentOutputFields(String str, String str2) {
                return new Fields(new String[]{"id", "msg", "city", "state"});
            }
        }, new Values(new Object[]{obj, obj2, obj3, obj4}), 1, "");
    }

    private void printFiles(String str) throws IOException {
        for (FileStatus fileStatus : this.fs.listStatus(new Path(str))) {
            System.out.println("@@@ " + fileStatus.getPath() + " [" + fileStatus.getLen() + "]");
        }
    }

    private int countNonZeroLengthFiles(String str) throws IOException {
        int i = 0;
        for (FileStatus fileStatus : this.fs.listStatus(new Path(str))) {
            if (fileStatus.getLen() > 0) {
                i++;
            }
        }
        return i;
    }

    private int countZeroLengthFiles(String str) throws IOException {
        int i = 0;
        for (FileStatus fileStatus : this.fs.listStatus(new Path(str))) {
            if (fileStatus.getLen() == 0) {
                i++;
            }
        }
        return i;
    }
}
