/*
 * Decompiled with CFR 0.152.
 */
package org.apache.geaflow.cluster.collector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.geaflow.cluster.collector.CloseEmitterRequest;
import org.apache.geaflow.cluster.collector.IOutputMessageBuffer;
import org.apache.geaflow.cluster.collector.InitEmitterRequest;
import org.apache.geaflow.cluster.collector.StashEmitterRequest;
import org.apache.geaflow.cluster.collector.UpdateEmitterRequest;
import org.apache.geaflow.cluster.exception.ComponentUncaughtExceptionHandler;
import org.apache.geaflow.cluster.protocol.OutputMessage;
import org.apache.geaflow.common.config.Configuration;
import org.apache.geaflow.common.encoder.IEncoder;
import org.apache.geaflow.common.exception.GeaflowRuntimeException;
import org.apache.geaflow.common.metric.EventMetrics;
import org.apache.geaflow.common.metric.ShuffleWriteMetrics;
import org.apache.geaflow.common.task.TaskArgs;
import org.apache.geaflow.common.thread.Executors;
import org.apache.geaflow.io.AbstractMessageBuffer;
import org.apache.geaflow.model.record.RecordArgs;
import org.apache.geaflow.shuffle.ForwardOutputDesc;
import org.apache.geaflow.shuffle.OutputDescriptor;
import org.apache.geaflow.shuffle.api.writer.IShuffleWriter;
import org.apache.geaflow.shuffle.api.writer.IWriterContext;
import org.apache.geaflow.shuffle.api.writer.WriterContext;
import org.apache.geaflow.shuffle.config.ShuffleConfig;
import org.apache.geaflow.shuffle.desc.IOutputDesc;
import org.apache.geaflow.shuffle.desc.OutputType;
import org.apache.geaflow.shuffle.message.Shard;
import org.apache.geaflow.shuffle.service.ShuffleManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Starts and stops the background tasks that move records from a task's output
 * buffers into shuffle writers.
 *
 * <p>For every non-{@code RESPONSE} output of a task, {@link #update} submits one
 * {@code EmitterTask} to a shared executor. Each task is controlled by an
 * {@link AtomicBoolean} "running" flag that {@link #stash} and {@link #close} flip
 * to {@code false}.
 *
 * <p>This class performs no synchronization of its own; the internal maps are plain
 * {@link HashMap}s. NOTE(review): this presumes all requests for one emitter are
 * handled by a single thread - confirm against the caller.
 */
public class PipelineOutputEmitter {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipelineOutputEmitter.class);

    /** Executor shared by all emitter instances; every emitter task runs on it. */
    private static final ExecutorService EMIT_EXECUTOR = Executors.getUnboundedExecutorService(
        PipelineOutputEmitter.class.getSimpleName(), 60L, TimeUnit.SECONDS, null,
        ComponentUncaughtExceptionHandler.INSTANCE);

    /** How long an emitter task waits on its buffer before re-checking its running flag. */
    private static final int DEFAULT_TIMEOUT_MS = 100;

    private final Configuration configuration;
    private final int index;

    /** Init request per task id; {@link #update} reads the output descriptor and task args from it. */
    private final Map<Integer, InitEmitterRequest> initRequestCache = new HashMap<Integer, InitEmitterRequest>();

    /**
     * Running flags of the currently launched emitter tasks, per task id. The array is
     * indexed by output position; slots of {@code RESPONSE} outputs stay {@code null}.
     */
    private final Map<Integer, AtomicBoolean[]> runningFlags = new HashMap<Integer, AtomicBoolean[]>();

    /**
     * @param configuration configuration returned by {@link #getConfiguration()}
     * @param index         index of this emitter; used in the emitter thread names
     */
    public PipelineOutputEmitter(Configuration configuration, int index) {
        this.configuration = configuration;
        this.index = index;
    }

    /**
     * Caches the init request for its task and immediately launches the emitter tasks
     * through {@link #update}.
     *
     * @param request init request carrying the output descriptor, task args and buffers
     */
    public void init(InitEmitterRequest request) {
        this.initRequestCache.put(request.getTaskId(), request);
        UpdateEmitterRequest updateEmitterRequest = new UpdateEmitterRequest(
            request.getTaskId(),
            request.getWindowId(),
            request.getPipelineId(),
            request.getPipelineName(),
            request.getOutputBuffers());
        this.update(updateEmitterRequest);
    }

    /**
     * Launches one emitter task per non-{@code RESPONSE} output of the task, using the
     * output descriptor of the cached init request and the buffers of {@code request}.
     *
     * <p>Emitter tasks still registered for the same task are stopped first, because
     * their flags are about to be replaced and could not be reached afterwards. If
     * launching fails part-way, the tasks already started by this call are stopped
     * before the failure is rethrown.
     *
     * @param request update request; its task must have been initialised via {@link #init}
     * @throws GeaflowRuntimeException if no init request is cached for the task
     */
    public void update(UpdateEmitterRequest request) {
        int taskId = request.getTaskId();
        InitEmitterRequest initEmitterRequest = this.initRequestCache.get(taskId);
        if (initEmitterRequest == null) {
            throw new GeaflowRuntimeException("init emitter request not found for task " + taskId);
        }
        OutputDescriptor outputDescriptor = initEmitterRequest.getOutputDescriptor();
        List<IOutputMessageBuffer<?, Shard>> outputBuffers = request.getOutputBuffers();
        // Raw types are kept where the generic signatures of the project types are not visible here.
        List outputDescList = outputDescriptor.getOutputDescList();
        int outputNum = outputDescList.size();
        AtomicBoolean[] flags = new AtomicBoolean[outputNum];
        ShuffleConfig shuffleConfig = ShuffleManager.getInstance().getShuffleConfig();

        // Without this, the put() below would orphan the flags of tasks that are still running.
        this.handleRunningFlags(taskId);

        try {
            for (int i = 0; i < outputNum; ++i) {
                IOutputDesc outputDesc = (IOutputDesc)outputDescList.get(i);
                if (outputDesc.getType() == OutputType.RESPONSE) {
                    // No emitter task is started for RESPONSE outputs; flags[i] stays null.
                    continue;
                }
                ForwardOutputDesc forwardOutputDesc = (ForwardOutputDesc)outputDesc;
                IShuffleWriter pipeRecordWriter = ShuffleManager.getInstance().loadShuffleWriter();
                IEncoder encoder = forwardOutputDesc.getEncoder();
                if (encoder != null) {
                    encoder.init(initEmitterRequest.getConfiguration());
                }
                TaskArgs taskArgs = initEmitterRequest.getTaskArgs();
                WriterContext writerContext = WriterContext.newBuilder()
                    .setPipelineId(request.getPipelineId())
                    .setPipelineName(request.getPipelineName())
                    .setConfig(shuffleConfig)
                    .setVertexId(forwardOutputDesc.getPartitioner().getOpId())
                    .setEdgeId(forwardOutputDesc.getEdgeId())
                    .setTaskId(taskArgs.getTaskId())
                    .setTaskIndex(taskArgs.getTaskIndex())
                    .setTaskName(taskArgs.getTaskName())
                    .setChannelNum(forwardOutputDesc.getTargetTaskIndices().size())
                    .setEncoder(encoder)
                    .setDataExchangeMode(forwardOutputDesc.getDataExchangeMode());
                pipeRecordWriter.init((IWriterContext)writerContext);

                AtomicBoolean flag = new AtomicBoolean(true);
                flags[i] = flag;
                String emitterId = String.format("%d[%d/%d]", taskId, taskArgs.getTaskIndex(), taskArgs.getParallelism());
                EmitterTask emitterTask = new EmitterTask(pipeRecordWriter, outputBuffers.get(i), flag,
                    request.getWindowId(), this.index, forwardOutputDesc.getEdgeName(), emitterId);
                EMIT_EXECUTOR.execute(emitterTask);
            }
        } catch (RuntimeException | Error e) {
            // The flags are not registered yet, so nothing else could ever stop these tasks.
            stopEmitters(flags);
            throw e;
        }
        this.runningFlags.put(taskId, flags);
    }

    /**
     * Drops the cached init request of the task and signals its emitter tasks to stop.
     *
     * @param request close request identifying the task
     */
    public void close(CloseEmitterRequest request) {
        int taskId = request.getTaskId();
        this.initRequestCache.remove(taskId);
        this.handleRunningFlags(taskId);
    }

    /**
     * Signals the emitter tasks of the task to stop while keeping its cached init
     * request, so that a later {@link #update} can launch them again.
     *
     * @param request stash request identifying the task
     */
    public void stash(StashEmitterRequest request) {
        this.handleRunningFlags(request.getTaskId());
    }

    /** @return the configuration passed to the constructor */
    public Configuration getConfiguration() {
        return this.configuration;
    }

    /**
     * Removes all cached init requests. Running flags are left untouched, so emitter
     * tasks that are still running are not stopped by this call.
     */
    public void clear() {
        LOGGER.info("clear emitter cache of task {}", this.initRequestCache.keySet());
        this.initRequestCache.clear();
    }

    /** Unregisters the running flags of the task, if any, and sets each of them to {@code false}. */
    private void handleRunningFlags(int taskId) {
        AtomicBoolean[] flags = this.runningFlags.remove(taskId);
        if (flags != null) {
            stopEmitters(flags);
        }
    }

    /** Sets every non-null flag to {@code false}; null slots belong to outputs without an emitter task. */
    private static void stopEmitters(AtomicBoolean[] flags) {
        for (AtomicBoolean flag : flags) {
            if (flag != null) {
                flag.set(false);
            }
        }
    }

    /**
     * Polls one output buffer and forwards what it receives to one shuffle writer
     * until its running flag is cleared.
     *
     * @param <T> record type carried by the buffer and accepted by the writer
     */
    private static class EmitterTask<T>
    implements Runnable {
        private static final String WRITER_NAME_PATTERN = "shuffle-writer-%d-%s";
        private final IShuffleWriter<T, Shard> writer;
        private final IOutputMessageBuffer<T, Shard> pipe;
        private final AtomicBoolean running;
        private final long windowId;
        private final String name;
        private final String emitterId;
        /** Whether the edge is the graph "Message" edge; selects set vs. add semantics for metrics. */
        private final boolean isMessage;

        /**
         * @param writer      initialised shuffle writer that receives the records
         * @param pipe        buffer to poll records and barriers from
         * @param running     flag that keeps the poll loop alive while {@code true}
         * @param windowId    window id of the launching request; used for logging only
         * @param workerIndex index used in the thread name
         * @param edgeName    edge name used in the thread name and to detect the message edge
         * @param emitterId   identifier used in log messages
         */
        public EmitterTask(IShuffleWriter<T, Shard> writer, IOutputMessageBuffer<T, Shard> pipe, AtomicBoolean running, long windowId, int workerIndex, String edgeName, String emitterId) {
            this.writer = writer;
            this.pipe = pipe;
            this.running = running;
            this.windowId = windowId;
            this.name = String.format(WRITER_NAME_PATTERN, workerIndex, edgeName);
            this.emitterId = emitterId;
            // Constant-first so a null edge name yields false instead of a NullPointerException.
            this.isMessage = RecordArgs.GraphRecordNames.Message.name().equals(edgeName);
        }

        /**
         * Renames the current thread and runs the poll loop. Any failure is reported to
         * the buffer, logged, and rethrown wrapped in a {@link GeaflowRuntimeException}.
         */
        @Override
        public void run() {
            Thread.currentThread().setName(this.name);
            try {
                this.execute();
            } catch (Throwable t) {
                this.pipe.error(t);
                LOGGER.error("emitter task err in window id {} {}", this.windowId, this.emitterId, t);
                throw new GeaflowRuntimeException(t);
            }
            LOGGER.info("emitter task finish window id {} {}", this.windowId, this.emitterId);
        }

        /**
         * Poll loop: a barrier flushes the writer, publishes metrics and hands the flush
         * result back to the buffer; any other record is emitted to its target channel.
         * The writer is closed when the loop ends, whether normally or by exception.
         */
        private void execute() throws Exception {
            Throwable failure = null;
            try {
                while (this.running.get()) {
                    // Bounded wait so a cleared running flag is noticed even when the buffer is idle.
                    OutputMessage record = (OutputMessage)this.pipe.poll(DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
                    if (record == null) {
                        continue;
                    }
                    long windowId = record.getWindowId();
                    if (record.isBarrier()) {
                        Optional<Shard> result = this.writer.flush(windowId);
                        this.handleMetrics();
                        this.pipe.setResult(windowId, result.orElse(null));
                    } else {
                        this.writer.emit(windowId, (List)record.getMessage(), false, record.getTargetChannel());
                    }
                }
            } catch (Throwable t) {
                failure = t;
                throw t;
            } finally {
                this.closeWriter(failure);
            }
        }

        /**
         * Closes the writer. When the loop already failed, a close error is attached to
         * that failure as suppressed so the original cause is the one that propagates.
         */
        private void closeWriter(Throwable failure) throws Exception {
            try {
                this.writer.close();
            } catch (Throwable closeError) {
                if (failure == null) {
                    throw closeError;
                }
                failure.addSuppressed(closeError);
            }
        }

        /**
         * Copies the writer's record and byte counters into the buffer's event metrics:
         * overwriting them for the message edge, accumulating them otherwise.
         */
        private void handleMetrics() {
            ShuffleWriteMetrics shuffleWriteMetrics = this.writer.getShuffleWriteMetrics();
            // NOTE(review): assumes every buffer handed to this task is an AbstractMessageBuffer.
            EventMetrics eventMetrics = ((AbstractMessageBuffer)this.pipe).getEventMetrics();
            long writtenRecords = shuffleWriteMetrics.getWrittenRecords();
            long encodedSize = shuffleWriteMetrics.getEncodedSize();
            if (this.isMessage) {
                eventMetrics.setShuffleWriteRecords(writtenRecords);
                eventMetrics.setShuffleWriteBytes(encodedSize);
            } else {
                eventMetrics.addShuffleWriteRecords(writtenRecords);
                eventMetrics.addShuffleWriteBytes(encodedSize);
            }
        }
    }
}

