package org.apache.storm.hdfs.bolt;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.HashMap;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.storm.Config;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.task.GeneralTopologyContext;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.tuple.Values;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

/* loaded from: input_file:org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.class */
/**
 * Exercises {@link AvroGenericRecordBolt} against an in-process {@link MiniDFSCluster}.
 *
 * <p>Each test feeds four tuples (two distinct Avro records, each sent twice) to a freshly
 * prepared bolt, then checks how many non-empty files appeared under {@code /unittest} and that
 * every one of them can be read back as an Avro data file with the test schema.
 */
public class AvroGenericRecordBoltTest {
    /** HDFS directory the bolt is configured to write into. */
    private static final String TEST_ROOT = "/unittest";

    /** Avro schema (JSON) of the records carried by the test tuples. */
    private static final String USER_SCHEMA = "{\"type\":\"record\","
            + "\"name\":\"myrecord\","
            + "\"fields\":[{\"name\":\"foo1\",\"type\":\"string\"},"
            + "{ \"name\":\"int1\", \"type\":\"int\" }]}";

    /** Parsed form of {@link #USER_SCHEMA}; must be declared before the tuples that use it. */
    private static final Schema SCHEMA = new Schema.Parser().parse(USER_SCHEMA);

    private static final Tuple TUPLE_1 = generateTestTuple(newRecord("bar1", 1));
    private static final Tuple TUPLE_2 = generateTestTuple(newRecord("bar2", 2));

    private String hdfsURI;
    private DistributedFileSystem fs;
    private MiniDFSCluster hdfsCluster;

    @Mock
    private OutputCollector collector;

    @Mock
    private TopologyContext topologyContext;

    /**
     * Initialises the Mockito mocks and starts a fresh mini DFS cluster whose storage lives
     * under {@code ./target/hdfs/} (wiped first so earlier runs cannot leak files into this one).
     */
    @Before
    public void setup() throws Exception {
        MockitoAnnotations.initMocks(this);

        Configuration conf = new Configuration();
        conf.set("fs.trash.interval", "10");
        conf.setBoolean("dfs.permissions", true);

        File baseDir = new File("./target/hdfs/").getAbsoluteFile();
        FileUtil.fullyDelete(baseDir);
        conf.set("hdfs.minidfs.basedir", baseDir.getAbsolutePath());

        this.hdfsCluster = new MiniDFSCluster.Builder(conf).build();
        this.fs = this.hdfsCluster.getFileSystem();
        this.hdfsURI = this.fs.getUri() + "/";
    }

    /**
     * Closes the file system and stops the cluster.
     *
     * <p>Both fields are null-checked because this method also runs when {@link #setup()} failed
     * part-way; an NPE here would hide the original failure. The cluster is shut down in a
     * {@code finally} so that an error while closing the file system cannot leave it running.
     */
    @After
    public void shutDown() throws IOException {
        try {
            if (this.fs != null) {
                this.fs.close();
            }
        } finally {
            if (this.hdfsCluster != null) {
                this.hdfsCluster.shutdown();
            }
        }
    }

    /** With a 1 MB rotation threshold the four small records are expected to share one file. */
    @Test
    public void multipleTuplesOneFile() throws IOException {
        AvroGenericRecordBolt bolt = makeAvroBolt(this.hdfsURI, 1, 1.0f, USER_SCHEMA);
        bolt.prepare(new Config(), this.topologyContext, this.collector);
        executeTestTuples(bolt);

        Assert.assertEquals(1L, countNonZeroLengthFiles(TEST_ROOT));
        verifyAllAvroFiles(TEST_ROOT, SCHEMA);
    }

    /** With a 0.0001 MB rotation threshold each of the four records is expected in its own file. */
    @Test
    public void multipleTuplesMutliplesFiles() throws IOException {
        AvroGenericRecordBolt bolt = makeAvroBolt(this.hdfsURI, 1, 1.0E-4f, USER_SCHEMA);
        bolt.prepare(new Config(), this.topologyContext, this.collector);
        executeTestTuples(bolt);

        Assert.assertEquals(4L, countNonZeroLengthFiles(TEST_ROOT));
        verifyAllAvroFiles(TEST_ROOT, SCHEMA);
    }

    /** Sends the two test tuples to the bolt twice, alternating between them. */
    private static void executeTestTuples(AvroGenericRecordBolt bolt) {
        bolt.execute(TUPLE_1);
        bolt.execute(TUPLE_2);
        bolt.execute(TUPLE_1);
        bolt.execute(TUPLE_2);
    }

    /**
     * Builds an unprepared bolt that writes under {@link #TEST_ROOT}.
     *
     * @param nameNodeAddr   file system URL handed to {@code withFsUrl}
     * @param countSync      tuple count passed to the {@link CountSyncPolicy}
     * @param rotationSizeMb file size, in MB, passed to the {@link FileSizeRotationPolicy}
     * @param schemaAsString Avro schema JSON handed to {@code withSchemaAsString}
     * @return the configured bolt
     */
    private AvroGenericRecordBolt makeAvroBolt(String nameNodeAddr, int countSync, float rotationSizeMb,
                                               String schemaAsString) {
        CountSyncPolicy syncPolicy = new CountSyncPolicy(countSync);
        DefaultFileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath(TEST_ROOT);
        FileSizeRotationPolicy rotationPolicy =
                new FileSizeRotationPolicy(rotationSizeMb, FileSizeRotationPolicy.Units.MB);

        return new AvroGenericRecordBolt()
                .withFsUrl(nameNodeAddr)
                .withFileNameFormat(fileNameFormat)
                .withSchemaAsString(schemaAsString)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);
    }

    /** Creates a record of {@link #SCHEMA} with the given {@code foo1} and {@code int1} values. */
    private static GenericRecord newRecord(String foo1, int int1) {
        GenericData.Record record = new GenericData.Record(SCHEMA);
        record.put("foo1", foo1);
        record.put("int1", int1);
        return record;
    }

    /**
     * Wraps a record in a single-field tuple (field name {@code "record"}), using a throwaway
     * topology context built from an empty topology.
     */
    // The maps are deliberately raw: they are empty placeholders and staying raw keeps this call
    // independent of the exact generic parameters declared by GeneralTopologyContext.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static Tuple generateTestTuple(GenericRecord record) {
        TopologyBuilder builder = new TopologyBuilder();
        GeneralTopologyContext context = new GeneralTopologyContext(
                builder.createTopology(), new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
            @Override
            public Fields getComponentOutputFields(String componentId, String streamId) {
                return new Fields("record");
            }
        };
        return new TupleImpl(context, new Values(record), 1, "");
    }

    /** Runs {@link #fileIsGoodAvro} on every non-empty entry directly under {@code parentDir}. */
    private void verifyAllAvroFiles(String parentDir, Schema expectedSchema) throws IOException {
        for (FileStatus status : this.fs.listStatus(new Path(parentDir))) {
            if (status.getLen() > 0) {
                fileIsGoodAvro(status.getPath(), expectedSchema);
            }
        }
    }

    /** Returns how many entries directly under {@code parentDir} report a length above zero. */
    private int countNonZeroLengthFiles(String parentDir) throws IOException {
        int nonEmpty = 0;
        for (FileStatus status : this.fs.listStatus(new Path(parentDir))) {
            if (status.getLen() > 0) {
                nonEmpty++;
            }
        }
        return nonEmpty;
    }

    /**
     * Copies {@code path} from HDFS to the local scratch file {@code target/FOO.avro} and reads
     * every record in it with {@code expectedSchema}; any Avro decoding problem surfaces as an
     * exception and fails the calling test.
     *
     * <p>All streams are closed and the scratch file is removed even when the check fails, so a
     * broken file cannot leak handles or leave stale data for the next verification.
     */
    private void fileIsGoodAvro(Path path, Schema expectedSchema) throws IOException {
        File localCopy = new File("target/FOO.avro");
        try {
            try (FSDataInputStream in = this.fs.open(path, 0);
                 FileOutputStream out = new FileOutputStream(localCopy)) {
                byte[] buffer = new byte[4096];
                int read;
                while ((read = in.read(buffer)) > 0) {
                    out.write(buffer, 0, read);
                }
            }

            GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(expectedSchema);
            try (DataFileReader<GenericRecord> fileReader = new DataFileReader<>(localCopy, datumReader)) {
                GenericRecord record = null;
                while (fileReader.hasNext()) {
                    // Passing the previous record lets the reader reuse it instead of allocating.
                    record = fileReader.next(record);
                    Assert.assertNotNull("Avro reader returned a null record from " + path, record);
                }
            }
        } finally {
            localCopy.delete();
        }
    }
}
