/*
 * Decompiled with CFR 0.152.
 */
package org.apache.geaflow.cluster.fetcher;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.geaflow.cluster.exception.ComponentUncaughtExceptionHandler;
import org.apache.geaflow.cluster.fetcher.BarrierHandler;
import org.apache.geaflow.cluster.fetcher.CloseFetchRequest;
import org.apache.geaflow.cluster.fetcher.FetchRequest;
import org.apache.geaflow.cluster.fetcher.IInputMessageBuffer;
import org.apache.geaflow.cluster.fetcher.InitFetchRequest;
import org.apache.geaflow.common.config.Configuration;
import org.apache.geaflow.common.encoder.IEncoder;
import org.apache.geaflow.common.exception.GeaflowRuntimeException;
import org.apache.geaflow.common.metric.EventMetrics;
import org.apache.geaflow.common.metric.ShuffleReadMetrics;
import org.apache.geaflow.common.thread.Executors;
import org.apache.geaflow.io.AbstractMessageBuffer;
import org.apache.geaflow.shuffle.api.reader.IReaderContext;
import org.apache.geaflow.shuffle.api.reader.PipelineReader;
import org.apache.geaflow.shuffle.api.reader.ReaderContext;
import org.apache.geaflow.shuffle.desc.ShardInputDesc;
import org.apache.geaflow.shuffle.message.PipelineBarrier;
import org.apache.geaflow.shuffle.message.PipelineEvent;
import org.apache.geaflow.shuffle.message.PipelineMessage;
import org.apache.geaflow.shuffle.service.ShuffleManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PipelineInputFetcher {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipelineInputFetcher.class);
    private static final ExecutorService FETCH_EXECUTOR = Executors.getUnboundedExecutorService((String)PipelineInputFetcher.class.getSimpleName(), (long)60L, (TimeUnit)TimeUnit.SECONDS, null, (Thread.UncaughtExceptionHandler)ComponentUncaughtExceptionHandler.INSTANCE);
    private final Map<Integer, FetcherTask> taskId2fetchTask = new HashMap<Integer, FetcherTask>();
    private final Configuration config;

    public PipelineInputFetcher(Configuration config) {
        this.config = config;
    }

    public void init(InitFetchRequest request) {
        int taskId = request.getTaskId();
        if (this.taskId2fetchTask.containsKey(taskId)) {
            throw new GeaflowRuntimeException("task already exists: " + taskId);
        }
        for (ShardInputDesc inputDesc : request.getInputShards().values()) {
            IEncoder encoder = inputDesc.getEncoder();
            if (encoder == null) continue;
            encoder.init(this.config);
        }
        this.taskId2fetchTask.put(taskId, new FetcherTask(this.config, request));
        LOGGER.info("init fetcher task {} {}", (Object)request.getTaskName(), request.getShufflePhases());
    }

    protected void fetch(FetchRequest request) {
        FetcherTask fetcherTask = this.taskId2fetchTask.get(request.getTaskId());
        if (fetcherTask != null) {
            long targetWindowId = request.getWindowId() + request.getWindowCount() - 1L;
            fetcherTask.updateWindowId(targetWindowId);
            if (!fetcherTask.isRunning()) {
                fetcherTask.start();
                FETCH_EXECUTOR.execute(fetcherTask);
            }
        }
    }

    public void close(CloseFetchRequest request) {
        int taskId = request.getTaskId();
        FetcherTask task = this.taskId2fetchTask.remove(taskId);
        if (task != null) {
            task.close();
            LOGGER.info("close fetcher task {} {}", (Object)task.initFetchRequest.getTaskName(), task.initFetchRequest.getShufflePhases());
        }
    }

    public void cancel() {
    }

    public void close() {
        for (FetcherTask task : this.taskId2fetchTask.values()) {
            task.close();
        }
        this.taskId2fetchTask.clear();
    }

    private static class FetcherTask
    implements Runnable {
        private static final String READER_NAME_PATTERN = "shuffle-reader-%d[%d/%d]";
        private static final int WAIT_TIME_OUT_MS = 100;
        private final Configuration config;
        private final InitFetchRequest initFetchRequest;
        private final PipelineReader shuffleReader;
        private final IInputMessageBuffer<?>[] fetchListeners;
        private final BarrierHandler barrierHandler;
        private final String name;
        private volatile boolean running;
        private volatile long targetWindowId;

        private FetcherTask(Configuration config, InitFetchRequest request) {
            this.config = config;
            this.initFetchRequest = request;
            this.shuffleReader = (PipelineReader)ShuffleManager.getInstance().loadShuffleReader();
            this.shuffleReader.init((IReaderContext)this.buildReaderContext());
            this.fetchListeners = request.getFetchListeners().toArray(new IInputMessageBuffer[0]);
            this.barrierHandler = new BarrierHandler(request.getTaskId(), request.getInputShards());
            this.name = String.format(READER_NAME_PATTERN, request.getTaskId(), request.getTaskIndex(), request.getTaskParallelism());
        }

        public void start() {
            this.running = true;
        }

        public boolean isRunning() {
            return this.running;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void updateWindowId(long windowId) {
            if (this.targetWindowId < windowId) {
                this.targetWindowId = windowId;
                FetcherTask fetcherTask = this;
                synchronized (fetcherTask) {
                    this.notifyAll();
                }
            }
        }

        @Override
        public void run() {
            Thread.currentThread().setName(this.name);
            try {
                this.fetch();
            }
            catch (GeaflowRuntimeException e) {
                LOGGER.error("fetcher task err with window id {} {}", new Object[]{this.targetWindowId, this.name, e});
                throw e;
            }
            catch (Throwable e) {
                LOGGER.error("fetcher task err with window id {} {}", new Object[]{this.targetWindowId, this.name, e});
                throw new GeaflowRuntimeException(e.getMessage(), e);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void fetch() throws InterruptedException {
            while (this.running) {
                this.shuffleReader.fetch(this.targetWindowId);
                if (!this.shuffleReader.hasNext()) {
                    FetcherTask fetcherTask = this;
                    synchronized (fetcherTask) {
                        this.wait(100L);
                        continue;
                    }
                }
                PipelineEvent event = this.shuffleReader.next();
                if (event == null) continue;
                if (event instanceof PipelineMessage) {
                    PipelineMessage message = (PipelineMessage)event;
                    for (IInputMessageBuffer<?> listener : this.fetchListeners) {
                        listener.onMessage(message);
                    }
                    continue;
                }
                PipelineBarrier barrier = (PipelineBarrier)event;
                if (!this.barrierHandler.checkCompleted(barrier)) continue;
                long windowCount = this.barrierHandler.getTotalWindowCount();
                this.handleMetrics();
                PipelineBarrier windowBarrier = new PipelineBarrier(barrier.getWindowId(), barrier.getEdgeId(), windowCount);
                for (IInputMessageBuffer<?> listener : this.fetchListeners) {
                    listener.onBarrier(windowBarrier);
                }
            }
            LOGGER.info("fetcher task finish window id {} {}", (Object)this.targetWindowId, (Object)this.name);
        }

        private ReaderContext buildReaderContext() {
            ReaderContext context = new ReaderContext();
            context.setConfig(this.config);
            context.setVertexId(this.initFetchRequest.getVertexId());
            context.setTaskName(this.initFetchRequest.getTaskName());
            context.setInputShardMap(this.initFetchRequest.getInputShards());
            context.setInputSlices(this.initFetchRequest.getInputSlices());
            context.setSliceNum(this.initFetchRequest.getSliceNum());
            return context;
        }

        private void handleMetrics() {
            ShuffleReadMetrics shuffleReadMetrics = this.shuffleReader.getShuffleReadMetrics();
            for (IInputMessageBuffer<?> listener : this.fetchListeners) {
                if (!(listener instanceof AbstractMessageBuffer)) continue;
                EventMetrics eventMetrics = ((AbstractMessageBuffer)listener).getEventMetrics();
                eventMetrics.addShuffleReadBytes(shuffleReadMetrics.getDecodeBytes());
            }
        }

        public void close() {
            this.running = false;
            if (this.shuffleReader != null) {
                this.shuffleReader.close();
            }
        }
    }
}

