/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.wal.buffer;

import java.io.File;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.wal.buffer.IWALBuffer;
import org.apache.iotdb.db.wal.buffer.WALBuffer;
import org.apache.iotdb.db.wal.buffer.WALEntry;
import org.apache.iotdb.db.wal.buffer.WALEntryValue;
import org.apache.iotdb.db.wal.buffer.WALInfoEntry;
import org.apache.iotdb.db.wal.io.WALReader;
import org.apache.iotdb.db.wal.utils.WALFileUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Shared test scenarios for {@link WALBuffer}: several threads write {@link InsertRowNode}
 * entries through the buffer, and the WAL files found in {@link #logDirectory} are then read
 * back and compared with what was written.
 *
 * <p>The class is abstract; concrete subclasses inherit the fixture and the tests.
 */
public abstract class WALBufferCommonTest {
    protected static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
    protected static final String identifier = String.valueOf(Integer.MAX_VALUE);
    /** Cluster-mode flag observed when this class was initialized; restored after every test. */
    protected static final boolean preIsClusterMode = config.isClusterMode();
    protected static final String logDirectory = TestConstant.BASE_OUTPUT_PATH.concat("wal-test");
    protected static final String devicePath = "root.test_sg.test_d";
    protected IWALBuffer walBuffer;

    /** Creates a fresh buffer writing into {@link #logDirectory} and turns cluster mode on. */
    @Before
    public void setUp() throws Exception {
        this.walBuffer = new WALBuffer(identifier, logDirectory);
        config.setClusterMode(true);
    }

    /**
     * Closes the buffer and restores the cluster-mode flag.
     *
     * <p>The flag is restored in a {@code finally} block so that a failing {@code close()} cannot
     * leak the modified configuration into later tests. The null guard covers the case where
     * {@link #setUp()} failed before the buffer was assigned.
     */
    @After
    public void tearDown() throws Exception {
        try {
            if (this.walBuffer != null) {
                this.walBuffer.close();
            }
        } finally {
            config.setClusterMode(preIsClusterMode);
        }
    }

    /**
     * Writes entries from three threads concurrently, waits until the buffer reports that all
     * entries are consumed, then asserts that the set of nodes read back from the WAL files
     * equals the set of nodes that were written.
     */
    @Test
    public void testConcurrentWrite() throws Exception {
        int threadsNum = 3;
        ExecutorService executorService = Executors.newFixedThreadPool(threadsNum);
        Set<InsertRowNode> expectedInsertRowNodes = ConcurrentHashMap.newKeySet();
        try {
            ArrayList<Future<Void>> futures = new ArrayList<Future<Void>>();
            for (int i = 0; i < threadsNum; ++i) {
                // Lambdas may only capture effectively final locals, so copy the loop index.
                final int memTableId = i;
                Callable<Void> writeTask = () -> {
                    try {
                        this.writeInsertRowNode(memTableId, expectedInsertRowNodes);
                    } catch (IllegalPathException e) {
                        // Keep the cause so the failure surfaced by Future.get() is diagnosable.
                        throw new AssertionError("Unexpected illegal path while writing WAL entries", e);
                    }
                    return null;
                };
                futures.add(executorService.submit(writeTask));
            }
            // Future.get() rethrows (wrapped) anything a writer threw, failing the test.
            for (Future<Void> future : futures) {
                future.get();
            }
        } finally {
            // Release the worker threads even when a writer failed.
            executorService.shutdown();
        }

        while (!this.walBuffer.isAllWALEntriesConsumed()) {
            Thread.sleep(1000L);
        }
        // Extra pause retained from the original test before the files are inspected.
        Thread.sleep(1000L);

        File[] walFiles = WALFileUtils.listAllWALFiles(new File(logDirectory));
        Set<InsertRowNode> actualInsertRowNodes = new HashSet<InsertRowNode>();
        if (walFiles != null) {
            for (File walFile : walFiles) {
                try (WALReader walReader = new WALReader(walFile)) {
                    while (walReader.hasNext()) {
                        actualInsertRowNodes.add((InsertRowNode) walReader.next().getValue());
                    }
                }
            }
        }
        Assert.assertEquals(expectedInsertRowNodes, actualInsertRowNodes);
    }

    /**
     * Writes 100 rows (timestamps 0..99) for the device {@code devicePath + memTableId} through
     * the buffer, recording each node in {@code expectedInsertRowNodes} before it is written.
     *
     * @param memTableId id attached to every WAL entry and appended to the device path
     * @param expectedInsertRowNodes collector shared between writer threads
     */
    private void writeInsertRowNode(int memTableId, Set<InsertRowNode> expectedInsertRowNodes) throws IllegalPathException, QueryProcessException {
        for (int i = 0; i < 100; ++i) {
            InsertRowNode insertRowNode = this.getInsertRowNode(devicePath + memTableId, i);
            expectedInsertRowNodes.add(insertRowNode);
            WALEntry walEntry = new WALInfoEntry(memTableId, insertRowNode);
            this.walBuffer.write(walEntry);
        }
    }

    /**
     * Builds a six-measurement row ({@code s1}..{@code s6}, one per data type listed below) for
     * the given device and timestamp, with matching measurement schemas attached.
     *
     * @param devicePath full device path of the row
     * @param time timestamp of the row
     * @return the populated node
     */
    private InsertRowNode getInsertRowNode(String devicePath, long time) throws IllegalPathException, QueryProcessException {
        TSDataType[] dataTypes = new TSDataType[]{TSDataType.DOUBLE, TSDataType.FLOAT, TSDataType.INT64, TSDataType.INT32, TSDataType.BOOLEAN, TSDataType.TEXT};
        Object[] columns = new Object[]{1.0, Float.valueOf(2.0f), 10000L, 100, false, new Binary("hh0")};
        InsertRowNode node = new InsertRowNode(new PlanNodeId(""), new PartialPath(devicePath), false, new String[]{"s1", "s2", "s3", "s4", "s5", "s6"}, dataTypes, time, columns, false);
        MeasurementSchema[] schemas = new MeasurementSchema[dataTypes.length];
        for (int i = 0; i < schemas.length; ++i) {
            schemas[i] = new MeasurementSchema("s" + (i + 1), dataTypes[i]);
        }
        node.setMeasurementSchemas(schemas);
        return node;
    }

    /**
     * Re-runs {@link #testConcurrentWrite()} with the configured WAL buffer size set to 32,
     * restoring the previous size afterwards.
     *
     * <p>NOTE(review): {@link #walBuffer} is constructed in {@link #setUp()} before the size is
     * changed here; confirm that {@code WALBuffer} picks the new size up after construction.
     */
    @Test
    public void testHugeWrite() throws Exception {
        int prevWalBufferSize = config.getWalBufferSize();
        config.setWalBufferSize(32);
        try {
            this.testConcurrentWrite();
        } finally {
            config.setWalBufferSize(prevWalBufferSize);
        }
    }
}

