/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs;

import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.TestFileCreation;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;

/**
 * Tests that read a file from a {@link MiniDFSCluster} while another stream
 * (or thread) is still writing to it.
 *
 * <p>Every test runs against the cluster created in {@link #setUp()}; tests that
 * need a different {@link Configuration} call {@link #init(Configuration)}, which
 * shuts the current cluster down and starts a new one.
 */
public class TestFileConcurrentReader {
    private static final org.apache.log4j.Logger LOG = org.apache.log4j.Logger.getLogger(TestFileConcurrentReader.class);
    static final long seed = 0xDEADBEEFL;
    static final int blockSize = 8192;
    /** Bytes written per iteration by the regular CRC-error tests. */
    private static final int DEFAULT_WRITE_SIZE = 1025;
    /** Bytes written per iteration by the "very small write" CRC-error tests. */
    private static final int SMALL_WRITE_SIZE = 61;
    private Configuration conf;
    private MiniDFSCluster cluster;
    private FileSystem fileSystem;

    /**
     * Raises the log level of the lease manager, namesystem and DFS client
     * loggers to {@code ALL}.
     */
    public TestFileConcurrentReader() {
        // The logger casts are kept exactly as in the original so that the
        // selected setLogLevel overload does not change.
        GenericTestUtils.setLogLevel((Log)LeaseManager.LOG, Level.ALL);
        GenericTestUtils.setLogLevel((Log)FSNamesystem.LOG, Level.ALL);
        GenericTestUtils.setLogLevel((Logger)DFSClient.LOG, Level.ALL);
    }

    /** Creates a default configuration and starts a cluster with it. */
    @Before
    public void setUp() throws IOException {
        this.conf = new Configuration();
        this.init(this.conf);
    }

    /** Shuts the cluster down, if one is running. */
    @After
    public void tearDown() throws Exception {
        if (this.cluster != null) {
            this.cluster.shutdown();
            this.cluster = null;
        }
    }

    /**
     * (Re)starts the mini cluster with the given configuration and refreshes
     * {@link #fileSystem}. Any cluster that is already running is shut down first.
     *
     * @param conf configuration used to build the new cluster
     * @throws IOException if the cluster cannot be built or reached
     */
    private void init(Configuration conf) throws IOException {
        if (this.cluster != null) {
            this.cluster.shutdown();
        }
        this.cluster = new MiniDFSCluster.Builder(conf).build();
        this.cluster.waitClusterUp();
        this.fileSystem = this.cluster.getFileSystem();
    }

    /**
     * Writes {@code size} sequential bytes (starting at value offset 0) to the
     * stream and hflushes it.
     */
    private void writeFileAndSync(FSDataOutputStream stm, int size) throws IOException {
        byte[] buffer = DFSTestUtil.generateSequentialBytes(0, size);
        stm.write(buffer, 0, size);
        stm.hflush();
    }

    /**
     * Waits until the file reports at least one block location, then asserts
     * that its first {@code numBytes} bytes can be read and validated.
     */
    private void checkCanRead(FileSystem fileSys, Path path, int numBytes) throws IOException {
        this.waitForBlocks(fileSys, path);
        this.assertBytesAvailable(fileSys, path, numBytes);
    }

    /**
     * Reads exactly {@code numBytes} bytes from the start of {@code path} and
     * asserts that they match the sequential pattern starting at position 0.
     *
     * @throws IOException if the file cannot be opened or fully read
     */
    private void assertBytesAvailable(FileSystem fileSystem, Path path, int numBytes) throws IOException {
        byte[] buffer = new byte[numBytes];
        // try-with-resources: the stream is closed even when readFully throws.
        try (FSDataInputStream inputStream = fileSystem.open(path)) {
            IOUtils.readFully(inputStream, buffer, 0, numBytes);
        }
        Assert.assertTrue("unable to validate bytes", this.validateSequentialBytes(buffer, 0, numBytes));
    }

    /**
     * Polls once per second until the file has at least one block location
     * within its first {@link #blockSize} bytes.
     *
     * @throws InterruptedIOException if the calling thread is interrupted while
     *         sleeping; the thread's interrupt flag is restored before throwing
     * @throws IOException if the file status or block locations cannot be fetched
     */
    private void waitForBlocks(FileSystem fileSys, Path name) throws IOException {
        boolean done = false;
        while (!done) {
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException e) {
                // Do not swallow the interrupt: restore the flag and stop waiting.
                Thread.currentThread().interrupt();
                InterruptedIOException ioe = new InterruptedIOException("interrupted while waiting for blocks of " + name);
                ioe.initCause(e);
                throw ioe;
            }
            BlockLocation[] locations = fileSys.getFileBlockLocations(fileSys.getFileStatus(name), 0L, blockSize);
            done = locations.length >= 1;
        }
    }

    /**
     * Writes half a block, hflushes, and checks that the data can be read
     * back while the output stream is still open.
     */
    @Test(timeout=30000L)
    public void testUnfinishedBlockRead() throws IOException {
        Path file1 = new Path("/unfinished-block");
        int partialBlockSize = blockSize / 2;
        // The stream is closed after the read check, and also if the check fails.
        try (FSDataOutputStream stm = TestFileCreation.createFile(this.fileSystem, file1, 1)) {
            this.writeFileAndSync(stm, partialBlockSize);
            this.checkCanRead(this.fileSystem, file1, partialBlockSize);
        }
    }

    /**
     * Same as {@link #testUnfinishedBlockRead()} but writes one byte less than
     * the configured {@code io.bytes.per.checksum} value (default 512).
     */
    @Test(timeout=30000L)
    public void testUnfinishedBlockPacketBufferOverrun() throws IOException {
        Path path = new Path("/");
        System.out.println("Path : \"" + path.toString() + "\"");
        Path file1 = new Path("/unfinished-block");
        int bytesPerChecksum = this.conf.getInt("io.bytes.per.checksum", 512);
        int partialBlockSize = bytesPerChecksum - 1;
        try (FSDataOutputStream stm = TestFileCreation.createFile(this.fileSystem, file1, 1)) {
            this.writeFileAndSync(stm, partialBlockSize);
            this.checkCanRead(this.fileSystem, file1, partialBlockSize);
        }
    }

    /**
     * Opens and closes a file repeatedly while a writer thread keeps appending
     * to it and hflushing. The test fails if any open/close attempt throws.
     */
    @Test(timeout=30000L)
    public void testImmediateReadOfNewFile() throws IOException {
        final int testBlockSize = 64 * 1024;
        final int writeSize = 10 * testBlockSize;
        Configuration conf = new Configuration();
        conf.setLong("dfs.blocksize", testBlockSize);
        this.init(conf);
        final int requiredSuccessfulOpens = 100;
        final Path file = new Path("/file1");
        final AtomicBoolean openerDone = new AtomicBoolean(false);
        final AtomicReference<String> errorMessage = new AtomicReference<String>();
        final FSDataOutputStream out = this.fileSystem.create(file);
        // Writer: keeps writing and flushing until the opener reports it is done.
        final Thread writer = new Thread(new Runnable(){

            @Override
            public void run() {
                try {
                    while (!openerDone.get()) {
                        out.write(DFSTestUtil.generateSequentialBytes(0, writeSize));
                        out.hflush();
                    }
                }
                catch (IOException e) {
                    LOG.warn("error in writer", e);
                }
                finally {
                    try {
                        out.close();
                    }
                    catch (IOException e) {
                        LOG.error("unable to close file", e);
                    }
                }
            }
        });
        // Opener: opens and immediately closes the file a fixed number of times.
        Thread opener = new Thread(new Runnable(){

            @Override
            public void run() {
                try {
                    for (int i = 0; i < requiredSuccessfulOpens; ++i) {
                        TestFileConcurrentReader.this.fileSystem.open(file).close();
                    }
                    openerDone.set(true);
                }
                catch (IOException e) {
                    openerDone.set(true);
                    errorMessage.set(String.format("got exception : %s", StringUtils.stringifyException(e)));
                }
                catch (Exception e) {
                    openerDone.set(true);
                    errorMessage.set(String.format("got exception : %s", StringUtils.stringifyException(e)));
                    writer.interrupt();
                    // This only ends the opener thread; the test verdict is
                    // delivered through errorMessage in the main thread.
                    Assert.fail("here");
                }
            }
        });
        writer.start();
        opener.start();
        try {
            writer.join();
            opener.join();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        Assert.assertNull(errorMessage.get(), errorMessage.get());
    }

    @Test(timeout=30000L)
    public void testUnfinishedBlockCRCErrorTransferTo() throws IOException {
        this.runTestUnfinishedBlockCRCError(true, SyncType.SYNC, DEFAULT_WRITE_SIZE);
    }

    @Test(timeout=30000L)
    public void testUnfinishedBlockCRCErrorTransferToVerySmallWrite() throws IOException {
        this.runTestUnfinishedBlockCRCError(true, SyncType.SYNC, SMALL_WRITE_SIZE);
    }

    /** Not annotated with {@code @Test}, so JUnit does not run it. */
    @Ignore
    public void _testUnfinishedBlockCRCErrorTransferToAppend() throws IOException {
        this.runTestUnfinishedBlockCRCError(true, SyncType.APPEND, DEFAULT_WRITE_SIZE);
    }

    @Test(timeout=30000L)
    public void testUnfinishedBlockCRCErrorNormalTransfer() throws IOException {
        this.runTestUnfinishedBlockCRCError(false, SyncType.SYNC, DEFAULT_WRITE_SIZE);
    }

    @Test(timeout=30000L)
    public void testUnfinishedBlockCRCErrorNormalTransferVerySmallWrite() throws IOException {
        this.runTestUnfinishedBlockCRCError(false, SyncType.SYNC, SMALL_WRITE_SIZE);
    }

    /** Not annotated with {@code @Test}, so JUnit does not run it. */
    @Ignore
    public void _testUnfinishedBlockCRCErrorNormalTransferAppend() throws IOException {
        this.runTestUnfinishedBlockCRCError(false, SyncType.APPEND, DEFAULT_WRITE_SIZE);
    }

    /** Runs the CRC-error scenario with a fresh default {@link Configuration}. */
    private void runTestUnfinishedBlockCRCError(boolean transferToAllowed, SyncType syncType, int writeSize) throws IOException {
        this.runTestUnfinishedBlockCRCError(transferToAllowed, syncType, writeSize, new Configuration());
    }

    /**
     * Restarts the cluster with {@code dfs.datanode.transferTo.allowed} set as
     * requested, then runs a writer thread and a tailer thread against the same
     * file and asserts that no error was flagged.
     *
     * <p>The writer performs up to 2000 writes of {@code writeSize} sequential
     * bytes, hflushing after each one when {@code syncType} is {@code SYNC}.
     * The tailer repeatedly calls {@link #tailFile(Path, long)} until the writer
     * finishes or an error is flagged.
     *
     * @param transferToAllowed value stored under {@code dfs.datanode.transferTo.allowed}
     * @param syncType {@code APPEND} closes and reopens the file for append before
     *        writing; {@code SYNC} hflushes after every write
     * @param writeSize number of bytes written per iteration
     * @param conf configuration to modify and start the cluster with
     */
    private void runTestUnfinishedBlockCRCError(boolean transferToAllowed, final SyncType syncType, final int writeSize, Configuration conf) throws IOException {
        conf.setBoolean("dfs.datanode.transferTo.allowed", transferToAllowed);
        this.init(conf);
        final Path file = new Path("/block-being-written-to");
        final int numWrites = 2000;
        final AtomicBoolean writerDone = new AtomicBoolean(false);
        final AtomicBoolean writerStarted = new AtomicBoolean(false);
        final AtomicBoolean error = new AtomicBoolean(false);
        final Thread writer = new Thread(new Runnable(){

            @Override
            public void run() {
                try {
                    FSDataOutputStream outputStream = TestFileConcurrentReader.this.fileSystem.create(file);
                    if (syncType == SyncType.APPEND) {
                        outputStream.close();
                        outputStream = TestFileConcurrentReader.this.fileSystem.append(file);
                    }
                    try {
                        for (int i = 0; !error.get() && i < numWrites; ++i) {
                            byte[] writeBuf = DFSTestUtil.generateSequentialBytes(i * writeSize, writeSize);
                            outputStream.write(writeBuf);
                            if (syncType == SyncType.SYNC) {
                                outputStream.hflush();
                            }
                            writerStarted.set(true);
                        }
                    }
                    catch (IOException e) {
                        error.set(true);
                        LOG.error("error writing to file", e);
                    }
                    finally {
                        outputStream.close();
                    }
                    writerDone.set(true);
                }
                catch (Exception e) {
                    // writerDone is never set on this path; flag the error so the
                    // tailer loop terminates instead of spinning until the timeout.
                    error.set(true);
                    LOG.error("error in writer", e);
                    throw new RuntimeException(e);
                }
            }
        });
        Thread tailer = new Thread(new Runnable(){

            @Override
            public void run() {
                try {
                    long startPos = 0L;
                    while (!writerDone.get() && !error.get()) {
                        if (!writerStarted.get()) {
                            // Nothing has been written yet; check again.
                            continue;
                        }
                        try {
                            startPos = TestFileConcurrentReader.this.tailFile(file, startPos);
                        }
                        catch (IOException e) {
                            LOG.error(String.format("error tailing file %s", file), e);
                            throw new RuntimeException(e);
                        }
                    }
                }
                catch (RuntimeException e) {
                    // Only a checksum failure flags the test as failed here.
                    if (e.getCause() instanceof ChecksumException) {
                        error.set(true);
                    }
                    writer.interrupt();
                    LOG.error("error in tailer", e);
                    throw e;
                }
            }
        });
        writer.start();
        tailer.start();
        try {
            writer.join();
            tailer.join();
            Assert.assertFalse("error occurred, see log above", error.get());
        }
        catch (InterruptedException e) {
            LOG.info("interrupted waiting for writer or tailer to complete");
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Checks that {@code buf[0..len)} holds the sequential pattern in which the
     * byte at absolute position {@code p} satisfies {@code value % 127 == p % 127}.
     *
     * @param buf bytes to check
     * @param startPos absolute position of {@code buf[0]}
     * @param len number of bytes of {@code buf} to check
     * @return {@code true} if every checked byte matches; {@code false} at the
     *         first mismatch, which is also logged
     */
    private boolean validateSequentialBytes(byte[] buf, int startPos, int len) {
        for (int i = 0; i < len; ++i) {
            int position = i + startPos;
            int expected = position % 127;
            if (buf[i] % 127 == expected) {
                continue;
            }
            // Report the position of the offending byte, not just the start offset.
            LOG.error(String.format("at position [%d], got [%d] and expected [%d]", position, buf[i], expected));
            return false;
        }
        return true;
    }

    /**
     * Opens {@code file}, seeks to {@code startPos}, and reads to end of stream
     * in 4096-byte chunks, validating every chunk against the sequential pattern.
     *
     * @param file file to read
     * @param startPos offset to start reading from
     * @return {@code startPos + bytesRead - 1}, i.e. the offset of the last byte
     *         read, so a following call re-reads that byte.
     *         NOTE(review): when no bytes are read this is {@code startPos - 1};
     *         confirm that callers never hit that case with {@code startPos == 0}.
     * @throws ChecksumException if a chunk does not match the expected pattern
     * @throws IOException if opening, seeking or reading fails
     */
    private long tailFile(Path file, long startPos) throws IOException {
        long numRead = 0L;
        // try-with-resources: the stream is closed even when validation fails.
        try (FSDataInputStream inputStream = this.fileSystem.open(file)) {
            inputStream.seek(startPos);
            byte[] buf = new byte[4096];
            int read;
            while ((read = inputStream.read(buf)) > -1) {
                LOG.info(String.format("read %d bytes", read));
                if (!this.validateSequentialBytes(buf, (int)(startPos + numRead), read)) {
                    LOG.error(String.format("invalid bytes: [%s]\n", Arrays.toString(buf)));
                    throw new ChecksumException("unable to validate bytes", startPos);
                }
                numRead += read;
            }
        }
        return numRead + startPos - 1L;
    }

    /** How the writer thread makes its data visible to readers. */
    private enum SyncType {
        SYNC,
        APPEND;

    }
}

