/*
 * Decompiled with CFR 0.152.
 */
package org.apache.geaflow.cluster.container;

import com.baidu.brpc.server.RpcServerOptions;
import com.google.common.base.Preconditions;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.geaflow.cluster.collector.EmitterService;
import org.apache.geaflow.cluster.common.AbstractContainer;
import org.apache.geaflow.cluster.constants.ClusterConstants;
import org.apache.geaflow.cluster.container.ContainerContext;
import org.apache.geaflow.cluster.container.ContainerInfo;
import org.apache.geaflow.cluster.container.IContainer;
import org.apache.geaflow.cluster.fetcher.FetcherService;
import org.apache.geaflow.cluster.protocol.ICommand;
import org.apache.geaflow.cluster.protocol.IEvent;
import org.apache.geaflow.cluster.protocol.OpenContainerEvent;
import org.apache.geaflow.cluster.protocol.OpenContainerResponseEvent;
import org.apache.geaflow.cluster.rpc.impl.ContainerEndpoint;
import org.apache.geaflow.cluster.rpc.impl.RpcServiceImpl;
import org.apache.geaflow.cluster.task.service.TaskService;
import org.apache.geaflow.cluster.worker.Dispatcher;
import org.apache.geaflow.cluster.worker.DispatcherService;
import org.apache.geaflow.common.config.Configuration;
import org.apache.geaflow.common.exception.GeaflowRuntimeException;
import org.apache.geaflow.common.rpc.ConfigurableServerOption;
import org.apache.geaflow.common.utils.PortUtil;
import org.apache.geaflow.shuffle.service.ShuffleManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Container
extends AbstractContainer
implements IContainer<IEvent, IEvent> {
    private static final Logger LOGGER = LoggerFactory.getLogger(Container.class);
    private ContainerContext containerContext;
    private Dispatcher dispatcher;
    private AtomicBoolean isOpened = new AtomicBoolean(false);
    protected FetcherService fetcherService;
    protected EmitterService emitterService;
    protected TaskService workerService;
    protected DispatcherService dispatcherService;

    public Container() {
        this(0);
    }

    public Container(int rpcPort) {
        super(rpcPort);
    }

    @Override
    public void init(ContainerContext containerContext) {
        try {
            this.containerContext = containerContext;
            String containerName = ClusterConstants.getContainerName(containerContext.getId());
            super.init(containerContext.getId(), containerName, containerContext.getConfig());
            this.registerToMaster();
            LOGGER.info("container {} init finish", (Object)this.name);
        }
        catch (Throwable t) {
            LOGGER.error("init container err", t);
            throw new GeaflowRuntimeException(t);
        }
    }

    @Override
    protected void startRpcService() {
        RpcServerOptions serverOptions = ConfigurableServerOption.build((Configuration)this.configuration);
        this.rpcService = new RpcServiceImpl(PortUtil.getPort((int)this.rpcPort), serverOptions);
        this.rpcService.addEndpoint(new ContainerEndpoint(this));
        this.rpcPort = this.rpcService.startService();
    }

    @Override
    public OpenContainerResponseEvent open(OpenContainerEvent event) {
        try {
            if (this.isOpened.compareAndSet(false, true)) {
                int num = event.getExecutorNum();
                Preconditions.checkArgument((num > 0 ? 1 : 0) != 0, (Object)"worker num should > 0");
                LOGGER.info("open container {} with {} executors", (Object)this.name, (Object)num);
                this.fetcherService = new FetcherService(num, this.configuration);
                this.emitterService = new EmitterService(num, this.configuration);
                this.workerService = new TaskService(this.id, num, this.configuration, this.metricGroup, this.fetcherService, this.emitterService);
                this.dispatcher = new Dispatcher(this.workerService);
                this.dispatcherService = new DispatcherService(this.dispatcher, this.configuration);
                this.fetcherService.start();
                this.emitterService.start();
                this.workerService.start();
                this.dispatcherService.start();
                if (this.containerContext.getReliableEvents() != null) {
                    for (IEvent reliableEvent : this.containerContext.getReliableEvents()) {
                        LOGGER.info("{} replay event {}", (Object)this.name, (Object)reliableEvent);
                        this.dispatcher.add((ICommand)reliableEvent);
                    }
                }
                this.registerHAService();
            }
            return new OpenContainerResponseEvent(this.id, 0);
        }
        catch (Throwable throwable) {
            LOGGER.error("{} open error", (Object)this.name, (Object)throwable);
            throw throwable;
        }
    }

    @Override
    public IEvent process(IEvent input) {
        LOGGER.info("{} process event {}", (Object)this.name, (Object)input);
        try {
            this.containerContext.addEvent(input);
            this.containerContext.checkpoint(new ContainerContext.EventCheckpointFunction());
            this.dispatcher.add((ICommand)input);
            return null;
        }
        catch (Throwable throwable) {
            LOGGER.error("{} process error", (Object)this.name, (Object)throwable);
            throw throwable;
        }
    }

    @Override
    public void close() {
        super.close();
        if (this.fetcherService != null) {
            this.fetcherService.shutdown();
        }
        if (this.workerService != null) {
            this.workerService.shutdown();
        }
        if (this.dispatcherService != null) {
            this.dispatcherService.shutdown();
        }
        if (this.emitterService != null) {
            this.emitterService.shutdown();
        }
        LOGGER.info("container {} closed", (Object)this.name);
    }

    @Override
    protected ContainerInfo buildComponentInfo() {
        ContainerInfo containerInfo = new ContainerInfo();
        this.fillComponentInfo(containerInfo);
        containerInfo.setShufflePort(ShuffleManager.getInstance().getShufflePort());
        return containerInfo;
    }
}

