package org.apache.nifi.remote.protocol.socket;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PeerDescription;
import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.RemoteDestination;
import org.apache.nifi.remote.RemoteResourceInitiator;
import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.VersionNegotiator;
import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.codec.StandardFlowFileCodec;
import org.apache.nifi.remote.exception.HandshakeException;
import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.protocol.ClientProtocol;
import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.HandshakeProperty;
import org.apache.nifi.remote.protocol.RequestType;
import org.apache.nifi.remote.protocol.Response;
import org.apache.nifi.remote.protocol.ResponseCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/remote/protocol/socket/SocketClientProtocol.class */
/**
 * {@link ClientProtocol} implementation that talks to a remote {@link Peer} over the
 * peer's {@link CommunicationsSession} using {@link DataInputStream} /
 * {@link DataOutputStream} framing.
 *
 * <p>Typical call order, as enforced by the state checks in this class: configure the
 * instance through the setters, call {@link #handshake(Peer)}, then use
 * {@link #getPeerStatuses(Peer)}, {@link #negotiateCodec(Peer)} and
 * {@link #startTransaction(Peer, FlowFileCodec, TransferDirection)}. A handshake may
 * only be completed once per instance.
 */
public class SocketClientProtocol implements ClientProtocol {
    private static final Logger logger = LoggerFactory.getLogger(SocketClientProtocol.class);

    /** Protocol versions offered to the remote side, listed from 6 down to 1. */
    private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(new int[]{6, 5, 4, 3, 2, 1});

    private RemoteDestination destination;
    private boolean useCompression = false;
    private EventReporter eventReporter;

    // Batch preferences; each is only sent during the handshake when it is > 0.
    private int batchCount;
    private long batchSize;
    private long batchMillis;

    /** Timeout applied to the communications session and advertised as the request expiration. */
    private int timeoutMillis = 30000;

    /** Random identifier generated at the start of each handshake attempt. */
    private String commsIdentifier;
    private boolean handshakeComplete = false;
    private Response handshakeResponse = null;
    /** True only when the handshake response code was {@code PROPERTIES_OK}. */
    private boolean readyForFileTransfer = false;
    /** Peer URL with a trailing '/'; assigned during the handshake but not read within this class. */
    private String transitUriPrefix = null;

    /**
     * Sets the preferred batch count advertised during the handshake.
     *
     * @param i the preferred count; values {@code <= 0} are not sent
     */
    public void setPreferredBatchCount(int i) {
        this.batchCount = i;
    }

    /**
     * Sets the preferred batch size advertised during the handshake.
     *
     * @param j the preferred size; values {@code <= 0} are not sent
     */
    public void setPreferredBatchSize(long j) {
        this.batchSize = j;
    }

    /**
     * Sets the preferred batch duration advertised during the handshake.
     *
     * @param j the preferred duration; values {@code <= 0} are not sent
     */
    public void setPreferredBatchDuration(long j) {
        this.batchMillis = j;
    }

    /**
     * Sets the reporter that is handed to every transaction created by this protocol.
     *
     * @param eventReporter the reporter to pass along
     */
    public void setEventReporter(EventReporter eventReporter) {
        this.eventReporter = eventReporter;
    }

    /**
     * Sets the remote destination and adopts its compression setting.
     *
     * @param remoteDestination the destination to communicate with; must not be {@code null}
     */
    public void setDestination(RemoteDestination remoteDestination) {
        this.destination = remoteDestination;
        this.useCompression = remoteDestination.isUseCompression();
    }

    /**
     * Sets the timeout used for the communications session and sent as the
     * request expiration during the handshake.
     *
     * @param i the timeout value (stored in the {@code timeoutMillis} field)
     */
    public void setTimeout(int i) {
        this.timeoutMillis = i;
    }

    /**
     * Performs the handshake using the identifier of the configured destination.
     *
     * @param peer the peer to handshake with
     * @throws IOException if communication fails or the peer answers unexpectedly
     */
    @Override
    public void handshake(Peer peer) throws IOException {
        handshake(peer, this.destination.getIdentifier());
    }

    /**
     * Performs the handshake with the given peer: writes a fresh communications
     * identifier, the peer URL (negotiated version 3 and above) and the handshake
     * properties, then reads and records the peer's response.
     *
     * @param peer the peer to handshake with
     * @param str the port identifier to send, or {@code null} to omit it
     * @throws IllegalStateException if a handshake has already been completed
     * @throws HandshakeException if the response cannot be read or carries an unexpected code
     * @throws IOException if communication with the peer fails
     */
    public void handshake(Peer peer, String str) throws IOException {
        if (this.handshakeComplete) {
            throw new IllegalStateException("Handshake has already been completed");
        }
        this.commsIdentifier = UUID.randomUUID().toString();
        logger.debug("{} handshaking with {}", this, peer);

        final Map<HandshakeProperty, String> properties = buildHandshakeProperties(str);

        final CommunicationsSession session = peer.getCommunicationsSession();
        session.setTimeout(this.timeoutMillis);
        final DataInputStream in = new DataInputStream(session.getInput().getInputStream());
        final DataOutputStream out = new DataOutputStream(session.getOutput().getOutputStream());

        out.writeUTF(this.commsIdentifier);
        if (this.versionNegotiator.getVersion() >= 3) {
            final String peerUrl = peer.getUrl();
            out.writeUTF(peerUrl);
            this.transitUriPrefix = peerUrl.endsWith("/") ? peerUrl : peerUrl + "/";
        }

        logger.debug("Handshaking with properties {}", properties);
        out.writeInt(properties.size());
        for (final Map.Entry<HandshakeProperty, String> property : properties.entrySet()) {
            out.writeUTF(property.getKey().name());
            out.writeUTF(property.getValue());
        }
        out.flush();

        try {
            this.handshakeResponse = Response.read(in);
            switch (this.handshakeResponse.getCode()) {
                case PORT_NOT_IN_VALID_STATE:
                case UNKNOWN_PORT:
                case PORTS_DESTINATION_FULL:
                    // The handshake still counts as complete; callers inspect the outcome via
                    // isPortInvalid() / isPortUnknown() / isDestinationFull().
                    break;
                case PROPERTIES_OK:
                    this.readyForFileTransfer = true;
                    break;
                default:
                    logger.error("{} received unexpected response {} from {} when negotiating Codec", new Object[]{this, this.handshakeResponse, peer});
                    peer.close();
                    throw new HandshakeException("Received unexpected response " + this.handshakeResponse);
            }
            logger.debug("{} Finished handshake with {}", this, peer);
            this.handshakeComplete = true;
        } catch (ProtocolException e) {
            throw new HandshakeException(e);
        }
    }

    /**
     * Assembles the properties sent to the peer during the handshake. Batch
     * preferences are included only when the negotiated version is at least 5
     * and the respective value is positive.
     */
    private Map<HandshakeProperty, String> buildHandshakeProperties(String portIdentifier) {
        final Map<HandshakeProperty, String> properties = new HashMap<>();
        properties.put(HandshakeProperty.GZIP, String.valueOf(this.useCompression));
        if (portIdentifier != null) {
            properties.put(HandshakeProperty.PORT_IDENTIFIER, portIdentifier);
        }
        properties.put(HandshakeProperty.REQUEST_EXPIRATION_MILLIS, String.valueOf(this.timeoutMillis));
        if (this.versionNegotiator.getVersion() >= 5) {
            if (this.batchCount > 0) {
                properties.put(HandshakeProperty.BATCH_COUNT, String.valueOf(this.batchCount));
            }
            if (this.batchSize > 0) {
                properties.put(HandshakeProperty.BATCH_SIZE, String.valueOf(this.batchSize));
            }
            if (this.batchMillis > 0) {
                properties.put(HandshakeProperty.BATCH_DURATION, String.valueOf(this.batchMillis));
            }
        }
        return properties;
    }

    /** Throws an {@link IllegalStateException} with the given message unless the handshake has completed. */
    private void requireHandshakeComplete(String message) {
        if (!this.handshakeComplete) {
            throw new IllegalStateException(message);
        }
    }

    /**
     * @return {@code true} if the handshake response code was {@code PORT_NOT_IN_VALID_STATE}
     * @throws IllegalStateException if the handshake has not completed
     */
    @Override
    public boolean isPortInvalid() {
        requireHandshakeComplete("Handshake has not completed successfully");
        return this.handshakeResponse.getCode() == ResponseCode.PORT_NOT_IN_VALID_STATE;
    }

    /**
     * @return {@code true} if the handshake response code was {@code UNKNOWN_PORT}
     * @throws IllegalStateException if the handshake has not completed
     */
    @Override
    public boolean isPortUnknown() {
        requireHandshakeComplete("Handshake has not completed successfully");
        return this.handshakeResponse.getCode() == ResponseCode.UNKNOWN_PORT;
    }

    /**
     * @return {@code true} if the handshake response code was {@code PORTS_DESTINATION_FULL}
     * @throws IllegalStateException if the handshake has not completed
     */
    @Override
    public boolean isDestinationFull() {
        requireHandshakeComplete("Handshake has not completed successfully");
        return this.handshakeResponse.getCode() == ResponseCode.PORTS_DESTINATION_FULL;
    }

    /**
     * Requests the peer list from the given peer and reads the statuses it returns.
     *
     * @param peer the peer to query
     * @return the set of statuses read from the peer
     * @throws IllegalStateException if the handshake has not completed
     * @throws ProtocolException if the peer reports a negative number of peers
     * @throws IOException if communication with the peer fails
     */
    @Override
    public Set<PeerStatus> getPeerStatuses(Peer peer) throws IOException {
        requireHandshakeComplete("Handshake has not been performed");
        logger.debug("{} Get Peer Statuses from {}", this, peer);

        final CommunicationsSession session = peer.getCommunicationsSession();
        final DataInputStream in = new DataInputStream(session.getInput().getInputStream());
        final DataOutputStream out = new DataOutputStream(session.getOutput().getOutputStream());

        // Flag handed to every PeerStatus; set when the negotiated version is 6 or newer.
        final boolean versionSixOrLater = getVersionNegotiator().getVersion() >= 6;

        RequestType.REQUEST_PEER_LIST.writeRequestType(out);
        out.flush();

        final int peerCount = in.readInt();
        if (peerCount < 0) {
            // The count comes from the wire; report a bad value as a protocol error rather than
            // letting the HashSet constructor fail with an unchecked IllegalArgumentException.
            throw new ProtocolException("Received invalid number of peers (" + peerCount + ") from " + peer);
        }

        final Set<PeerStatus> statuses = new HashSet<>(peerCount);
        for (int index = 0; index < peerCount; index++) {
            // Field order on the wire: hostname, port, secure flag, flow file count.
            final String hostname = in.readUTF();
            final int port = in.readInt();
            final boolean secure = in.readBoolean();
            final int flowFileCount = in.readInt();
            statuses.add(new PeerStatus(new PeerDescription(hostname, port, secure), flowFileCount, versionSixOrLater));
        }

        logger.debug("{} Received {} Peer Statuses from {}", new Object[]{this, Integer.valueOf(statuses.size()), peer});
        return statuses;
    }

    /**
     * Negotiates the {@link FlowFileCodec} to use with the given peer.
     *
     * @param peer the peer to negotiate with
     * @return the negotiated codec
     * @throws IllegalStateException if the handshake has not completed
     * @throws ProtocolException if resource negotiation fails with a {@link HandshakeException}
     * @throws IOException if communication with the peer fails
     */
    @Override
    public FlowFileCodec negotiateCodec(Peer peer) throws IOException {
        requireHandshakeComplete("Handshake has not been performed");
        logger.debug("{} Negotiating Codec with {}", this, peer);

        final CommunicationsSession session = peer.getCommunicationsSession();
        final DataInputStream in = new DataInputStream(session.getInput().getInputStream());
        final DataOutputStream out = new DataOutputStream(session.getOutput().getOutputStream());

        RequestType.NEGOTIATE_FLOWFILE_CODEC.writeRequestType(out);
        try {
            final FlowFileCodec codec = (FlowFileCodec) RemoteResourceInitiator.initiateResourceNegotiation(new StandardFlowFileCodec(), in, out);
            logger.debug("{} negotiated FlowFileCodec {} with {}", new Object[]{this, codec, session});
            return codec;
        } catch (HandshakeException e) {
            // Same message as before, but keep the original exception as the cause so the
            // stack trace of the failed negotiation is not lost.
            throw new ProtocolException(e.toString(), e);
        }
    }

    /**
     * Creates a new transaction with the given peer.
     *
     * @param peer the peer to transact with
     * @param flowFileCodec the codec to use for the transaction
     * @param transferDirection the direction of the transfer
     * @return a new {@link SocketClientTransaction}
     * @throws IllegalStateException if the handshake has not completed, or if it completed
     *         with a response other than {@code PROPERTIES_OK} (or {@link #shutdown(Peer)}
     *         has since been called)
     * @throws IOException if the transaction cannot be created
     */
    @Override
    public Transaction startTransaction(Peer peer, FlowFileCodec flowFileCodec, TransferDirection transferDirection) throws IOException {
        requireHandshakeComplete("Handshake has not been performed");
        if (!this.readyForFileTransfer) {
            throw new IllegalStateException("Cannot start transaction; handshake resolution was " + this.handshakeResponse);
        }
        final int yieldMillis = (int) this.destination.getYieldPeriod(TimeUnit.MILLISECONDS);
        return new SocketClientTransaction(this.versionNegotiator.getVersion(), this.destination.getIdentifier(), peer,
                flowFileCodec, transferDirection, this.useCompression, yieldMillis, this.eventReporter);
    }

    /**
     * @return the version negotiator used by this protocol
     */
    @Override
    public VersionNegotiator getVersionNegotiator() {
        return this.versionNegotiator;
    }

    /**
     * Marks this protocol as no longer ready for file transfer and sends a
     * {@code SHUTDOWN} request to the peer.
     *
     * @param peer the peer to notify
     * @throws IOException if the request cannot be written
     */
    @Override
    public void shutdown(Peer peer) throws IOException {
        this.readyForFileTransfer = false;
        final DataOutputStream out = new DataOutputStream(peer.getCommunicationsSession().getOutput().getOutputStream());
        logger.debug("{} Shutting down with {}", this, peer);
        RequestType.SHUTDOWN.writeRequestType(out);
        out.flush();
    }

    /**
     * @return the resource name of this protocol, {@code "SocketFlowFileProtocol"}
     */
    @Override
    public String getResourceName() {
        return "SocketFlowFileProtocol";
    }

    /**
     * @return a description containing the current communications identifier
     */
    @Override
    public String toString() {
        return "SocketClientProtocol[CommsID=" + this.commsIdentifier + "]";
    }
}
