package org.apache.hama.bsp.message;

import java.io.EOFException;
import java.io.File;
import java.io.RandomAccessFile;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.security.SecureRandom;
import java.util.Iterator;
import junit.framework.TestCase;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.Combiner;
import org.apache.hama.bsp.message.io.CombineSpilledDataProcessor;
import org.apache.hama.bsp.message.io.DirectByteBufferInputStream;
import org.apache.hama.bsp.message.io.DirectByteBufferOutputStream;
import org.apache.hama.bsp.message.io.ReusableByteBuffer;
import org.apache.hama.bsp.message.io.SpilledByteBuffer;
import org.apache.hama.bsp.message.io.SpilledDataInputBuffer;
import org.apache.hama.bsp.message.io.SpillingDataOutputBuffer;
import org.apache.hama.bsp.message.io.SyncFlushByteBufferOutputStream;
import org.apache.hama.bsp.message.io.SyncReadByteBufferInputStream;
import org.apache.hama.bsp.message.io.WriteSpilledDataProcessor;

/* loaded from: input_file:org/apache/hama/bsp/message/TestMessageIO.class */
public class TestMessageIO extends TestCase {

    /* loaded from: input_file:org/apache/hama/bsp/message/TestMessageIO$SumCombiner.class */
    public static class SumCombiner extends Combiner<IntWritable> {
        public IntWritable combine(Iterable<IntWritable> iterable) {
            int i = 0;
            Iterator<IntWritable> it = iterable.iterator();
            while (it.hasNext()) {
                i += it.next().get();
            }
            return new IntWritable(i);
        }

        /* renamed from: combine, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ Writable m14combine(Iterable iterable) {
            return combine((Iterable<IntWritable>) iterable);
        }
    }

    public void testNonSpillBuffer() throws Exception {
        SpillingDataOutputBuffer spillingDataOutputBuffer = new SpillingDataOutputBuffer();
        Text text = new Text("Testing the spillage of spilling buffer");
        for (int i = 0; i < 100; i++) {
            text.write(spillingDataOutputBuffer);
        }
        assertTrue(spillingDataOutputBuffer != null);
        assertTrue(spillingDataOutputBuffer.size() == 4000);
        assertFalse(spillingDataOutputBuffer.hasSpilled());
        spillingDataOutputBuffer.close();
    }

    public void testSpillBuffer() throws Exception {
        HamaConfiguration hamaConfiguration = new HamaConfiguration();
        String str = System.getProperty("java.io.tmpdir") + File.separatorChar + new BigInteger(128, new SecureRandom()).toString(32);
        WriteSpilledDataProcessor writeSpilledDataProcessor = new WriteSpilledDataProcessor(str);
        writeSpilledDataProcessor.init(hamaConfiguration);
        SpillingDataOutputBuffer spillingDataOutputBuffer = new SpillingDataOutputBuffer(2, 1024, 1024, true, writeSpilledDataProcessor);
        Text text = new Text("Testing the spillage of spilling buffer");
        for (int i = 0; i < 100; i++) {
            text.write(spillingDataOutputBuffer);
        }
        assertTrue(spillingDataOutputBuffer != null);
        assertTrue(spillingDataOutputBuffer.size() == 4000);
        assertTrue(spillingDataOutputBuffer.hasSpilled());
        File file = new File(str);
        assertTrue(file.exists());
        assertTrue(file.delete());
        spillingDataOutputBuffer.close();
    }

    public void testSpillingByteBuffer() throws Exception {
        SpilledByteBuffer spilledByteBuffer = new SpilledByteBuffer(ByteBuffer.allocateDirect(512));
        for (int i = 0; i < 100; i++) {
            spilledByteBuffer.putInt(i);
            spilledByteBuffer.markEndOfRecord();
        }
        spilledByteBuffer.putInt(100);
        assertEquals(spilledByteBuffer.getMarkofLastRecord(), 400);
        assertEquals(spilledByteBuffer.remaining(), 108);
        spilledByteBuffer.flip();
        assertEquals(spilledByteBuffer.remaining(), 404);
        assertEquals(spilledByteBuffer.getMarkofLastRecord(), 400);
    }

    public void testDirectByteBufferOutput() throws Exception {
        ByteBuffer allocateDirect = ByteBuffer.allocateDirect(512);
        DirectByteBufferOutputStream directByteBufferOutputStream = new DirectByteBufferOutputStream();
        directByteBufferOutputStream.setBuffer(allocateDirect);
        IntWritable intWritable = new IntWritable(1);
        for (int i = 0; i < 100; i++) {
            intWritable.set(i);
            intWritable.write(directByteBufferOutputStream);
        }
        directByteBufferOutputStream.close();
        allocateDirect.flip();
        for (int i2 = 0; i2 < 100; i2++) {
            assertEquals(i2, allocateDirect.getInt());
        }
        try {
            allocateDirect.getInt();
            assertTrue(false);
        } catch (Exception e) {
            assertTrue(true);
        }
    }

    public void testDirectByteBufferInput() throws Exception {
        ByteBuffer allocateDirect = ByteBuffer.allocateDirect(512);
        DirectByteBufferOutputStream directByteBufferOutputStream = new DirectByteBufferOutputStream();
        directByteBufferOutputStream.setBuffer(allocateDirect);
        IntWritable intWritable = new IntWritable(1);
        for (int i = 0; i < 100; i++) {
            intWritable.set(i);
            intWritable.write(directByteBufferOutputStream);
        }
        intWritable.write(directByteBufferOutputStream);
        directByteBufferOutputStream.close();
        allocateDirect.flip();
        DirectByteBufferInputStream directByteBufferInputStream = new DirectByteBufferInputStream();
        directByteBufferInputStream.setBuffer(new SpilledByteBuffer(allocateDirect, 400));
        for (int i2 = 0; i2 < 100; i2++) {
            intWritable.readFields(directByteBufferInputStream);
            assertEquals(i2, intWritable.get());
        }
        assertFalse(directByteBufferInputStream.hasDataToRead());
        assertTrue(directByteBufferInputStream.hasUnmarkData());
        directByteBufferInputStream.prepareForNext();
        allocateDirect.clear();
        DirectByteBufferOutputStream directByteBufferOutputStream2 = new DirectByteBufferOutputStream();
        ByteBuffer allocateDirect2 = ByteBuffer.allocateDirect(2048);
        directByteBufferOutputStream2.setBuffer(allocateDirect2);
        for (int i3 = 0; i3 < 400; i3++) {
            intWritable.set(i3);
            intWritable.write(directByteBufferOutputStream2);
        }
        directByteBufferOutputStream2.close();
        allocateDirect2.flip();
        directByteBufferInputStream.setBuffer(new SpilledByteBuffer(allocateDirect2, 400));
        intWritable.readFields(directByteBufferInputStream);
        assertEquals(99, intWritable.get());
        for (int i4 = 0; i4 < 100; i4++) {
            intWritable.readFields(directByteBufferInputStream);
            assertEquals(i4, intWritable.get());
        }
        assertFalse(directByteBufferInputStream.hasDataToRead());
        assertTrue(directByteBufferInputStream.hasUnmarkData());
        directByteBufferInputStream.prepareForNext();
        allocateDirect2.clear();
        DirectByteBufferOutputStream directByteBufferOutputStream3 = new DirectByteBufferOutputStream();
        directByteBufferOutputStream3.setBuffer(allocateDirect2);
        for (int i5 = 0; i5 < 100; i5++) {
            intWritable.set(i5);
            intWritable.write(directByteBufferOutputStream3);
        }
        directByteBufferOutputStream3.close();
        allocateDirect2.flip();
        directByteBufferInputStream.setBuffer(new SpilledByteBuffer(allocateDirect2, 400));
        for (int i6 = 100; i6 < 400; i6++) {
            intWritable.readFields(directByteBufferInputStream);
            assertEquals(i6, intWritable.get());
        }
        for (int i7 = 0; i7 < 100; i7++) {
            intWritable.readFields(directByteBufferInputStream);
            assertEquals(i7, intWritable.get());
        }
        assertFalse(directByteBufferInputStream.hasDataToRead());
        assertFalse(directByteBufferInputStream.hasUnmarkData());
    }

    public void testReusableByteBufferIter() throws Exception {
        ReusableByteBuffer reusableByteBuffer = new ReusableByteBuffer(new IntWritable());
        ByteBuffer allocateDirect = ByteBuffer.allocateDirect(512);
        DirectByteBufferOutputStream directByteBufferOutputStream = new DirectByteBufferOutputStream();
        directByteBufferOutputStream.setBuffer(allocateDirect);
        IntWritable intWritable = new IntWritable(1);
        for (int i = 0; i < 100; i++) {
            intWritable.set(i);
            intWritable.write(directByteBufferOutputStream);
        }
        intWritable.write(directByteBufferOutputStream);
        directByteBufferOutputStream.close();
        allocateDirect.flip();
        reusableByteBuffer.set(new SpilledByteBuffer(allocateDirect, 400));
        Iterator it = reusableByteBuffer.iterator();
        int i2 = 0;
        while (it.hasNext()) {
            int i3 = i2;
            i2++;
            assertEquals(((IntWritable) it.next()).get(), i3);
        }
        assertEquals(i2, 100);
        reusableByteBuffer.prepareForNext();
        allocateDirect.clear();
        DirectByteBufferOutputStream directByteBufferOutputStream2 = new DirectByteBufferOutputStream();
        directByteBufferOutputStream2.setBuffer(allocateDirect);
        for (int i4 = 0; i4 < 101; i4++) {
            intWritable.set(i4);
            intWritable.write(directByteBufferOutputStream2);
        }
        directByteBufferOutputStream2.close();
        allocateDirect.flip();
        reusableByteBuffer.set(new SpilledByteBuffer(allocateDirect, 404));
        Iterator it2 = reusableByteBuffer.iterator();
        assertEquals(((IntWritable) it2.next()).get(), 99);
        int i5 = 0;
        while (it2.hasNext()) {
            int i6 = i5;
            i5++;
            assertEquals(((IntWritable) it2.next()).get(), i6);
        }
        allocateDirect.clear();
    }

    public void testCombineProcessor() throws Exception {
        String str = System.getProperty("java.io.tmpdir") + File.separatorChar + new BigInteger(128, new SecureRandom()).toString(32);
        ByteBuffer allocateDirect = ByteBuffer.allocateDirect(512);
        DirectByteBufferOutputStream directByteBufferOutputStream = new DirectByteBufferOutputStream();
        directByteBufferOutputStream.setBuffer(allocateDirect);
        IntWritable intWritable = new IntWritable(1);
        int i = 0;
        for (int i2 = 0; i2 < 100; i2++) {
            intWritable.set(i2);
            intWritable.write(directByteBufferOutputStream);
            i += i2;
        }
        intWritable.write(directByteBufferOutputStream);
        directByteBufferOutputStream.close();
        allocateDirect.flip();
        HamaConfiguration hamaConfiguration = new HamaConfiguration();
        hamaConfiguration.setClass("bsp.message.type.class", IntWritable.class, Writable.class);
        hamaConfiguration.setClass("bsp.combiner.class", SumCombiner.class, Combiner.class);
        CombineSpilledDataProcessor combineSpilledDataProcessor = new CombineSpilledDataProcessor(str);
        assertTrue(combineSpilledDataProcessor.init(hamaConfiguration));
        File file = new File(str);
        try {
            assertTrue(combineSpilledDataProcessor.handleSpilledBuffer(new SpilledByteBuffer(allocateDirect, 400)));
            allocateDirect.flip();
            assertTrue(combineSpilledDataProcessor.handleSpilledBuffer(new SpilledByteBuffer(allocateDirect, 400)));
            assertTrue(combineSpilledDataProcessor.close());
            assertTrue(file.exists());
            assertEquals(file.length(), 8L);
            RandomAccessFile randomAccessFile = new RandomAccessFile(str, "r");
            FileChannel channel = randomAccessFile.getChannel();
            ByteBuffer allocateDirect2 = ByteBuffer.allocateDirect(16);
            channel.read(allocateDirect2);
            allocateDirect2.flip();
            assertEquals(allocateDirect2.getInt(), i);
            assertEquals(allocateDirect2.getInt(), i + 99);
            randomAccessFile.close();
            assertTrue(file.delete());
        } catch (Throwable th) {
            assertTrue(file.delete());
            throw th;
        }
    }

    public void testSpillInputStream() throws Exception {
        File file = null;
        try {
            String str = System.getProperty("java.io.tmpdir") + File.separatorChar + "testSpillInputStream.txt";
            HamaConfiguration hamaConfiguration = new HamaConfiguration();
            WriteSpilledDataProcessor writeSpilledDataProcessor = new WriteSpilledDataProcessor(str);
            writeSpilledDataProcessor.init(hamaConfiguration);
            SpillingDataOutputBuffer spillingDataOutputBuffer = new SpillingDataOutputBuffer(2, 1024, 1024, true, writeSpilledDataProcessor);
            Text text = new Text("Testing the spillage of spilling buffer");
            for (int i = 0; i < 100; i++) {
                text.write(spillingDataOutputBuffer);
                spillingDataOutputBuffer.markRecordEnd();
            }
            assertTrue(spillingDataOutputBuffer != null);
            assertTrue(spillingDataOutputBuffer.size() == 4000);
            assertTrue(spillingDataOutputBuffer.hasSpilled());
            file = new File(str);
            assertTrue(file.exists());
            spillingDataOutputBuffer.close();
            assertTrue(file.length() == 4000);
            SpilledDataInputBuffer inputStreamToRead = spillingDataOutputBuffer.getInputStreamToRead(str);
            for (int i2 = 0; i2 < 100; i2++) {
                text.readFields(inputStreamToRead);
                assertTrue("Testing the spillage of spilling buffer".equals(text.toString()));
                text.clear();
            }
            try {
                text.readFields(inputStreamToRead);
                assertTrue(false);
            } catch (EOFException e) {
                assertTrue(true);
            }
            inputStreamToRead.close();
            inputStreamToRead.completeReading(false);
            assertTrue(file.exists());
            inputStreamToRead.completeReading(true);
            assertFalse(file.exists());
            if (file == null || !file.exists()) {
                return;
            }
            file.delete();
        } catch (Throwable th) {
            if (file != null && file.exists()) {
                file.delete();
            }
            throw th;
        }
    }

    public void testSyncFlushByteBufferOutputStream() throws Exception {
        File file = null;
        try {
            String str = System.getProperty("java.io.tmpdir") + File.separatorChar + "testSyncFlushByteBufferOutputStream.txt";
            DirectByteBufferOutputStream directByteBufferOutputStream = new DirectByteBufferOutputStream(new SyncFlushByteBufferOutputStream(str));
            directByteBufferOutputStream.setBuffer(ByteBuffer.allocateDirect(512));
            IntWritable intWritable = new IntWritable(1);
            for (int i = 0; i < 200; i++) {
                intWritable.set(i);
                intWritable.write(directByteBufferOutputStream);
            }
            intWritable.write(directByteBufferOutputStream);
            directByteBufferOutputStream.close();
            file = new File(str);
            assertTrue(file.exists());
            assertTrue(file.length() == 804);
            assertTrue(file.delete());
            if (file != null) {
                file.delete();
            }
        } catch (Throwable th) {
            if (file != null) {
                file.delete();
            }
            throw th;
        }
    }

    public void testSyncFlushBufferInputStream() throws Exception {
        File file = null;
        try {
            String str = System.getProperty("java.io.tmpdir") + File.separatorChar + "testSyncFlushBufferInputStream.txt";
            SyncFlushByteBufferOutputStream syncFlushByteBufferOutputStream = new SyncFlushByteBufferOutputStream(str);
            DirectByteBufferOutputStream directByteBufferOutputStream = new DirectByteBufferOutputStream(syncFlushByteBufferOutputStream);
            ByteBuffer allocateDirect = ByteBuffer.allocateDirect(512);
            directByteBufferOutputStream.setBuffer(allocateDirect);
            IntWritable intWritable = new IntWritable(1);
            for (int i = 0; i < 200; i++) {
                intWritable.set(i);
                intWritable.write(directByteBufferOutputStream);
            }
            intWritable.write(directByteBufferOutputStream);
            directByteBufferOutputStream.close();
            file = new File(str);
            assertTrue(file.exists());
            assertEquals(file.length(), 804L);
            DirectByteBufferInputStream directByteBufferInputStream = new DirectByteBufferInputStream(new SyncReadByteBufferInputStream(syncFlushByteBufferOutputStream.isSpilled(), str));
            allocateDirect.clear();
            directByteBufferInputStream.setBuffer(allocateDirect);
            for (int i2 = 0; i2 < 200; i2++) {
                intWritable.readFields(directByteBufferInputStream);
                assertEquals(intWritable.get(), i2);
            }
            intWritable.readFields(directByteBufferInputStream);
            assertEquals(intWritable.get(), 199);
            try {
                intWritable.readFields(directByteBufferInputStream);
                assertFalse(true);
            } catch (Exception e) {
                assertTrue(true);
            }
            directByteBufferInputStream.close();
            directByteBufferOutputStream.close();
            if (file != null) {
                file.delete();
            }
        } catch (Throwable th) {
            if (file != null) {
                file.delete();
            }
            throw th;
        }
    }
}
