/*
 * Decompiled with CFR 0.152.
 */
package org.apache.geaflow.cluster.container;

import java.util.ArrayList;
import java.util.List;
import org.apache.geaflow.cluster.common.IReliableContext;
import org.apache.geaflow.cluster.common.ReliableContainerContext;
import org.apache.geaflow.cluster.constants.ClusterConstants;
import org.apache.geaflow.cluster.protocol.EventType;
import org.apache.geaflow.cluster.protocol.IComposeEvent;
import org.apache.geaflow.cluster.protocol.IEvent;
import org.apache.geaflow.cluster.protocol.IHighAvailableEvent;
import org.apache.geaflow.cluster.system.ClusterMetaStore;
import org.apache.geaflow.common.config.Configuration;
import org.apache.geaflow.ha.runtime.HighAvailableLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ContainerContext
extends ReliableContainerContext {
    private static final Logger LOGGER = LoggerFactory.getLogger(ContainerContext.class);
    private List<IEvent> reliableEvents = new ArrayList<IEvent>();
    private transient List<IEvent> waitingCheckpointEvents = new ArrayList<IEvent>();

    public ContainerContext(int id, Configuration config) {
        super(id, ClusterConstants.getContainerName(id), config);
    }

    public ContainerContext(int id, Configuration config, boolean isRecover) {
        this(id, config);
        this.isRecover = isRecover;
    }

    public ContainerContext(int id, Configuration config, boolean isRecover, List<IEvent> reliableEvents) {
        this(id, config, isRecover);
        this.reliableEvents = reliableEvents;
    }

    @Override
    public void load() {
        List<IEvent> events = ClusterMetaStore.getInstance(this.id, this.name, this.config).getEvents();
        if (events != null) {
            LOGGER.info("container {} recover events {}", (Object)this.id, events);
            this.reliableEvents = events;
        }
        if (this.waitingCheckpointEvents == null) {
            this.waitingCheckpointEvents = new ArrayList<IEvent>();
        } else {
            this.waitingCheckpointEvents.clear();
        }
    }

    public List<IEvent> getReliableEvents() {
        return this.reliableEvents;
    }

    public synchronized void addEvent(IEvent input) {
        if (input instanceof IHighAvailableEvent) {
            if (((IHighAvailableEvent)((Object)input)).getHaLevel() == HighAvailableLevel.CHECKPOINT) {
                if (this.waitingCheckpointEvents == null) {
                    this.waitingCheckpointEvents = new ArrayList<IEvent>();
                }
                if (!this.waitingCheckpointEvents.contains(input)) {
                    this.waitingCheckpointEvents.add(input);
                    LOGGER.info("container {} add recoverable event {}", (Object)this.id, (Object)input);
                } else {
                    LOGGER.info("container {} already has recoverable event {}", (Object)this.id, (Object)input);
                }
            }
        } else if (input.getEventType() == EventType.COMPOSE) {
            IComposeEvent composeEvent = (IComposeEvent)input;
            for (IEvent event : composeEvent.getEventList()) {
                this.addEvent(event);
            }
        }
    }

    public static class EventCheckpointFunction
    implements IReliableContext.IReliableContextCheckpointFunction {
        @Override
        public void doCheckpoint(IReliableContext context) {
            ContainerContext containerContext = (ContainerContext)context;
            if (containerContext.waitingCheckpointEvents == null || containerContext.waitingCheckpointEvents.isEmpty()) {
                LOGGER.info("container {} has no new events to checkpoint", (Object)containerContext.getId());
                return;
            }
            List<IEvent> reliableEvents = ClusterMetaStore.getInstance().getEvents();
            if (reliableEvents == null) {
                reliableEvents = new ArrayList<IEvent>(containerContext.waitingCheckpointEvents);
            } else {
                for (IEvent event : containerContext.waitingCheckpointEvents) {
                    if (reliableEvents.contains(event)) {
                        LOGGER.info("container {} already has saved recoverable event {}", (Object)containerContext.id, (Object)event);
                        continue;
                    }
                    reliableEvents.add(event);
                }
            }
            ClusterMetaStore.getInstance().saveEvent(reliableEvents).flush();
            LOGGER.info("container {} checkpoint events {}", (Object)containerContext.getId(), reliableEvents);
            containerContext.waitingCheckpointEvents.clear();
        }
    }
}

