package org.apache.storm.hdfs.bolt;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.HashMap;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.storm.Config;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.testing.MiniDFSClusterRule;
import org.apache.storm.task.GeneralTopologyContext;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.tuple.Values;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
/* loaded from: input_file:org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.class */
public class AvroGenericRecordBoltTest {

    @Rule
    public MiniDFSClusterRule dfsClusterRule = new MiniDFSClusterRule(new MiniDFSClusterRule.Java7Supplier<Configuration>() { // from class: org.apache.storm.hdfs.bolt.AvroGenericRecordBoltTest.1
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.storm.hdfs.testing.MiniDFSClusterRule.Java7Supplier
        public Configuration get() {
            Configuration configuration = new Configuration();
            configuration.set("fs.trash.interval", "10");
            configuration.setBoolean("dfs.permissions", true);
            File absoluteFile = new File("./target/hdfs/").getAbsoluteFile();
            FileUtil.fullyDelete(absoluteFile);
            configuration.set("hdfs.minidfs.basedir", absoluteFile.getAbsolutePath());
            return configuration;
        }
    });

    @Mock
    private OutputCollector collector;

    @Mock
    private TopologyContext topologyContext;
    private DistributedFileSystem fs;
    private String hdfsURI;
    private static final String testRoot = "/unittest";
    private static Schema schema1;
    private static Schema schema2;
    private static Tuple tuple1;
    private static Tuple tuple2;
    private static final String schemaV1 = "{\"type\":\"record\",\"name\":\"myrecord\",\"fields\":[{\"name\":\"foo1\",\"type\":\"string\"},{ \"name\":\"int1\", \"type\":\"int\" }]}";
    private static final String schemaV2 = "{\"type\":\"record\",\"name\":\"myrecord\",\"fields\":[{\"name\":\"foo1\",\"type\":\"string\"},{ \"name\":\"bar\", \"type\":\"string\", \"default\":\"baz\" },{ \"name\":\"int1\", \"type\":\"int\" }]}";

    @BeforeClass
    public static void setupClass() {
        schema1 = new Schema.Parser().parse(schemaV1);
        schema2 = new Schema.Parser().parse(schemaV2);
        GenericRecordBuilder genericRecordBuilder = new GenericRecordBuilder(schema1);
        genericRecordBuilder.set("foo1", "bar1");
        genericRecordBuilder.set("int1", 1);
        tuple1 = generateTestTuple(genericRecordBuilder.build());
        GenericRecordBuilder genericRecordBuilder2 = new GenericRecordBuilder(schema2);
        genericRecordBuilder2.set("foo1", "bar2");
        genericRecordBuilder2.set("int1", 2);
        tuple2 = generateTestTuple(genericRecordBuilder2.build());
    }

    @Before
    public void setup() throws Exception {
        this.fs = this.dfsClusterRule.getDfscluster().getFileSystem();
        this.hdfsURI = this.fs.getUri() + "/";
    }

    @After
    public void shutDown() throws IOException {
        this.fs.close();
    }

    @Test
    public void multipleTuplesOneFile() throws IOException {
        AvroGenericRecordBolt makeAvroBolt = makeAvroBolt(this.hdfsURI, 1, 1.0f, schemaV1);
        makeAvroBolt.prepare(new Config(), this.topologyContext, this.collector);
        makeAvroBolt.execute(tuple1);
        makeAvroBolt.execute(tuple1);
        makeAvroBolt.execute(tuple1);
        makeAvroBolt.execute(tuple1);
        Assert.assertEquals(1L, countNonZeroLengthFiles(testRoot));
        verifyAllAvroFiles(testRoot);
    }

    @Test
    public void multipleTuplesMutliplesFiles() throws IOException {
        AvroGenericRecordBolt makeAvroBolt = makeAvroBolt(this.hdfsURI, 1, 1.0E-4f, schemaV1);
        makeAvroBolt.prepare(new Config(), this.topologyContext, this.collector);
        makeAvroBolt.execute(tuple1);
        makeAvroBolt.execute(tuple1);
        makeAvroBolt.execute(tuple1);
        makeAvroBolt.execute(tuple1);
        Assert.assertEquals(4L, countNonZeroLengthFiles(testRoot));
        verifyAllAvroFiles(testRoot);
    }

    @Test
    public void forwardSchemaChangeWorks() throws IOException {
        AvroGenericRecordBolt makeAvroBolt = makeAvroBolt(this.hdfsURI, 1, 1000.0f, schemaV1);
        makeAvroBolt.prepare(new Config(), this.topologyContext, this.collector);
        makeAvroBolt.execute(tuple1);
        makeAvroBolt.execute(tuple2);
        Assert.assertEquals(2L, countNonZeroLengthFiles(testRoot));
        verifyAllAvroFiles(testRoot);
    }

    @Test
    public void backwardSchemaChangeWorks() throws IOException {
        AvroGenericRecordBolt makeAvroBolt = makeAvroBolt(this.hdfsURI, 1, 1000.0f, schemaV2);
        makeAvroBolt.prepare(new Config(), this.topologyContext, this.collector);
        makeAvroBolt.execute(tuple1);
        makeAvroBolt.execute(tuple2);
        Assert.assertEquals(2L, countNonZeroLengthFiles(testRoot));
        verifyAllAvroFiles(testRoot);
    }

    @Test
    public void schemaThrashing() throws IOException {
        AvroGenericRecordBolt makeAvroBolt = makeAvroBolt(this.hdfsURI, 1, 1000.0f, schemaV2);
        makeAvroBolt.prepare(new Config(), this.topologyContext, this.collector);
        makeAvroBolt.execute(tuple1);
        makeAvroBolt.execute(tuple2);
        makeAvroBolt.execute(tuple1);
        makeAvroBolt.execute(tuple2);
        makeAvroBolt.execute(tuple1);
        makeAvroBolt.execute(tuple2);
        makeAvroBolt.execute(tuple1);
        makeAvroBolt.execute(tuple2);
        Assert.assertEquals(2L, countNonZeroLengthFiles(testRoot));
        verifyAllAvroFiles(testRoot);
    }

    private AvroGenericRecordBolt makeAvroBolt(String str, int i, float f, String str2) {
        CountSyncPolicy countSyncPolicy = new CountSyncPolicy(i);
        DefaultFileNameFormat withPath = new DefaultFileNameFormat().withPath(testRoot);
        return new AvroGenericRecordBolt().withFsUrl(str).withFileNameFormat(withPath).withRotationPolicy(new FileSizeRotationPolicy(f, FileSizeRotationPolicy.Units.MB)).withSyncPolicy(countSyncPolicy);
    }

    private static Tuple generateTestTuple(GenericRecord genericRecord) {
        return new TupleImpl(new GeneralTopologyContext(new TopologyBuilder().createTopology(), new Config(), new HashMap(), new HashMap(), new HashMap(), "") { // from class: org.apache.storm.hdfs.bolt.AvroGenericRecordBoltTest.2
            public Fields getComponentOutputFields(String str, String str2) {
                return new Fields(new String[]{"record"});
            }
        }, new Values(new Object[]{genericRecord}), 1, "");
    }

    private void verifyAllAvroFiles(String str) throws IOException {
        for (FileStatus fileStatus : this.fs.listStatus(new Path(str))) {
            if (fileStatus.getLen() > 0) {
                fileIsGoodAvro(fileStatus.getPath());
            }
        }
    }

    private int countNonZeroLengthFiles(String str) throws IOException {
        int i = 0;
        for (FileStatus fileStatus : this.fs.listStatus(new Path(str))) {
            if (fileStatus.getLen() > 0) {
                i++;
            }
        }
        return i;
    }

    /* JADX WARN: Finally extract failed */
    private void fileIsGoodAvro(Path path) throws IOException {
        GenericDatumReader genericDatumReader = new GenericDatumReader();
        FSDataInputStream open = this.fs.open(path, 0);
        Throwable th = null;
        try {
            FileOutputStream fileOutputStream = new FileOutputStream("target/FOO.avro");
            Throwable th2 = null;
            try {
                byte[] bArr = new byte[100];
                while (true) {
                    int read = open.read(bArr);
                    if (read <= 0) {
                        break;
                    } else {
                        fileOutputStream.write(bArr, 0, read);
                    }
                }
                if (fileOutputStream != null) {
                    if (0 != 0) {
                        try {
                            fileOutputStream.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    } else {
                        fileOutputStream.close();
                    }
                }
                File file = new File("target/FOO.avro");
                DataFileReader dataFileReader = new DataFileReader(file, genericDatumReader);
                Throwable th4 = null;
                GenericRecord genericRecord = null;
                while (dataFileReader.hasNext()) {
                    try {
                        try {
                            genericRecord = (GenericRecord) dataFileReader.next(genericRecord);
                        } finally {
                        }
                    } catch (Throwable th5) {
                        if (dataFileReader != null) {
                            if (th4 != null) {
                                try {
                                    dataFileReader.close();
                                } catch (Throwable th6) {
                                    th4.addSuppressed(th6);
                                }
                            } else {
                                dataFileReader.close();
                            }
                        }
                        throw th5;
                    }
                }
                if (dataFileReader != null) {
                    if (0 != 0) {
                        try {
                            dataFileReader.close();
                        } catch (Throwable th7) {
                            th4.addSuppressed(th7);
                        }
                    } else {
                        dataFileReader.close();
                    }
                }
                file.delete();
            } catch (Throwable th8) {
                if (fileOutputStream != null) {
                    if (0 != 0) {
                        try {
                            fileOutputStream.close();
                        } catch (Throwable th9) {
                            th2.addSuppressed(th9);
                        }
                    } else {
                        fileOutputStream.close();
                    }
                }
                throw th8;
            }
        } finally {
            if (open != null) {
                if (0 != 0) {
                    try {
                        open.close();
                    } catch (Throwable th10) {
                        th.addSuppressed(th10);
                    }
                } else {
                    open.close();
                }
            }
        }
    }
}
