package org.apache.heron.instance;

import java.io.Serializable;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.heron.api.generated.TopologyAPI;
import org.apache.heron.api.serializer.IPluggableSerializer;
import org.apache.heron.api.state.HashMapState;
import org.apache.heron.api.state.State;
import org.apache.heron.common.basics.Communicator;
import org.apache.heron.common.basics.FileUtils;
import org.apache.heron.common.basics.SingletonRegistry;
import org.apache.heron.common.basics.SlaveLooper;
import org.apache.heron.common.config.SystemConfig;
import org.apache.heron.common.utils.metrics.MetricsCollector;
import org.apache.heron.common.utils.misc.PhysicalPlanHelper;
import org.apache.heron.common.utils.misc.SerializeDeSerializeHelper;
import org.apache.heron.common.utils.misc.ThreadNames;
import org.apache.heron.instance.bolt.BoltInstance;
import org.apache.heron.instance.spout.SpoutInstance;
import org.apache.heron.proto.ckptmgr.CheckpointManager;
import org.apache.heron.proto.system.Common;
import org.apache.heron.proto.system.Metrics;
import org.apache.heron.shaded.com.google.protobuf.Message;

/**
 * Drives one topology component instance (a spout or a bolt) on top of a {@link SlaveLooper}.
 *
 * <p>The slave registers a wake-up task on the looper that drains the control queue and reacts to
 * three kinds of control message: a new physical plan, a request to restore instance state, and
 * the signal to start stateful processing. Depending on the physical plan it creates either a
 * {@link SpoutInstance} or a {@link BoltInstance} and starts it once the preconditions checked in
 * {@link #startInstanceIfNeeded()} hold.
 */
public class Slave implements Runnable, AutoCloseable {
    private static final Logger LOG = Logger.getLogger(Slave.class.getName());

    /** Topology-config key whose map value is copied into the JVM system properties. */
    private static final String TOPOLOGY_ENVIRONMENT_KEY = "topology.environment";

    private final SlaveLooper slaveLooper;
    private final Communicator<Message> streamInCommunicator;
    private final Communicator<Message> streamOutCommunicator;
    private final Communicator<InstanceControlMsg> inControlQueue;
    private final Communicator<Metrics.MetricPublisherPublishMessage> metricsOutCommunicator;
    private final SystemConfig systemConfig = (SystemConfig) SingletonRegistry.INSTANCE.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG);

    /** Replaced with a fresh collector by {@link #registerTasksWithSlave()}. */
    private MetricsCollector metricsCollector;
    /** Assigned in {@link #handleNewAssignment()}; {@code null} until the first physical plan arrives. */
    private IPluggableSerializer serializer;
    private IInstance instance;
    /** {@code null} until the first physical plan arrives. */
    private PhysicalPlanHelper helper;
    private boolean isInstanceStarted = false;
    private State<Serializable, Serializable> instanceState = null;
    private boolean isStatefulProcessingStarted = false;

    /**
     * Creates the slave, builds its initial {@link MetricsCollector} and registers the
     * control-message task on the looper.
     *
     * @param slaveLooper looper on which all tasks of this slave run
     * @param streamInCommunicator queue of inbound stream messages
     * @param streamOutCommunicator queue of outbound stream messages
     * @param inControlQueue queue of control messages consumed by this slave
     * @param metricsOutCommunicator queue handed to the {@link MetricsCollector}
     */
    public Slave(SlaveLooper slaveLooper, Communicator<Message> streamInCommunicator, Communicator<Message> streamOutCommunicator, Communicator<InstanceControlMsg> inControlQueue, Communicator<Metrics.MetricPublisherPublishMessage> metricsOutCommunicator) {
        this.slaveLooper = slaveLooper;
        this.streamInCommunicator = streamInCommunicator;
        this.streamOutCommunicator = streamOutCommunicator;
        this.inControlQueue = inControlQueue;
        this.metricsOutCommunicator = metricsOutCommunicator;
        this.metricsCollector = new MetricsCollector(slaveLooper, metricsOutCommunicator);
        handleControlMessage();
    }

    /**
     * Registers a wake-up task that drains the control queue. A single message is checked for all
     * three payload kinds independently, so a message carrying several payloads triggers each
     * matching handler in the order: start stateful processing, restore state, new physical plan.
     */
    private void handleControlMessage() {
        this.slaveLooper.addTasksOnWakeup(new Runnable() {
            @Override
            public void run() {
                while (!inControlQueue.isEmpty()) {
                    InstanceControlMsg controlMsg = inControlQueue.poll();
                    if (controlMsg.isStartInstanceStatefulProcessing()) {
                        handleStartInstanceStatefulProcessing(controlMsg);
                    }
                    if (controlMsg.isRestoreInstanceStateRequest()) {
                        handleRestoreInstanceStateRequest(controlMsg);
                    }
                    if (controlMsg.isNewPhysicalPlanHelper()) {
                        handleNewPhysicalPlan(controlMsg);
                    }
                }
            }
        });
    }

    /** Builds a spout instance when the physical plan describes a spout, otherwise a bolt instance. */
    private IInstance createInstance() {
        if (this.helper.getMySpout() != null) {
            return new SpoutInstance(this.helper, this.streamInCommunicator, this.streamOutCommunicator, this.slaveLooper);
        }
        return new BoltInstance(this.helper, this.streamInCommunicator, this.streamOutCommunicator, this.slaveLooper);
    }

    /**
     * Re-creates the instance for the current physical plan and tries to start it. Used after a
     * state restore.
     */
    private void resetCurrentAssignment() {
        this.helper.setTopologyContext(this.metricsCollector);
        this.instance = createInstance();
        startInstanceIfNeeded();
    }

    /**
     * Handles the first physical plan: picks the serializer from the topology config, creates the
     * instance, sizes the stream queues from the system config (spout or bolt settings) and tries
     * to start the instance.
     */
    private void handleNewAssignment() {
        LOG.log(Level.INFO, "Incarnating ourselves as {0} with task id {1}", new Object[]{this.helper.getMyComponent(), this.helper.getMyTaskId()});
        this.serializer = SerializeDeSerializeHelper.getSerializer(this.helper.getTopologyContext().getTopologyConfig());
        // The instance is created before the queues are (re)initialised, as in the original order.
        this.instance = createInstance();
        if (this.helper.getMySpout() != null) {
            this.streamInCommunicator.init(this.systemConfig.getInstanceInternalSpoutReadQueueCapacity(), this.systemConfig.getInstanceTuningExpectedSpoutReadQueueSize(), this.systemConfig.getInstanceTuningCurrentSampleWeight());
            this.streamOutCommunicator.init(this.systemConfig.getInstanceInternalSpoutWriteQueueCapacity(), this.systemConfig.getInstanceTuningExpectedSpoutWriteQueueSize(), this.systemConfig.getInstanceTuningCurrentSampleWeight());
        } else {
            this.streamInCommunicator.init(this.systemConfig.getInstanceInternalBoltReadQueueCapacity(), this.systemConfig.getInstanceTuningExpectedBoltReadQueueSize(), this.systemConfig.getInstanceTuningCurrentSampleWeight());
            this.streamOutCommunicator.init(this.systemConfig.getInstanceInternalBoltWriteQueueCapacity(), this.systemConfig.getInstanceTuningExpectedBoltWriteQueueSize(), this.systemConfig.getInstanceTuningCurrentSampleWeight());
        }
        if (!this.helper.isTopologyRunning()) {
            LOG.info("Instance is deployed in deactivated state");
        }
        startInstanceIfNeeded();
    }

    /** Names the current thread and runs the slave looper on it. */
    @Override
    public void run() {
        Thread.currentThread().setName(ThreadNames.THREAD_SLAVE_NAME);
        this.slaveLooper.loop();
    }

    /**
     * Initialises and starts the instance when it is allowed to run.
     *
     * <p>Nothing happens while no physical plan has been received. For a stateful topology the
     * instance is additionally held back until the start-stateful-processing signal has arrived,
     * and it is then initialised with the restored state; a non-stateful topology is initialised
     * with {@code null}. If the topology state is {@code PAUSED} the instance is deactivated
     * before being started.
     */
    private void startInstanceIfNeeded() {
        if (this.helper == null) {
            LOG.info("No physical plan received. Instance is not started");
            return;
        }
        Map<String, Object> topologyConfig = this.helper.getTopologyContext().getTopologyConfig();
        if (topologyConfig.containsKey(TOPOLOGY_ENVIRONMENT_KEY)) {
            Map<?, ?> environment = (Map<?, ?>) topologyConfig.get(TOPOLOGY_ENVIRONMENT_KEY);
            LOG.info("Setting topology environment: " + environment);
            System.getProperties().putAll(environment);
        }
        boolean stateful = this.helper.isTopologyStateful();
        if (stateful && !this.isStatefulProcessingStarted) {
            LOG.info("Start signal not received. Instance is not started");
            return;
        }
        this.instance.init(stateful ? this.instanceState : null);
        if (TopologyAPI.TopologyState.PAUSED.equals(this.helper.getTopologyState())) {
            this.instance.deactivate();
        }
        this.instance.start();
        this.isInstanceStarted = true;
        LOG.info(stateful ? "Instance is started for stateful topology" : "Instance is started for non-stateful topology");
    }

    /**
     * Flushes metrics, shuts down the instance (if one exists), asks the looper to exit and clears
     * the inbound stream queue. The outbound stream queue is not cleared here.
     */
    @Override
    public void close() {
        LOG.info("Closing the Slave Thread");
        this.metricsCollector.forceGatherAllMetrics();
        LOG.info("Shutting down the instance");
        if (this.instance != null) {
            this.instance.shutdown();
        }
        this.slaveLooper.exitLoop();
        this.streamInCommunicator.clear();
    }

    /**
     * Records that the start-stateful-processing signal has arrived and tries to start the
     * instance. Invoked by the control-message task.
     *
     * @param instanceControlMsg control message carrying the start request
     */
    public void handleStartInstanceStatefulProcessing(InstanceControlMsg instanceControlMsg) {
        LOG.info("Starting stateful processing with checkpoint id: " + instanceControlMsg.getStartInstanceStatefulProcessing().getCheckpointId());
        this.isStatefulProcessingStarted = true;
        startInstanceIfNeeded();
    }

    /**
     * Clears both stream queues, flushes metrics, removes every wake-up task and timer from the
     * looper, cleans the instance (if one exists) and resets the start-stateful-processing flag.
     * {@code isInstanceStarted} is deliberately left untouched; the restore handler reads it
     * afterwards to decide whether to re-create the instance.
     */
    private void cleanAndStopSlave() {
        this.streamInCommunicator.clear();
        this.streamOutCommunicator.clear();
        this.metricsCollector.forceGatherAllMetrics();
        this.slaveLooper.clearTasksOnWakeup();
        this.slaveLooper.clearTimers();
        if (this.instance != null) {
            this.instance.clean();
        }
        this.isStatefulProcessingStarted = false;
    }

    /** Replaces the metrics collector with a fresh one and registers the control-message task again. */
    private void registerTasksWithSlave() {
        this.metricsCollector = new MetricsCollector(this.slaveLooper, this.metricsOutCommunicator);
        handleControlMessage();
    }

    /**
     * Deserialises a checkpointed state blob with the topology's serializer.
     *
     * @param rawState serialized state bytes
     * @return the deserialized state, or {@code null} if the serializer returns {@code null}
     * @throws IllegalStateException if no serializer exists yet because no physical plan has been
     *     received
     */
    @SuppressWarnings("unchecked") // The serializer returns Object; checkpoints are written as State.
    private State<Serializable, Serializable> deserializeState(byte[] rawState) {
        if (this.serializer == null) {
            // Previously surfaced as a bare NullPointerException with no hint at the cause.
            throw new IllegalStateException("Cannot restore instance state: no serializer is available because no physical plan has been received yet");
        }
        return (State<Serializable, Serializable>) this.serializer.deserialize(rawState);
    }

    /**
     * Restores the instance state from a checkpoint and acknowledges the request. Invoked by the
     * control-message task.
     *
     * <p>If the instance was started, the slave is cleaned and stopped first. The previous state is
     * discarded and replaced by, in order of preference, the inline state of the request, the state
     * read from the request's state location, or an empty {@link HashMapState}. A previously
     * started instance is then re-created, looper tasks are registered again, and an {@code OK}
     * response carrying the checkpoint id is offered to the outbound stream queue.
     *
     * @param instanceControlMsg control message carrying the restore request
     * @throws IllegalStateException if the request carries state but no physical plan (and hence no
     *     serializer) has been received yet
     */
    public void handleRestoreInstanceStateRequest(InstanceControlMsg instanceControlMsg) {
        CheckpointManager.RestoreInstanceStateRequest request = instanceControlMsg.getRestoreInstanceStateRequest();
        if (this.isInstanceStarted) {
            cleanAndStopSlave();
        }
        LOG.info("Restoring state to checkpoint id: " + request.getState().getCheckpointId());
        if (this.instanceState != null) {
            this.instanceState.clear();
            this.instanceState = null;
        }
        if (request.getState().hasState() && !request.getState().getState().isEmpty()) {
            this.instanceState = deserializeState(request.getState().getState().toByteArray());
        } else if (request.getState().hasStateLocation()) {
            this.instanceState = deserializeState(FileUtils.readFromFile(request.getState().getStateLocation()));
        } else {
            LOG.info("The restore request does not have an actual state");
        }
        if (this.instanceState == null) {
            this.instanceState = new HashMapState<>();
        }
        LOG.info("Instance state restored for checkpoint id: " + request.getState().getCheckpointId());
        if (this.isInstanceStarted && this.helper != null) {
            LOG.info("Restarting instance");
            // NOTE(review): the topology context is rebuilt with the current collector, which is
            // replaced by registerTasksWithSlave() just below - confirm this ordering is intended.
            resetCurrentAssignment();
        }
        // NOTE(review): this runs even when cleanAndStopSlave() was skipped above, so the
        // control-message task is registered again without the old one having been cleared -
        // confirm that is intended.
        registerTasksWithSlave();
        this.streamOutCommunicator.offer(CheckpointManager.RestoreInstanceStateResponse.newBuilder().setCheckpointId(request.getState().getCheckpointId()).setStatus(Common.Status.newBuilder().setStatus(Common.StatusCode.OK).build()).build());
    }

    /**
     * Applies a new physical plan. Invoked by the control-message task.
     *
     * <p>The first plan triggers {@link #handleNewAssignment()}. Later plans are pushed to the
     * existing instance; if the topology state changed, the instance is activated (starting it
     * first if it has not been started) or deactivated accordingly.
     *
     * @param instanceControlMsg control message carrying the new physical plan helper
     * @throws RuntimeException if the new topology state is neither {@code RUNNING} nor
     *     {@code PAUSED}
     */
    public void handleNewPhysicalPlan(InstanceControlMsg instanceControlMsg) {
        PhysicalPlanHelper newHelper = instanceControlMsg.getNewPhysicalPlanHelper();
        newHelper.setTopologyContext(this.metricsCollector);
        if (this.helper == null) {
            this.helper = newHelper;
            handleNewAssignment();
            return;
        }
        TopologyAPI.TopologyState previousState = this.helper.getTopologyState();
        this.helper = newHelper;
        this.instance.update(this.helper);
        if (previousState.equals(this.helper.getTopologyState())) {
            LOG.info("Topology state remains the same in Slave: " + previousState);
            return;
        }
        switch (this.helper.getTopologyState()) {
            case RUNNING:
                if (!this.isInstanceStarted) {
                    startInstanceIfNeeded();
                }
                this.instance.activate();
                break;
            case PAUSED:
                this.instance.deactivate();
                break;
            default:
                // The instance may be a spout or a bolt, so the message no longer says "spout".
                throw new RuntimeException("Unexpected TopologyState is updated for instance: " + this.helper.getTopologyState());
        }
    }
}
