package org.apache.heron.network;

import java.util.Iterator;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.heron.common.basics.Communicator;
import org.apache.heron.common.basics.NIOLooper;
import org.apache.heron.common.basics.SingletonRegistry;
import org.apache.heron.common.config.SystemConfig;
import org.apache.heron.common.network.HeronClient;
import org.apache.heron.common.network.HeronSocketOptions;
import org.apache.heron.common.network.StatusCode;
import org.apache.heron.common.utils.misc.PhysicalPlanHelper;
import org.apache.heron.instance.InstanceControlMsg;
import org.apache.heron.metrics.GatewayMetrics;
import org.apache.heron.proto.ckptmgr.CheckpointManager;
import org.apache.heron.proto.stmgr.StreamManager;
import org.apache.heron.proto.system.Common;
import org.apache.heron.proto.system.HeronTuples;
import org.apache.heron.proto.system.PhysicalPlans;
import org.apache.heron.shaded.com.google.protobuf.ByteString;
import org.apache.heron.shaded.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.heron.shaded.com.google.protobuf.Message;

/* loaded from: input_file:org/apache/heron/network/StreamManagerClient.class */
/**
 * Client used by a Heron instance to talk to its Stream Manager.
 *
 * <p>On a successful connect it registers the message types it handles and sends a
 * {@code RegisterInstanceRequest}. After that it:
 * <ul>
 *   <li>forwards messages from {@code outStreamQueue} to the Stream Manager,</li>
 *   <li>converts incoming {@code HeronTupleSet2} messages into {@code HeronTupleSet} and offers
 *       them to {@code inStreamQueue},</li>
 *   <li>offers physical plan updates and stateful-processing control messages to
 *       {@code inControlQueue}.</li>
 * </ul>
 *
 * <p>On connect failure or error it schedules a reconnect on the {@link NIOLooper} timer.
 */
public class StreamManagerClient extends HeronClient {
    private static final Logger LOG = Logger.getLogger(StreamManagerClient.class.getName());

    /** Minimum gap, in milliseconds, between two consecutive "not connected" log lines. */
    private static final long NOT_CONNECTED_LOG_PERIOD_MS = 5000L;

    private final String topologyName;
    private final String topologyId;
    private final PhysicalPlans.Instance instance;
    private final Communicator<Message> inStreamQueue;
    private final Communicator<Message> outStreamQueue;
    private final Communicator<InstanceControlMsg> inControlQueue;
    private final GatewayMetrics gatewayMetrics;
    private final SystemConfig systemConfig;

    /** Helper built from the latest physical plan; {@code null} until one arrives or after an error. */
    private PhysicalPlanHelper helper;

    /** Wall-clock time (ms) of the last throttled "stop reading" log line. */
    private long lastNotConnectedLogTime;

    /** Wall-clock time (ms) of the last throttled "stop writing" log line. */
    private long lastNotReadyLogTime;

    /**
     * Creates the client and registers its send/read tasks to run on every looper wake-up.
     *
     * @param looper the looper driving this client
     * @param host passed through to {@link HeronClient}; presumably the Stream Manager host
     * @param port passed through to {@link HeronClient}; presumably the Stream Manager port
     * @param topologyName topology name sent in the register request
     * @param topologyId topology id sent in the register request
     * @param instance the instance this client registers with the Stream Manager
     * @param inStreamQueue queue receiving tuple sets and checkpoint requests from the Stream Manager
     * @param outStreamQueue queue of messages to be sent to the Stream Manager
     * @param inControlQueue queue receiving control messages (new plan, restore, start)
     * @param options socket options passed through to {@link HeronClient}
     * @param gatewayMetrics metrics sink for packet counts/sizes and back-pressure events
     */
    public StreamManagerClient(NIOLooper looper, String host, int port, String topologyName, String topologyId, PhysicalPlans.Instance instance, Communicator<Message> inStreamQueue, Communicator<Message> outStreamQueue, Communicator<InstanceControlMsg> inControlQueue, HeronSocketOptions options, GatewayMetrics gatewayMetrics) {
        super(looper, host, port, options);
        this.topologyName = topologyName;
        this.topologyId = topologyId;
        this.instance = instance;
        this.inStreamQueue = inStreamQueue;
        this.outStreamQueue = outStreamQueue;
        this.inControlQueue = inControlQueue;
        this.systemConfig = (SystemConfig) SingletonRegistry.INSTANCE.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG);
        this.gatewayMetrics = gatewayMetrics;
        addStreamManagerClientTasksOnWakeUp();
    }

    /** Registers a looper wake-up task that first tries to send, then re-evaluates reading. */
    private void addStreamManagerClientTasksOnWakeUp() {
        getNIOLooper().addTasksOnWakeup(new Runnable() {
            @Override
            public void run() {
                sendStreamMessageIfNeeded();
                readStreamMessageIfNeeded();
            }
        });
    }

    /** Registers a builder for every unsolicited message type handled by {@link #onIncomingMessage}. */
    private void registerMessagesToHandle() {
        registerOnMessage(StreamManager.NewInstanceAssignmentMessage.newBuilder());
        registerOnMessage(HeronTuples.HeronTupleSet2.newBuilder());
        registerOnMessage(CheckpointManager.InitiateStatefulCheckpoint.newBuilder());
        registerOnMessage(CheckpointManager.RestoreInstanceStateRequest.newBuilder());
        registerOnMessage(CheckpointManager.StartInstanceStatefulProcessing.newBuilder());
    }

    /**
     * Drops the current physical plan helper and goes through the connect-failure path,
     * which schedules a reconnect.
     */
    @Override
    public void onError() {
        LOG.severe("Disconnected from Stream Manager.");
        LOG.info("Clean the old PhysicalPlanHelper in StreamManagerClient.");
        this.helper = null;
        // Reuse the failure branch of onConnect to schedule the retry.
        onConnect(StatusCode.CONNECT_ERROR);
    }

    /**
     * On success registers message handlers and sends the register request; otherwise schedules
     * {@code start()} after the configured reconnect interval.
     *
     * @param statusCode result of the connect attempt
     */
    @Override
    public void onConnect(StatusCode statusCode) {
        if (statusCode != StatusCode.OK) {
            LOG.log(Level.WARNING, "Error connecting to Stream Manager with status: {0}, Retrying...", statusCode);
            getNIOLooper().registerTimerEvent(this.systemConfig.getInstanceReconnectStreammgrInterval(), new Runnable() {
                @Override
                public void run() {
                    start();
                }
            });
            return;
        }
        registerMessagesToHandle();
        LOG.info("Connected to Stream Manager. Ready to send register request");
        sendRegisterRequest();
    }

    /** Builds and sends the {@code RegisterInstanceRequest} for this instance and topology. */
    private void sendRegisterRequest() {
        StreamManager.RegisterInstanceRequest request = StreamManager.RegisterInstanceRequest.newBuilder()
                .setInstance(this.instance)
                .setTopologyName(this.topologyName)
                .setTopologyId(this.topologyId)
                .build();
        // The reconnect interval is passed as the last argument of sendRequest.
        sendRequest(request, null, StreamManager.RegisterInstanceResponse.newBuilder(), this.systemConfig.getInstanceReconnectStreammgrInterval());
    }

    /**
     * Handles the response to the register request.
     *
     * @throws RuntimeException if the status is not OK or the response is not a
     *     {@code RegisterInstanceResponse}
     */
    @Override
    public void onResponse(StatusCode statusCode, Object obj, Message message) {
        if (statusCode != StatusCode.OK) {
            throw new RuntimeException("Response from Stream Manager not ok");
        }
        if (!(message instanceof StreamManager.RegisterInstanceResponse)) {
            throw new RuntimeException("Unknown kind of response received from Stream Manager");
        }
        handleRegisterResponse((StreamManager.RegisterInstanceResponse) message);
    }

    /**
     * Records receive metrics and dispatches an unsolicited message to its handler.
     *
     * @throws RuntimeException if the message is of a type this client does not handle
     */
    @Override
    public void onIncomingMessage(Message message) {
        this.gatewayMetrics.updateReceivedPacketsCount(1L);
        this.gatewayMetrics.updateReceivedPacketsSize(message.getSerializedSize());

        if (message instanceof StreamManager.NewInstanceAssignmentMessage) {
            LOG.info("Handling assignment message from direct NewInstanceAssignmentMessage");
            handleAssignmentMessage(((StreamManager.NewInstanceAssignmentMessage) message).getPplan());
        } else if (message instanceof HeronTuples.HeronTupleSet2) {
            handleNewTuples2((HeronTuples.HeronTupleSet2) message);
        } else if (message instanceof CheckpointManager.InitiateStatefulCheckpoint) {
            handleCheckpointRequest((CheckpointManager.InitiateStatefulCheckpoint) message);
        } else if (message instanceof CheckpointManager.RestoreInstanceStateRequest) {
            handleRestoreInstanceStateRequest((CheckpointManager.RestoreInstanceStateRequest) message);
        } else if (message instanceof CheckpointManager.StartInstanceStatefulProcessing) {
            handleStartStatefulRequest((CheckpointManager.StartInstanceStatefulProcessing) message);
        } else {
            throw new RuntimeException("Unknown kind of message received from Stream Manager");
        }
    }

    @Override
    public void onClose() {
        LOG.info("StreamManagerClient exits.");
    }

    /**
     * If connected, sends every message that is in {@code outStreamQueue} at the time of the call.
     * Messages added while flushing are not sent by this call.
     */
    public void sendAllMessage() {
        if (!isConnected()) {
            return;
        }
        LOG.info("Flushing all pending data in StreamManagerClient");
        // Snapshot the size so the loop is bounded even if producers keep adding.
        int pending = this.outStreamQueue.size();
        for (int sent = 0; sent < pending; sent++) {
            sendMessage(this.outStreamQueue.poll());
        }
    }

    /**
     * Sends queued outgoing messages when the Stream Manager is ready for tuples.
     *
     * <p>The queue is drained only when there are no outstanding packets; if messages remain
     * afterwards, writing is (re-)enabled. Kept {@code public} to match the existing visible
     * signature.
     */
    public void sendStreamMessageIfNeeded() {
        if (!isStreamMgrReadyReceiveTuples()) {
            // This runs on every looper wake-up, so throttle the log like the read path does.
            long now = System.currentTimeMillis();
            if (now - this.lastNotReadyLogTime > NOT_CONNECTED_LOG_PERIOD_MS) {
                LOG.info("Stop writing due to not yet connected to Stream Manager.");
                this.lastNotReadyLogTime = now;
            }
            return;
        }
        if (getOutstandingPackets() <= 0) {
            while (!this.outStreamQueue.isEmpty()) {
                Message outgoing = this.outStreamQueue.poll();
                this.gatewayMetrics.updateSentPacketsCount(1L);
                this.gatewayMetrics.updateSentPacketsSize(outgoing.getSerializedSize());
                sendMessage(outgoing);
            }
        }
        if (!this.outStreamQueue.isEmpty()) {
            // Still have messages to send.
            startWriting();
        }
    }

    /**
     * Enables or disables reading from the Stream Manager.
     *
     * <p>Reading is enabled while the in-queue has room or no physical plan has arrived yet.
     * Otherwise the queue-full metric is bumped and reading is stopped. When not connected,
     * only a throttled log line is emitted. Kept {@code public} to match the existing visible
     * signature.
     */
    public void readStreamMessageIfNeeded() {
        if (!isConnected()) {
            long now = System.currentTimeMillis();
            if (now - this.lastNotConnectedLogTime > NOT_CONNECTED_LOG_PERIOD_MS) {
                LOG.info(String.format("Stop reading due to not yet connected to Stream Manager. This message is throttled to emit no more than once every %d seconds.", NOT_CONNECTED_LOG_PERIOD_MS / 1000L));
                this.lastNotConnectedLogTime = now;
            }
            return;
        }
        if (isInQueuesAvailable() || this.helper == null) {
            startReading();
        } else {
            this.gatewayMetrics.updateInQueueFullCount();
            stopReading();
        }
    }

    /** Wraps the start request in an {@code InstanceControlMsg} and offers it to the control queue. */
    private void handleStartStatefulRequest(CheckpointManager.StartInstanceStatefulProcessing request) {
        LOG.info("Received a StartInstanceStatefulProcessing request: " + request);
        this.inControlQueue.offer(InstanceControlMsg.newBuilder().setStartInstanceStatefulProcessing(request).build());
    }

    /** Wraps the restore request in an {@code InstanceControlMsg} and offers it to the control queue. */
    private void handleRestoreInstanceStateRequest(CheckpointManager.RestoreInstanceStateRequest request) {
        LOG.info("Received a RestoreInstanceState request with checkpoint id: " + request.getState().getCheckpointId());
        this.inControlQueue.offer(InstanceControlMsg.newBuilder().setRestoreInstanceStateRequest(request).build());
    }

    /** Offers the checkpoint request to the in-stream queue (not the control queue). */
    private void handleCheckpointRequest(CheckpointManager.InitiateStatefulCheckpoint request) {
        LOG.info("Handling instance checkpoint request: " + request);
        this.inStreamQueue.offer(request);
    }

    /**
     * Validates the register response and applies the physical plan it carries, if any.
     *
     * @throws RuntimeException if the Stream Manager reported a non-OK status
     */
    private void handleRegisterResponse(StreamManager.RegisterInstanceResponse response) {
        if (response.getStatus().getStatus() != Common.StatusCode.OK) {
            throw new RuntimeException("Stream Manager returned a not ok response for register");
        }
        LOG.info("We registered ourselves to the Stream Manager");
        if (response.hasPplan()) {
            LOG.info("Handling assignment message from response");
            handleAssignmentMessage(response.getPplan());
        }
    }

    /**
     * Converts a {@code HeronTupleSet2} into a {@code HeronTupleSet} and offers it to the
     * in-stream queue. A set carries either control tuples or data tuples.
     */
    private void handleNewTuples2(HeronTuples.HeronTupleSet2 tupleSet2) {
        HeronTuples.HeronTupleSet.Builder tupleSet = HeronTuples.HeronTupleSet.newBuilder();
        tupleSet.setSrcTaskId(tupleSet2.getSrcTaskId());
        if (tupleSet2.hasControl()) {
            tupleSet.setControl(tupleSet2.getControl());
        } else {
            HeronTuples.HeronDataTupleSet.Builder dataSet = HeronTuples.HeronDataTupleSet.newBuilder();
            dataSet.setStream(tupleSet2.getData().getStream());
            try {
                for (ByteString rawTuple : tupleSet2.getData().getTuplesList()) {
                    dataSet.addTuples(HeronTuples.HeronDataTuple.parseFrom(rawTuple));
                }
            } catch (InvalidProtocolBufferException e) {
                // NOTE(review): parsing stops at the first bad tuple; the tuples parsed so far
                // are still forwarded below. Existing behavior, kept unchanged.
                LOG.log(Level.SEVERE, "Failed to parse protobuf", e);
            }
            tupleSet.setData(dataSet);
        }
        this.inStreamQueue.offer(tupleSet.build());
    }

    /**
     * Builds a helper for the new physical plan and offers it to the control queue.
     *
     * @throws RuntimeException if a helper already exists and the new plan assigns this instance
     *     a different component or task id
     */
    private void handleAssignmentMessage(PhysicalPlans.PhysicalPlan physicalPlan) {
        LOG.fine("Physical Plan: " + physicalPlan);
        PhysicalPlanHelper newHelper = new PhysicalPlanHelper(physicalPlan, this.instance.getInstanceId());

        if (this.helper == null) {
            LOG.info("We received a new Physical Plan.");
        } else {
            boolean sameComponent = this.helper.getMyComponent().equals(newHelper.getMyComponent());
            boolean sameTask = this.helper.getMyTaskId() == newHelper.getMyTaskId();
            if (!sameComponent || !sameTask) {
                throw new RuntimeException("Our Assignment has changed. We will die to pick it");
            }
            LOG.info("We received a new Physical Plan with same assignment. Should be state changes.");
            LOG.info(String.format("Old state: %s; new state: %s.", this.helper.getTopologyState(), newHelper.getTopologyState()));
        }

        this.helper = newHelper;
        LOG.info("Push to Slave");
        this.inControlQueue.offer(InstanceControlMsg.newBuilder().setNewPhysicalPlanHelper(this.helper).build());
    }

    /** Returns {@code true} when connected and a physical plan has been received. */
    private boolean isStreamMgrReadyReceiveTuples() {
        return isConnected() && this.helper != null;
    }

    /** Returns {@code true} while the in-stream queue is below its expected available capacity. */
    private boolean isInQueuesAvailable() {
        return this.inStreamQueue.size() < this.inStreamQueue.getExpectedAvailableCapacity();
    }
}
