package org.apache.nemo.runtime.executor;

import com.google.protobuf.ByteString;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import org.apache.nemo.common.exception.UnknownFailureCauseException;
import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.comm.ControlMessage;
import org.apache.nemo.runtime.common.message.PersistentConnectionToMasterMap;
import org.apache.reef.annotations.audience.EvaluatorSide;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@EvaluatorSide
/* loaded from: input_file:org/apache/nemo/runtime/executor/MetricManagerWorker.class */
public final class MetricManagerWorker implements MetricMessageSender {
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
    private final BlockingQueue<ControlMessage.Metric> metricMessageQueue = new LinkedBlockingQueue();
    private final PersistentConnectionToMasterMap persistentConnectionToMasterMap;
    private static final int FLUSHING_PERIOD = 3000;
    private static final Logger LOG = LoggerFactory.getLogger(MetricManagerWorker.class.getName());

    @Inject
    private MetricManagerWorker(PersistentConnectionToMasterMap persistentConnectionToMasterMap) {
        this.persistentConnectionToMasterMap = persistentConnectionToMasterMap;
        this.scheduledExecutorService.scheduleAtFixedRate(() -> {
            flushMetricMessageQueueToMaster();
        }, 0L, 3000L, TimeUnit.MILLISECONDS);
    }

    @Override // org.apache.nemo.runtime.executor.MetricMessageSender
    public void flush() {
        flushMetricMessageQueueToMaster();
        this.persistentConnectionToMasterMap.getMessageSender("RUNTIME_MASTER_MESSAGE_LISTENER_ID").send(ControlMessage.Message.newBuilder().setId(RuntimeIdManager.generateMessageId()).setListenerId("RUNTIME_MASTER_MESSAGE_LISTENER_ID").setType(ControlMessage.MessageType.MetricFlushed).build());
    }

    private synchronized void flushMetricMessageQueueToMaster() {
        if (this.metricMessageQueue.isEmpty()) {
            return;
        }
        int size = this.metricMessageQueue.size();
        ControlMessage.MetricMsg.Builder newBuilder = ControlMessage.MetricMsg.newBuilder();
        LOG.debug("MetricManagerWorker Size: {}", Integer.valueOf(size));
        for (int i = 0; i < size; i++) {
            ControlMessage.Metric poll = this.metricMessageQueue.poll();
            LOG.debug("MetricManagerWorker addMetric: {}, {}, {}", new Object[]{Integer.valueOf(size), Integer.valueOf(i), poll});
            newBuilder.addMetric(i, poll);
        }
        this.persistentConnectionToMasterMap.getMessageSender("RUNTIME_MASTER_MESSAGE_LISTENER_ID").send(ControlMessage.Message.newBuilder().setId(RuntimeIdManager.generateMessageId()).setListenerId("RUNTIME_MASTER_MESSAGE_LISTENER_ID").setType(ControlMessage.MessageType.MetricMessageReceived).setMetricMsg(newBuilder.build()).build());
    }

    @Override // org.apache.nemo.runtime.executor.MetricMessageSender
    public void send(String str, String str2, String str3, byte[] bArr) {
        this.metricMessageQueue.add(ControlMessage.Metric.newBuilder().setMetricType(str).setMetricId(str2).setMetricField(str3).setMetricValue(ByteString.copyFrom(bArr)).build());
    }

    @Override // org.apache.nemo.runtime.executor.MetricMessageSender, java.lang.AutoCloseable
    public void close() throws UnknownFailureCauseException {
        this.scheduledExecutorService.shutdownNow();
        flushMetricMessageQueueToMaster();
    }
}
