package org.apache.nifi.remote.protocol.socket;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Map;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;
import java.util.zip.CheckedOutputStream;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.Communicant;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransactionCompletion;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.socket.EndpointConnectionPool;
import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.io.CompressionInputStream;
import org.apache.nifi.remote.io.CompressionOutputStream;
import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.remote.protocol.RequestType;
import org.apache.nifi.remote.util.StandardDataPacket;
import org.apache.nifi.reporting.Severity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/remote/protocol/socket/SocketClientTransaction.class */
public class SocketClientTransaction implements Transaction {
    private static final Logger logger = LoggerFactory.getLogger(SocketClientTransaction.class);
    private final int protocolVersion;
    private final FlowFileCodec codec;
    private final DataInputStream dis;
    private final DataOutputStream dos;
    private final TransferDirection direction;
    private final boolean compress;
    private final Peer peer;
    private final int penaltyMillis;
    private final String destinationId;
    private final EventReporter eventReporter;
    private final long creationNanoTime = System.nanoTime();
    private final CRC32 crc = new CRC32();
    private boolean dataAvailable = false;
    private int transfers = 0;
    private long contentBytes = 0;
    private Transaction.TransactionState state = Transaction.TransactionState.TRANSACTION_STARTED;

    /* JADX INFO: Access modifiers changed from: package-private */
    public SocketClientTransaction(int i, String str, Peer peer, FlowFileCodec flowFileCodec, TransferDirection transferDirection, boolean z, int i2, EventReporter eventReporter) throws IOException {
        this.protocolVersion = i;
        this.destinationId = str;
        this.peer = peer;
        this.codec = flowFileCodec;
        this.direction = transferDirection;
        this.dis = new DataInputStream(peer.getCommunicationsSession().getInput().getInputStream());
        this.dos = new DataOutputStream(peer.getCommunicationsSession().getOutput().getOutputStream());
        this.compress = z;
        this.penaltyMillis = i2;
        this.eventReporter = eventReporter;
        initialize();
    }

    private void initialize() throws IOException {
        try {
            if (this.direction == TransferDirection.RECEIVE) {
                RequestType.RECEIVE_FLOWFILES.writeRequestType(this.dos);
                this.dos.flush();
                Response read = Response.read(this.dis);
                switch (read.getCode()) {
                    case MORE_DATA:
                        logger.debug("{} {} Indicates that data is available", this, this.peer);
                        this.dataAvailable = true;
                        break;
                    case NO_MORE_DATA:
                        logger.debug("{} No data available from {}", this.peer);
                        this.dataAvailable = false;
                        return;
                    default:
                        throw new ProtocolException("Got unexpected response when asking for data: " + read);
                }
            } else {
                RequestType.SEND_FLOWFILES.writeRequestType(this.dos);
                this.dos.flush();
            }
        } catch (Exception e) {
            error();
            throw e;
        }
    }

    @Override // org.apache.nifi.remote.Transaction
    public DataPacket receive() throws IOException {
        try {
            try {
                if (this.state != Transaction.TransactionState.DATA_EXCHANGED && this.state != Transaction.TransactionState.TRANSACTION_STARTED) {
                    throw new IllegalStateException("Cannot receive data from " + this.peer + " because Transaction State is " + this.state);
                }
                if (this.direction == TransferDirection.SEND) {
                    throw new IllegalStateException("Attempting to receive data from " + this.peer + " but started a SEND Transaction");
                }
                if (!this.dataAvailable) {
                    return null;
                }
                if (this.transfers > 0) {
                    Response read = Response.read(this.dis);
                    switch (read.getCode()) {
                        case CONTINUE_TRANSACTION:
                            logger.debug("{} {} Indicates Transaction should continue", this, this.peer);
                            this.dataAvailable = true;
                            break;
                        case FINISH_TRANSACTION:
                            logger.debug("{} {} Indicates Transaction should finish", this, this.peer);
                            this.dataAvailable = false;
                            break;
                        default:
                            throw new ProtocolException("Got unexpected response from " + this.peer + " when asking for data: " + read);
                    }
                }
                if (!this.dataAvailable) {
                    return null;
                }
                logger.debug("{} Receiving data from {}", this, this.peer);
                DataPacket decode = this.codec.decode(new CheckedInputStream(this.compress ? new CompressionInputStream(this.dis) : this.dis, this.crc));
                if (decode == null) {
                    this.dataAvailable = false;
                } else {
                    this.transfers++;
                    this.contentBytes += decode.getSize();
                }
                this.state = Transaction.TransactionState.DATA_EXCHANGED;
                return decode;
            } catch (IOException e) {
                throw new IOException("Failed to receive data from " + this.peer + " due to " + e, e);
            }
        } catch (Exception e2) {
            error();
            throw e2;
        }
    }

    @Override // org.apache.nifi.remote.Transaction
    public void send(byte[] bArr, Map<String, String> map) throws IOException {
        send(new StandardDataPacket(map, new ByteArrayInputStream(bArr), bArr.length));
    }

    @Override // org.apache.nifi.remote.Transaction
    public void send(DataPacket dataPacket) throws IOException {
        try {
            try {
                if (this.state != Transaction.TransactionState.DATA_EXCHANGED && this.state != Transaction.TransactionState.TRANSACTION_STARTED) {
                    throw new IllegalStateException("Cannot send data to " + this.peer + " because Transaction State is " + this.state);
                }
                if (this.direction == TransferDirection.RECEIVE) {
                    throw new IllegalStateException("Attempting to send data to " + this.peer + " but started a RECEIVE Transaction");
                }
                if (this.transfers > 0) {
                    ResponseCode.CONTINUE_TRANSACTION.writeResponse(this.dos);
                }
                logger.debug("{} Sending data to {}", this, this.peer);
                CheckedOutputStream checkedOutputStream = new CheckedOutputStream(this.compress ? new CompressionOutputStream(this.dos) : this.dos, this.crc);
                this.codec.encode(dataPacket, checkedOutputStream);
                if (this.compress) {
                    checkedOutputStream.close();
                }
                this.transfers++;
                this.contentBytes += dataPacket.getSize();
                this.state = Transaction.TransactionState.DATA_EXCHANGED;
            } catch (IOException e) {
                throw new IOException("Failed to send data to " + this.peer + " due to " + e, e);
            }
        } catch (Exception e2) {
            error();
            throw e2;
        }
    }

    @Override // org.apache.nifi.remote.Transaction
    public void cancel(String str) throws IOException {
        if (this.state == Transaction.TransactionState.TRANSACTION_CANCELED || this.state == Transaction.TransactionState.TRANSACTION_COMPLETED || this.state == Transaction.TransactionState.ERROR) {
            throw new IllegalStateException("Cannot cancel transaction because state is already " + this.state);
        }
        try {
            ResponseCode.CANCEL_TRANSACTION.writeResponse(this.dos, str == null ? "<No explanation given>" : str);
            this.state = Transaction.TransactionState.TRANSACTION_CANCELED;
        } catch (IOException e) {
            error();
            throw new IOException("Failed to send 'cancel transaction' message to " + this.peer + " due to " + e, e);
        }
    }

    @Override // org.apache.nifi.remote.Transaction
    public TransactionCompletion complete() throws IOException {
        try {
            try {
                if (this.state != Transaction.TransactionState.TRANSACTION_CONFIRMED) {
                    throw new IllegalStateException("Cannot complete transaction with " + this.peer + " because state is " + this.state + "; Transaction can only be completed when state is " + Transaction.TransactionState.TRANSACTION_CONFIRMED);
                }
                boolean z = false;
                if (this.direction != TransferDirection.RECEIVE) {
                    try {
                        Response read = Response.read(this.dis);
                        logger.debug("{} Received {} from {}", new Object[]{this, read, this.peer});
                        if (read.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL) {
                            this.peer.penalize(this.destinationId, this.penaltyMillis);
                            z = true;
                        } else if (read.getCode() != ResponseCode.TRANSACTION_FINISHED) {
                            throw new ProtocolException("After sending data to " + this.peer + ", expected TRANSACTION_FINISHED response but got " + read);
                        }
                        this.state = Transaction.TransactionState.TRANSACTION_COMPLETED;
                    } catch (IOException e) {
                        throw new IOException(this + " Failed to receive a response from " + this.peer + " when expecting a TransactionFinished Indicator. It is unknown whether or not the peer successfully received/processed the data.", e);
                    }
                } else {
                    if (this.transfers == 0) {
                        this.state = Transaction.TransactionState.TRANSACTION_COMPLETED;
                        return new SocketClientTransactionCompletion(false, 0, 0L, System.nanoTime() - this.creationNanoTime);
                    }
                    logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, this.peer);
                    ResponseCode.TRANSACTION_FINISHED.writeResponse(this.dos);
                    this.state = Transaction.TransactionState.TRANSACTION_COMPLETED;
                }
                return new SocketClientTransactionCompletion(z, this.transfers, this.contentBytes, System.nanoTime() - this.creationNanoTime);
            } catch (IOException e2) {
                throw new IOException("Failed to complete transaction with " + this.peer + " due to " + e2, e2);
            }
        } catch (Exception e3) {
            error();
            throw e3;
        }
    }

    @Override // org.apache.nifi.remote.Transaction
    public void confirm() throws IOException {
        try {
            try {
                if (this.state == Transaction.TransactionState.TRANSACTION_STARTED && !this.dataAvailable && this.direction == TransferDirection.RECEIVE) {
                    this.state = Transaction.TransactionState.TRANSACTION_CONFIRMED;
                    return;
                }
                if (this.state != Transaction.TransactionState.DATA_EXCHANGED) {
                    throw new IllegalStateException("Cannot confirm Transaction because state is " + this.state + "; Transaction can only be confirmed when state is " + Transaction.TransactionState.DATA_EXCHANGED);
                }
                if (this.direction != TransferDirection.RECEIVE) {
                    logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", this, this.peer);
                    ResponseCode.FINISH_TRANSACTION.writeResponse(this.dos);
                    String valueOf = String.valueOf(this.crc.getValue());
                    Response read = Response.read(this.dis);
                    if (read.getCode() != ResponseCode.CONFIRM_TRANSACTION) {
                        throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + this.peer + " but received " + read);
                    }
                    logger.trace("{} Received {} from {}", new Object[]{this, read, this.peer});
                    String message = read.getMessage();
                    if (this.protocolVersion > 3 && !message.equals(valueOf)) {
                        ResponseCode.BAD_CHECKSUM.writeResponse(this.dos);
                        throw new IOException(this + " Sent data to peer " + this.peer + " but calculated CRC32 Checksum as " + valueOf + " while peer calculated CRC32 Checksum as " + message + "; canceling transaction and rolling back session");
                    }
                    ResponseCode.CONFIRM_TRANSACTION.writeResponse(this.dos, "");
                    this.state = Transaction.TransactionState.TRANSACTION_CONFIRMED;
                } else {
                    if (this.dataAvailable) {
                        throw new IllegalStateException("Cannot complete transaction because the sender has already sent more data than client has consumed.");
                    }
                    logger.trace("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, this.peer);
                    ResponseCode.CONFIRM_TRANSACTION.writeResponse(this.dos, String.valueOf(this.crc.getValue()));
                    try {
                        Response read2 = Response.read(this.dis);
                        logger.trace("{} Received {} from {}", new Object[]{this, read2, this.peer});
                        switch (read2.getCode()) {
                            case CONFIRM_TRANSACTION:
                                this.state = Transaction.TransactionState.TRANSACTION_CONFIRMED;
                                break;
                            case BAD_CHECKSUM:
                                throw new IOException(this + " Received a BadChecksum response from peer " + this.peer);
                            default:
                                throw new ProtocolException(this + " Received unexpected Response from peer " + this.peer + " : " + read2 + "; expected 'Confirm Transaction' Response Code");
                        }
                    } catch (IOException e) {
                        logger.error("Failed to receive response code from {} when expecting confirmation of transaction", this.peer);
                        if (this.eventReporter != null) {
                            this.eventReporter.reportEvent(Severity.ERROR, EndpointConnectionPool.CATEGORY, "Failed to receive response code from " + this.peer + " when expecting confirmation of transaction");
                        }
                        throw e;
                    }
                }
            } catch (Exception e2) {
                error();
                throw e2;
            }
        } catch (IOException e3) {
            throw new IOException("Failed to confirm transaction with " + this.peer + " due to " + e3, e3);
        }
    }

    @Override // org.apache.nifi.remote.Transaction
    public void error() {
        this.state = Transaction.TransactionState.ERROR;
    }

    @Override // org.apache.nifi.remote.Transaction
    public Transaction.TransactionState getState() {
        return this.state;
    }

    @Override // org.apache.nifi.remote.Transaction
    public Communicant getCommunicant() {
        return this.peer;
    }

    public String toString() {
        return "SocketClientTransaction[Url=" + this.peer.getUrl() + ", TransferDirection=" + this.direction + ", State=" + this.state + "]";
    }
}
