/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.sync.transport;

import java.io.File;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.sync.SyncPathUtil;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.db.sync.pipedata.DeletionPipeData;
import org.apache.iotdb.db.sync.pipedata.PipeData;
import org.apache.iotdb.db.sync.pipedata.SchemaPipeData;
import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
import org.apache.iotdb.db.sync.sender.pipe.Pipe;
import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
import org.apache.iotdb.db.sync.transport.client.IoTDBSyncClient;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.TConfigurationConst;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.SessionDataSet;
import org.apache.iotdb.session.util.Version;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class SyncTransportTest {
    private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
    File tmpDir = new File("target/synctest");
    String pipeName1 = "pipe1";
    String remoteIp1;
    long createdTime1 = System.currentTimeMillis();
    File fileDir;
    File tsfile;
    File resourceFile;
    File modsFile;

    @Before
    public void setUp() throws Exception {
        File[] fileList;
        EnvironmentUtils.envSetUp();
        this.remoteIp1 = "127.0.0.1";
        this.fileDir = new File(SyncPathUtil.getReceiverFileDataDir((String)this.pipeName1, (String)this.remoteIp1, (long)this.createdTime1));
        this.prepareData();
        EnvironmentUtils.shutdownDaemon();
        File srcDir = new File(IoTDBDescriptor.getInstance().getConfig().getDataDirs()[0] + File.separator + "sequence" + File.separator + "root.vehicle" + File.separator + "0" + File.separator + "0");
        if (this.tmpDir.exists()) {
            FileUtils.deleteDirectory((File)this.tmpDir);
        }
        FileUtils.moveDirectory((File)srcDir, (File)this.tmpDir);
        this.tsfile = null;
        this.resourceFile = null;
        this.modsFile = null;
        for (File f : fileList = this.tmpDir.listFiles()) {
            if (f.getName().endsWith(".tsfile")) {
                this.tsfile = f;
                continue;
            }
            if (f.getName().endsWith(".mods")) {
                this.modsFile = f;
                continue;
            }
            if (!f.getName().endsWith(".resource")) continue;
            this.resourceFile = f;
        }
        EnvironmentUtils.cleanEnv();
        EnvironmentUtils.envSetUp();
    }

    @After
    public void tearDown() throws Exception {
        FileUtils.deleteDirectory((File)this.tmpDir);
        EnvironmentUtils.cleanEnv();
    }

    @Test
    public void testTransportFile() throws Exception {
        TSyncIdentityInfo identityInfo = new TSyncIdentityInfo("127.0.0.1", this.pipeName1, this.createdTime1, config.getIoTDBVersion());
        try (TTransport transport = RpcTransportFactory.INSTANCE.getTransport((TTransport)new TSocket(TConfigurationConst.defaultTConfiguration, "127.0.0.1", 6667, 100000, 1000));){
            Object protocol = config.isRpcThriftCompressionEnable() ? new TCompactProtocol(transport) : new TBinaryProtocol(transport);
            IClientRPCService.Client serviceClient = new IClientRPCService.Client((TProtocol)protocol);
            if (!transport.isOpen()) {
                transport.open();
            }
            byte[] buffer = new byte[10];
            try (RandomAccessFile randomAccessFile = new RandomAccessFile(this.tsfile, "rw");){
                try {
                    serviceClient.sendFile(new TSyncTransportMetaInfo(this.tsfile.getName(), 0L), ByteBuffer.wrap(buffer));
                    Assert.fail();
                }
                catch (TException tException) {
                    // empty catch block
                }
                serviceClient.handshake(identityInfo);
                randomAccessFile.read(buffer, 0, 10);
                TSStatus tsStatus1 = serviceClient.sendFile(new TSyncTransportMetaInfo(this.tsfile.getName(), 1L), ByteBuffer.wrap(buffer));
                Assert.assertEquals((long)tsStatus1.getCode(), (long)TSStatusCode.SYNC_FILE_REBASE.getStatusCode());
                Assert.assertEquals((Object)tsStatus1.getMessage(), (Object)"0");
                TSStatus tsStatus2 = serviceClient.sendFile(new TSyncTransportMetaInfo(this.tsfile.getName(), 0L), ByteBuffer.wrap(buffer));
                Assert.assertEquals((long)tsStatus2.getCode(), (long)TSStatusCode.SUCCESS_STATUS.getStatusCode());
                TSStatus tsStatus3 = serviceClient.sendFile(new TSyncTransportMetaInfo(this.tsfile.getName(), 0L), ByteBuffer.wrap(buffer));
                Assert.assertEquals((long)tsStatus3.getCode(), (long)TSStatusCode.SYNC_FILE_REBASE.getStatusCode());
                Assert.assertEquals((Object)tsStatus3.getMessage(), (Object)"10");
                TSStatus tsStatus4 = serviceClient.sendFile(new TSyncTransportMetaInfo(this.tsfile.getName(), 100L), ByteBuffer.wrap(buffer));
                Assert.assertEquals((long)tsStatus4.getCode(), (long)TSStatusCode.SYNC_FILE_REBASE.getStatusCode());
                Assert.assertEquals((Object)tsStatus4.getMessage(), (Object)"10");
                byte[] remainBuffer = new byte[(int)(randomAccessFile.length() - 10L)];
                randomAccessFile.read(remainBuffer, 0, (int)(randomAccessFile.length() - 10L));
                TSStatus tsStatus5 = serviceClient.sendFile(new TSyncTransportMetaInfo(this.tsfile.getName(), 10L), ByteBuffer.wrap(remainBuffer));
                Assert.assertEquals((long)tsStatus5.getCode(), (long)TSStatusCode.SUCCESS_STATUS.getStatusCode());
            }
        }
        File receiveFile = new File(SyncPathUtil.getFileDataDirPath((TSyncIdentityInfo)identityInfo), this.tsfile.getName() + ".patch");
        Assert.assertTrue((boolean)receiveFile.exists());
        try (RandomAccessFile originFileRAF = new RandomAccessFile(this.tsfile, "r");
             RandomAccessFile receiveFileRAF = new RandomAccessFile(receiveFile, "r");){
            Assert.assertEquals((long)originFileRAF.length(), (long)receiveFileRAF.length());
            byte[] buffer1 = new byte[(int)originFileRAF.length()];
            byte[] buffer2 = new byte[(int)receiveFile.length()];
            originFileRAF.read(buffer1);
            receiveFileRAF.read(buffer2);
            Assert.assertArrayEquals((byte[])buffer1, (byte[])buffer2);
        }
    }

    @Test
    public void testTransportPipeData() throws Exception {
        try (TTransport transport = RpcTransportFactory.INSTANCE.getTransport((TTransport)new TSocket(TConfigurationConst.defaultTConfiguration, "127.0.0.1", 6667, 100000, 1000));){
            TSStatus tsStatus2;
            Object protocol = config.isRpcThriftCompressionEnable() ? new TCompactProtocol(transport) : new TBinaryProtocol(transport);
            IClientRPCService.Client serviceClient = new IClientRPCService.Client((TProtocol)protocol);
            if (!transport.isOpen()) {
                transport.open();
            }
            SchemaPipeData pipeData = new SchemaPipeData((PhysicalPlan)new SetStorageGroupPlan(new PartialPath("root.sg1")), 0L);
            byte[] buffer = pipeData.serialize();
            ByteBuffer buffToSend = ByteBuffer.wrap(buffer);
            try {
                tsStatus2 = serviceClient.sendPipeData(buffToSend);
                Assert.fail();
            }
            catch (TException tsStatus2) {
                // empty catch block
            }
            serviceClient.handshake(new TSyncIdentityInfo("127.0.0.1", this.pipeName1, this.createdTime1, config.getIoTDBVersion()));
            tsStatus2 = serviceClient.sendPipeData(buffToSend);
            Assert.assertEquals((long)tsStatus2.getCode(), (long)TSStatusCode.SUCCESS_STATUS.getStatusCode());
        }
    }

    @Test
    public void testSyncClient() throws Exception {
        Assert.assertNotNull((Object)this.tsfile);
        Assert.assertNotNull((Object)this.modsFile);
        Assert.assertNotNull((Object)this.resourceFile);
        int serialNum = 0;
        ArrayList<Object> pipeDataList = new ArrayList<Object>();
        pipeDataList.add(new SchemaPipeData((PhysicalPlan)new SetStorageGroupPlan(new PartialPath("root.vehicle")), (long)serialNum++));
        pipeDataList.add(new SchemaPipeData((PhysicalPlan)new CreateTimeSeriesPlan(new PartialPath("root.vehicle.d0.s0"), new MeasurementSchema("s0", TSDataType.INT32, TSEncoding.RLE)), (long)serialNum++));
        TsFilePipeData tsFilePipeData = new TsFilePipeData(this.tsfile.getPath(), (long)serialNum++);
        pipeDataList.add(tsFilePipeData);
        Deletion deletion = new Deletion(new PartialPath("root.vehicle.**"), 0L, 33L, 38L);
        pipeDataList.add(new DeletionPipeData(deletion, (long)serialNum++));
        TsFilePipe pipe = new TsFilePipe(this.createdTime1, this.pipeName1, null, 0L, false);
        IoTDBSyncClient client = new IoTDBSyncClient((Pipe)pipe, "127.0.0.1", IoTDBDescriptor.getInstance().getConfig().getRpcPort(), "127.0.0.1");
        client.handshake();
        for (PipeData pipeData : pipeDataList) {
            client.send(pipeData);
        }
        this.checkResult("select ** from root.vehicle", new String[]{"Time", "root.vehicle.d0.s0"}, new String[]{"2,2"});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void prepareData() throws Exception {
        try (Session session = new Session.Builder().host("127.0.0.1").port(6667).username("root").password("root").version(Version.V_0_13).build();){
            session.open(false);
            session.setFetchSize(10000);
            session.setStorageGroup("root.vehicle");
            List<String> measurements = Collections.singletonList("s0");
            List<TSDataType> types = Collections.singletonList(TSDataType.INT32);
            session.insertRecord("root.vehicle.d0", 1L, measurements, types, Collections.singletonList(1));
            session.insertRecord("root.vehicle.d0", 2L, measurements, types, Collections.singletonList(2));
            session.insertRecord("root.vehicle.d0", 35L, measurements, types, Collections.singletonList(35));
            session.executeNonQueryStatement("flush");
            session.executeNonQueryStatement("delete from root.vehicle.d0.s0 where time<2");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void checkResult(String sql, String[] columnNames, String[] retArray) throws Exception {
        try (Session session = new Session.Builder().host("127.0.0.1").port(6667).username("root").password("root").version(Version.V_0_13).build();){
            session.open(false);
            session.setFetchSize(10000);
            try (SessionDataSet dataSet = session.executeQueryStatement(sql);){
                Assert.assertArrayEquals((Object[])columnNames, (Object[])dataSet.getColumnNames().toArray(new String[0]));
                ArrayList<String> actualRetArray = new ArrayList<String>();
                while (dataSet.hasNext()) {
                    RowRecord rowRecord = dataSet.next();
                    StringBuilder rowString = new StringBuilder(rowRecord.getTimestamp() + ",");
                    rowRecord.getFields().forEach(i -> rowString.append(i.getStringValue()));
                    actualRetArray.add(rowString.toString());
                }
                Assert.assertArrayEquals((Object[])retArray, (Object[])actualRetArray.toArray(new String[0]));
            }
        }
    }
}

