/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.sync.pipedata;

import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.sync.SyncPathUtil;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.db.sync.pipedata.DeletionPipeData;
import org.apache.iotdb.db.sync.pipedata.PipeData;
import org.apache.iotdb.db.sync.pipedata.SchemaPipeData;
import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
import org.apache.iotdb.db.sync.pipedata.queue.BufferedPipeDataQueue;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for {@link BufferedPipeDataQueue}: the serial numbers it reports after being constructed
 * over pre-written log files, blocking {@code take()}, and the interplay of {@code offer()},
 * {@code take()} and {@code commit()}.
 *
 * <p>Several tests pre-populate {@link #pipeLogDir} with a {@code commit.log} file holding a
 * single {@code long} and with pipe-log files named via {@code SyncPathUtil.getPipeLogName(long)},
 * and only then construct the queue over that directory.
 */
public class BufferedPipeDataQueueTest {
    /** How long the test thread lets the background taker run before stopping it. */
    private static final long TAKER_RUN_MILLIS = 3000L;

    /** Upper bound on waiting for the taker thread to finish once it has been interrupted. */
    private static final long TAKER_STOP_TIMEOUT_MILLIS = 5000L;

    /** Value the fixtures write into {@code commit.log}. */
    private static final long COMMITTED_SERIAL_NUMBER = 1L;

    /**
     * Index in the fixture list of the first entry the tests expect {@code take()} to deliver;
     * the entries before it (serial numbers 0 and 1) are expected not to be delivered again.
     */
    private static final int FIRST_EXPECTED_INDEX = 2;

    File pipeLogDir = new File(SyncPathUtil.getReceiverPipeLogDir("pipe", "192.168.0.11", System.currentTimeMillis()));

    @Before
    public void setUp() throws Exception {
        EnvironmentUtils.envSetUp();
        // Fail here with a clear message rather than later with a FileNotFoundException.
        Assert.assertTrue("cannot create " + pipeLogDir, pipeLogDir.isDirectory() || pipeLogDir.mkdirs());
    }

    @After
    public void tearDown() throws IOException, StorageEngineException {
        FileUtils.deleteDirectory(pipeLogDir);
        EnvironmentUtils.cleanEnv();
    }

    /**
     * Builds a queue over a commit log and three pipe logs (the last one empty), checks the
     * reported serial numbers, and verifies that {@code clear()} removes the log directory.
     */
    @Test
    public void testRecoveryAndClear() {
        try {
            writeCommitLog(COMMITTED_SERIAL_NUMBER);
            try (DataOutputStream log = openPipeLog(0L)) {
                for (long serial = 0; serial < 4; serial++) {
                    new TsFilePipeData("", serial).serialize(log);
                }
            }
            try (DataOutputStream log = openPipeLog(4L)) {
                for (long serial = 4; serial < 11; serial++) {
                    new TsFilePipeData("", serial).serialize(log);
                }
            }
            createEmptyPipeLog(11L);

            BufferedPipeDataQueue queue = new BufferedPipeDataQueue(pipeLogDir.getPath());
            assertRecoveredSerialNumbers(queue);
            queue.clear();
            Assert.assertFalse(pipeLogDir.exists());
        } catch (Exception e) {
            throw unexpected(e);
        }
    }

    /** A {@code take()} on a queue that was never offered anything must not yield an element. */
    @Test
    public void testTake() {
        BufferedPipeDataQueue queue = new BufferedPipeDataQueue(pipeLogDir.getPath());
        List<PipeData> taken = newTakenList();
        ExecutorService taker = startSingleTake(queue, taken);
        pause(TAKER_RUN_MILLIS);
        stopTaker(taker);
        Assert.assertEquals(0, taken.size());
    }

    /** A {@code take()} started before the {@code offer()} must receive the offered element. */
    @Test
    public void testTakeAndOffer() {
        BufferedPipeDataQueue queue = new BufferedPipeDataQueue(pipeLogDir.getPath());
        List<PipeData> taken = newTakenList();
        ExecutorService taker = startSingleTake(queue, taken);
        PipeData offered = new TsFilePipeData("", 0L);
        queue.offer(offered);
        pause(TAKER_RUN_MILLIS);
        stopTaker(taker);
        Assert.assertEquals(1, taken.size());
        queue.clear();
    }

    /** An element offered to a fresh queue must come back, equal to itself, from {@code take()}. */
    @Test
    public void testOfferNewPipe() {
        BufferedPipeDataQueue queue = new BufferedPipeDataQueue(pipeLogDir.getPath());
        PipeData offered = new TsFilePipeData("fakePath", 1L);
        queue.offer(offered);
        List<PipeData> taken = newTakenList();
        ExecutorService taker = startSingleTake(queue, taken);
        pause(TAKER_RUN_MILLIS);
        stopTaker(taker);
        Assert.assertEquals(1, taken.size());
        Assert.assertEquals(offered, taken.get(0));
        queue.clear();
    }

    /**
     * Standard fixture plus a trailing empty pipe log; one more element is offered after
     * construction and all ten undelivered elements are expected from the take/commit loop.
     */
    @Test
    public void testOfferAfterRecoveryWithEmptyPipeLog() {
        try {
            List<PipeData> expected = writeRecoveryFixture();
            createEmptyPipeLog(11L);

            BufferedPipeDataQueue queue = new BufferedPipeDataQueue(pipeLogDir.getPath());
            assertRecoveredSerialNumbers(queue);

            PipeData offered = new TsFilePipeData("fake11", 11L);
            expected.add(offered);
            queue.offer(offered);

            List<PipeData> taken = newTakenList();
            ExecutorService taker = startTakeAndCommitLoop(queue, taken);
            pause(TAKER_RUN_MILLIS);
            stopTaker(taker);

            Assert.assertEquals(10, taken.size());
            assertDelivered(expected, taken, 10);
            queue.clear();
        } catch (Exception e) {
            throw unexpected(e);
        }
    }

    /** Standard fixture plus a trailing empty pipe log; nine elements are expected back. */
    @Test
    public void testRecoveryWithEmptyPipeLog() {
        try {
            List<PipeData> expected = writeRecoveryFixture();
            createEmptyPipeLog(11L);

            BufferedPipeDataQueue queue = new BufferedPipeDataQueue(pipeLogDir.getPath());
            assertRecoveredSerialNumbers(queue);

            List<PipeData> taken = newTakenList();
            ExecutorService taker = startTakeAndCommitLoop(queue, taken);
            pause(TAKER_RUN_MILLIS);
            stopTaker(taker);

            Assert.assertEquals(9, taken.size());
            assertDelivered(expected, taken, 9);
            queue.clear();
        } catch (Exception e) {
            throw unexpected(e);
        }
    }

    /** Standard fixture without a trailing empty pipe log; nine elements are expected back. */
    @Test
    public void testRecoveryWithoutEmptyPipeLog() {
        try {
            List<PipeData> expected = writeRecoveryFixture();

            BufferedPipeDataQueue queue = new BufferedPipeDataQueue(pipeLogDir.getPath());
            assertRecoveredSerialNumbers(queue);

            List<PipeData> taken = newTakenList();
            ExecutorService taker = startTakeAndCommitLoop(queue, taken);
            pause(TAKER_RUN_MILLIS);
            stopTaker(taker);

            Assert.assertEquals(9, taken.size());
            assertDelivered(expected, taken, 9);
            queue.clear();
        } catch (Exception e) {
            throw unexpected(e);
        }
    }

    /**
     * Offers nine further elements while the take/commit loop is already running; eighteen
     * elements are expected in total, the first nine of which must match the fixture.
     */
    @Test
    public void testOfferWhileTaking() {
        try {
            List<PipeData> expected = writeRecoveryFixture();

            BufferedPipeDataQueue queue = new BufferedPipeDataQueue(pipeLogDir.getPath());
            assertRecoveredSerialNumbers(queue);

            List<PipeData> taken = newTakenList();
            ExecutorService taker = startTakeAndCommitLoop(queue, taken);
            for (long serial = 11; serial < 20; serial++) {
                PipeData offered = new DeletionPipeData(new Deletion(new PartialPath("fake" + serial), 0L, 0L), serial);
                queue.offer(offered);
            }
            pause(TAKER_RUN_MILLIS);
            stopTaker(taker);

            Assert.assertEquals(18, taken.size());
            assertDelivered(expected, taken, 9);
            queue.clear();
        } catch (Exception e) {
            throw unexpected(e);
        }
    }

    /** Appends {@code commitSerialNumber} as a single {@code long} to {@code commit.log}. */
    private void writeCommitLog(long commitSerialNumber) throws IOException {
        File commitLog = new File(pipeLogDir, "commit.log");
        try (DataOutputStream out = new DataOutputStream(new FileOutputStream(commitLog, true))) {
            out.writeLong(commitSerialNumber);
        }
    }

    /** Opens (truncating) the pipe-log file whose name is derived from {@code startSerialNumber}. */
    private DataOutputStream openPipeLog(long startSerialNumber) throws IOException {
        File pipeLog = new File(pipeLogDir.getPath(), SyncPathUtil.getPipeLogName(startSerialNumber));
        return new DataOutputStream(new FileOutputStream(pipeLog, false));
    }

    /** Creates a pipe-log file with no content. */
    private void createEmptyPipeLog(long startSerialNumber) throws IOException {
        openPipeLog(startSerialNumber).close();
    }

    /**
     * Writes the fixture shared by the recovery tests: {@code commit.log} plus two pipe logs.
     * The first log holds TsFile entries with serial numbers 0-3; the second holds deletion
     * entries 4-7 followed by schema entries 8-10.
     *
     * @return the written entries in serial-number order, in a mutable list
     */
    private List<PipeData> writeRecoveryFixture() throws Exception {
        writeCommitLog(COMMITTED_SERIAL_NUMBER);
        List<PipeData> written = new ArrayList<PipeData>();
        try (DataOutputStream log = openPipeLog(0L)) {
            for (long serial = 0; serial < 4; serial++) {
                TsFilePipeData tsFile = new TsFilePipeData("fake" + serial, serial);
                written.add(tsFile);
                tsFile.serialize(log);
            }
        }
        try (DataOutputStream log = openPipeLog(4L)) {
            for (long serial = 4; serial < 8; serial++) {
                DeletionPipeData deletion = new DeletionPipeData(new Deletion(new PartialPath("fake" + serial), 0L, 99L), serial);
                written.add(deletion);
                deletion.serialize(log);
            }
            for (long serial = 8; serial < 11; serial++) {
                PhysicalPlan plan = new SetStorageGroupPlan(new PartialPath("fake" + serial));
                SchemaPipeData schema = new SchemaPipeData(plan, serial);
                written.add(schema);
                schema.serialize(log);
            }
        }
        return written;
    }

    /** Checks the serial numbers every fixture-based test expects right after construction. */
    private static void assertRecoveredSerialNumbers(BufferedPipeDataQueue queue) {
        // Explicit casts keep these calls on the assertEquals(long, long) overload.
        Assert.assertEquals(COMMITTED_SERIAL_NUMBER, (long) queue.getCommitSerialNumber());
        Assert.assertEquals(10L, (long) queue.getLastMaxSerialNumber());
    }

    /**
     * Asserts that the first {@code count} taken elements equal the fixture entries starting at
     * {@link #FIRST_EXPECTED_INDEX}.
     */
    private static void assertDelivered(List<PipeData> expected, List<PipeData> taken, int count) {
        for (int i = 0; i < count; i++) {
            Assert.assertEquals(expected.get(i + FIRST_EXPECTED_INDEX), taken.get(i));
        }
    }

    /** Creates the result list; synchronized because the taker thread writes and the test thread reads. */
    private static List<PipeData> newTakenList() {
        return Collections.synchronizedList(new ArrayList<PipeData>());
    }

    /** Starts a background thread that performs exactly one {@code take()} and records the result. */
    private static ExecutorService startSingleTake(BufferedPipeDataQueue queue, List<PipeData> sink) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(() -> {
            try {
                sink.add(queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        return executor;
    }

    /** Starts a background thread that repeats {@code take()} + {@code commit()} until interrupted. */
    private static ExecutorService startTakeAndCommitLoop(BufferedPipeDataQueue queue, List<PipeData> sink) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(() -> {
            try {
                while (true) {
                    sink.add(queue.take());
                    queue.commit();
                }
            } catch (InterruptedException e) {
                // Interruption is the regular way this loop is stopped.
                Thread.currentThread().interrupt();
            }
        });
        return executor;
    }

    /**
     * Interrupts the taker and waits (bounded) for it to finish so that the result list is no
     * longer being modified when the assertions run. The outcome of the wait is deliberately not
     * asserted; if the taker has not finished in time the test simply proceeds.
     */
    private static void stopTaker(ExecutorService executor) {
        executor.shutdownNow();
        try {
            executor.awaitTermination(TAKER_STOP_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError("Interrupted while waiting for the taker thread to stop", e);
        }
    }

    /** Sleeps for {@code millis}; an interruption restores the flag and fails the test. */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError("Interrupted while waiting for the taker thread", e);
        }
    }

    /** Wraps an unexpected exception so the test fails with the original cause attached. */
    private static AssertionError unexpected(Exception e) {
        return new AssertionError("Unexpected exception: " + e, e);
    }
}

