/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.pipe.connector.v1;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.plan.Coordinator;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.db.pipe.agent.receiver.IoTDBThriftReceiver;
import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorRequestVersion;
import org.apache.iotdb.db.pipe.connector.v1.PipeRequestType;
import org.apache.iotdb.db.pipe.connector.v1.reponse.PipeTransferFilePieceResp;
import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferFilePieceReq;
import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferFileSealReq;
import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferHandshakeReq;
import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferInsertNodeReq;
import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferTabletReq;
import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Version 1 implementation of {@link IoTDBThriftReceiver}.
 *
 * <p>Dispatches each incoming {@link TPipeTransferReq} by its request type: handshake, insert
 * node, tablet, file piece, or file seal. File pieces are appended to a single "currently writing"
 * file under the configured receiver file directory; a seal request closes that file and loads it
 * through a {@link LoadTsFileStatement}.
 *
 * <p>The mutable state ({@link #writingFile} and {@link #writingFileWriter}) is only touched from
 * the {@code synchronized} entry points {@link #receive} and {@link #handleExit}.
 */
public class IoTDBThriftReceiverV1 implements IoTDBThriftReceiver {
    private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBThriftReceiverV1.class);
    private static final IoTDBConfig IOTDB_CONFIG = IoTDBDescriptor.getInstance().getConfig();
    private static final String RECEIVER_FILE_DIR = IOTDB_CONFIG.getPipeReceiverFileDir();

    /** The file currently being received, or {@code null} when no transfer is in progress. */
    private File writingFile;

    /** Writer opened on {@link #writingFile}, or {@code null} when no transfer is in progress. */
    private RandomAccessFile writingFileWriter;

    /**
     * Handles one transfer request by dispatching on its raw request type.
     *
     * @param req the raw transfer request
     * @param partitionFetcher passed through to statement execution
     * @param schemaFetcher passed through to statement execution
     * @return the handler's response, or a {@code PIPE_TYPE_ERROR} response when the request type
     *     is not handled here
     */
    @Override
    public synchronized TPipeTransferResp receive(TPipeTransferReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) {
        short rawRequestType = req.getType();
        if (PipeRequestType.isValidatedRequestType(rawRequestType)) {
            switch (PipeRequestType.valueOf(rawRequestType)) {
                case HANDSHAKE: {
                    return this.handleTransferHandshake(PipeTransferHandshakeReq.fromTPipeTransferReq(req));
                }
                case TRANSFER_INSERT_NODE: {
                    return this.handleTransferInsertNode(PipeTransferInsertNodeReq.fromTPipeTransferReq(req), partitionFetcher, schemaFetcher);
                }
                case TRANSFER_TABLET: {
                    return this.handleTransferTablet(PipeTransferTabletReq.fromTPipeTransferReq(req), partitionFetcher, schemaFetcher);
                }
                case TRANSFER_FILE_PIECE: {
                    return this.handleTransferFilePiece(PipeTransferFilePieceReq.fromTPipeTransferReq(req));
                }
                case TRANSFER_FILE_SEAL: {
                    return this.handleTransferFileSeal(PipeTransferFileSealReq.fromTPipeTransferReq(req), partitionFetcher, schemaFetcher);
                }
            }
        }
        // Reached for an invalid type, or a valid type that has no case above.
        return new TPipeTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_TYPE_ERROR, String.format("Unknown transfer type %s.", rawRequestType)));
    }

    /**
     * Validates that the sender's timestamp precision equals this node's configured precision.
     *
     * @return a success response on match, otherwise a {@code PIPE_HANDSHAKE_ERROR} response
     */
    private TPipeTransferResp handleTransferHandshake(PipeTransferHandshakeReq req) {
        String localPrecision = CommonDescriptor.getInstance().getConfig().getTimestampPrecision();
        if (!localPrecision.equals(req.getTimestampPrecision())) {
            String msg = String.format("IoTDB receiver's timestamp precision %s, connector's timestamp precision %s. validation fails.", localPrecision, req.getTimestampPrecision());
            LOGGER.warn(msg);
            return new TPipeTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_HANDSHAKE_ERROR, msg));
        }
        LOGGER.info("Handshake successfully.");
        return new TPipeTransferResp(RpcUtils.SUCCESS_STATUS);
    }

    /** Executes the statement constructed from an insert-node request and wraps its status. */
    private TPipeTransferResp handleTransferInsertNode(PipeTransferInsertNodeReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) {
        return new TPipeTransferResp(this.executeStatement(req.constructStatement(), partitionFetcher, schemaFetcher));
    }

    /**
     * Executes the statement constructed from a tablet request; an empty statement is answered
     * with success without being executed.
     */
    private TPipeTransferResp handleTransferTablet(PipeTransferTabletReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) {
        InsertTabletStatement statement = req.constructStatement();
        return new TPipeTransferResp(statement.isEmpty() ? RpcUtils.SUCCESS_STATUS : this.executeStatement(statement, partitionFetcher, schemaFetcher));
    }

    /**
     * Appends one file piece to the file named in the request.
     *
     * <p>If the request's start offset differs from the current length of the receiver-side file,
     * nothing is written and a {@code PIPE_TRANSFER_FILE_OFFSET_RESET} response carrying the
     * current length is returned. Any exception is reported as {@code PIPE_TRANSFER_FILE_ERROR}.
     *
     * @return a response carrying the file length after the write (or the current length on an
     *     offset reset)
     */
    private TPipeTransferResp handleTransferFilePiece(PipeTransferFilePieceReq req) {
        try {
            this.updateWritingFileIfNeeded(req.getFileName());
            long startOffset = req.getStartWritingOffset();
            if (!this.isWritingFileOffsetCorrect(startOffset)) {
                long currentLength = this.writingFileWriter.length();
                return PipeTransferFilePieceResp.toTPipeTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_TRANSFER_FILE_OFFSET_RESET, String.format("request sender reset file reader's offset from %s to %s.", startOffset, currentLength)), currentLength);
            }
            // A RandomAccessFile opened on a pre-existing file starts with its pointer at 0 while
            // the offset check above is based on length(). Position the pointer explicitly at the
            // validated offset (== length) so the piece is always appended, never overwritten.
            this.writingFileWriter.seek(startOffset);
            this.writingFileWriter.write(req.getFilePiece());
            return PipeTransferFilePieceResp.toTPipeTransferResp(RpcUtils.SUCCESS_STATUS, this.writingFileWriter.length());
        }
        catch (Exception e) {
            LOGGER.warn(String.format("failed to write file piece from req %s.", req), e);
            TSStatus status = RpcUtils.getStatus(TSStatusCode.PIPE_TRANSFER_FILE_ERROR, String.format("failed to write file piece, because %s", e.getMessage()));
            try {
                return PipeTransferFilePieceResp.toTPipeTransferResp(status, -1L);
            }
            catch (IOException ex) {
                // Fall back to the response form that carries no end offset.
                return PipeTransferFilePieceResp.toTPipeTransferResp(status);
            }
        }
    }

    /**
     * Makes {@link #writingFile} / {@link #writingFileWriter} refer to {@code fileName}.
     *
     * <p>Does nothing when the current writing file already has that name. Otherwise the previous
     * writer is closed, the previous file is deleted (best effort), the receiver directory is
     * created if missing, and a new writer is opened in {@code "rw"} mode.
     *
     * @param fileName plain name (no path components) of the file being transferred
     * @throws IOException if {@code fileName} is not a plain file name, or if closing the previous
     *     writer or opening the new file fails
     */
    private void updateWritingFileIfNeeded(String fileName) throws IOException {
        // Validate before touching any state so that a bad request cannot abort the transfer
        // that is currently in progress.
        checkFileName(fileName);
        if (this.isFileExistedAndNameCorrect(fileName)) {
            return;
        }
        if (this.writingFileWriter != null) {
            this.writingFileWriter.close();
            this.writingFileWriter = null;
        }
        if (this.writingFile != null && this.writingFile.exists()) {
            if (this.writingFile.delete()) {
                LOGGER.info("original file {} was deleted.", this.writingFile.getPath());
            } else {
                LOGGER.warn("failed to delete original file {}.", this.writingFile.getPath());
            }
            this.writingFile = null;
        }
        File receiveDir = new File(RECEIVER_FILE_DIR);
        if (!receiveDir.exists()) {
            if (receiveDir.mkdirs()) {
                LOGGER.info("receiver file dir {} was created.", receiveDir.getPath());
            } else {
                LOGGER.warn("failed to create receiver file dir {}.", receiveDir.getPath());
            }
        }
        this.writingFile = new File(RECEIVER_FILE_DIR, fileName);
        this.writingFileWriter = new RandomAccessFile(this.writingFile, "rw");
        LOGGER.info("start to write transferring file {}.", this.writingFile.getPath());
    }

    /**
     * Rejects file names that could resolve outside the receiver directory.
     *
     * <p>The name arrives in a remote request, so it must be a single path component: not
     * {@code null}, not empty, not {@code "."} or {@code ".."}, and free of path separators.
     *
     * @throws IOException if {@code fileName} is not a plain file name
     */
    private static void checkFileName(String fileName) throws IOException {
        boolean plainName = fileName != null
            && !fileName.isEmpty()
            && !".".equals(fileName)
            && !"..".equals(fileName)
            && new File(fileName).getName().equals(fileName);
        if (!plainName) {
            throw new IOException(String.format("illegal file name %s, a plain file name without any path component is expected.", fileName));
        }
    }

    /** Returns whether a writing file is set and its name equals {@code fileName}. */
    private boolean isFileExistedAndNameCorrect(String fileName) {
        return this.writingFile != null && this.writingFile.getName().equals(fileName);
    }

    /**
     * Returns whether {@code offset} equals the current length of the writing file.
     *
     * @throws IOException if the length cannot be read
     */
    private boolean isWritingFileOffsetCorrect(long offset) throws IOException {
        return this.writingFileWriter.length() == offset;
    }

    /**
     * Seals the currently writing file and loads it.
     *
     * <p>The request is rejected with {@code PIPE_TRANSFER_FILE_ERROR} when there is no available
     * writing file, when the file name does not match, or when the received length differs from
     * the length announced by the sender. Otherwise the writer is closed, the receiver state is
     * cleared, and a {@link LoadTsFileStatement} (delete-after-load, schema verification on,
     * database auto-creation off) is executed.
     */
    private TPipeTransferResp handleTransferFileSeal(PipeTransferFileSealReq req, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) {
        try {
            if (!this.isWritingFileAvailable()) {
                return new TPipeTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_TRANSFER_FILE_ERROR, String.format("failed to seal file, because writing file %s is not available.", req.getFileName())));
            }
            if (!this.isFileExistedAndNameCorrect(req.getFileName())) {
                return new TPipeTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_TRANSFER_FILE_ERROR, String.format("failed to seal file %s, but writing file is %s.", req.getFileName(), this.writingFile)));
            }
            if (!this.isWritingFileOffsetCorrect(req.getFileLength())) {
                return new TPipeTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_TRANSFER_FILE_ERROR, String.format("failed to seal file because the length of file is not correct. the original file has length %s, but receiver file has length %s.", req.getFileLength(), this.writingFileWriter.length())));
            }
            LoadTsFileStatement statement = new LoadTsFileStatement(this.writingFile.getAbsolutePath());
            this.writingFileWriter.close();
            // Clear both fields together so no closed writer is left behind for later requests.
            this.writingFileWriter = null;
            this.writingFile = null;
            statement.setDeleteAfterLoad(true);
            statement.setVerifySchema(true);
            statement.setAutoCreateDatabase(false);
            return new TPipeTransferResp(this.executeStatement(statement, partitionFetcher, schemaFetcher));
        }
        catch (IOException e) {
            LOGGER.warn(String.format("failed to seal file %s from req %s.", this.writingFile, req), e);
            return new TPipeTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_TRANSFER_FILE_ERROR, String.format("failed to seal file %s because %s", this.writingFile, e.getMessage())));
        }
    }

    /** Returns whether a writing file is set, exists on disk, and has an open writer reference. */
    private boolean isWritingFileAvailable() {
        return this.writingFile != null && this.writingFile.exists() && this.writingFileWriter != null;
    }

    /**
     * Executes {@code statement} through the {@link Coordinator} with a freshly requested query id
     * and the configured query timeout threshold.
     *
     * @return the execution status, or a {@code PIPE_TRANSFER_EXECUTE_STATEMENT_ERROR} status when
     *     {@code statement} is {@code null}
     */
    private TSStatus executeStatement(Statement statement, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) {
        if (statement == null) {
            return RpcUtils.getStatus(TSStatusCode.PIPE_TRANSFER_EXECUTE_STATEMENT_ERROR, "Execute null statement.");
        }
        long queryId = SessionManager.getInstance().requestQueryId();
        ExecutionResult result = Coordinator.getInstance().execute(statement, queryId, null, "", partitionFetcher, schemaFetcher, IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold());
        if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
            LOGGER.warn("failed to execute statement, statement: {}, result status is: {}", statement, result.status);
        }
        return result.status;
    }

    /**
     * Releases the in-progress transfer, if any: closes the writer and deletes the partially
     * received file. A failure to close is logged and does not prevent the delete attempt; both
     * fields are cleared afterwards.
     */
    @Override
    public synchronized void handleExit() {
        if (this.writingFileWriter != null) {
            try {
                this.writingFileWriter.close();
            }
            catch (IOException e) {
                LOGGER.warn("IoTDBThriftReceiverV1#handleExit: meeting errors on handleExit().", e);
            }
            this.writingFileWriter = null;
        }
        if (this.writingFile != null) {
            if (!this.writingFile.delete()) {
                LOGGER.warn("IoTDBThriftReceiverV1#handleExit: delete file {} error.", this.writingFile.getPath());
            }
            this.writingFile = null;
        }
    }

    /** Returns the request version handled by this receiver, {@code VERSION_1}. */
    @Override
    public IoTDBThriftConnectorRequestVersion getVersion() {
        return IoTDBThriftConnectorRequestVersion.VERSION_1;
    }
}

