/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.streaming;

import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import net.nmoncho.shaded.com.google.common.annotations.VisibleForTesting;
import net.nmoncho.shaded.com.google.common.util.concurrent.FutureCallback;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.SessionInfo;
import org.apache.cassandra.streaming.StreamCoordinator;
import org.apache.cassandra.streaming.StreamEvent;
import org.apache.cassandra.streaming.StreamEventHandler;
import org.apache.cassandra.streaming.StreamException;
import org.apache.cassandra.streaming.StreamManager;
import org.apache.cassandra.streaming.StreamOperation;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.StreamState;
import org.apache.cassandra.streaming.StreamingChannel;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.TimeUUID;
import org.apache.cassandra.utils.concurrent.AsyncFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class StreamResultFuture
extends AsyncFuture<StreamState> {
    private static final Logger logger = LoggerFactory.getLogger(StreamResultFuture.class);
    public final TimeUUID planId;
    public final StreamOperation streamOperation;
    private final StreamCoordinator coordinator;
    private final Collection<StreamEventHandler> eventListeners = new ConcurrentLinkedQueue<StreamEventHandler>();
    private final long slowEventsLogTimeoutNanos = DatabaseDescriptor.getStreamingSlowEventsLogTimeout().toNanoseconds();

    public StreamResultFuture(TimeUUID planId, StreamOperation streamOperation, StreamCoordinator coordinator) {
        this.planId = planId;
        this.streamOperation = streamOperation;
        this.coordinator = coordinator;
        if (!coordinator.isFollower() && !coordinator.hasActiveSessions()) {
            this.trySuccess(this.getCurrentState());
        }
    }

    @VisibleForTesting
    public StreamResultFuture(TimeUUID planId, StreamOperation streamOperation, TimeUUID pendingRepair, PreviewKind previewKind) {
        this(planId, streamOperation, new StreamCoordinator(streamOperation, 0, StreamingChannel.Factory.Global.streamingFactory(), true, false, pendingRepair, previewKind));
    }

    public static StreamResultFuture createInitiator(TimeUUID planId, StreamOperation streamOperation, Collection<StreamEventHandler> listeners, StreamCoordinator coordinator) {
        StreamResultFuture future = StreamResultFuture.createAndRegisterInitiator(planId, streamOperation, coordinator);
        if (listeners != null) {
            for (StreamEventHandler listener : listeners) {
                future.addEventListener(listener);
            }
        }
        logger.info("[Stream #{}] Executing streaming plan for {}", (Object)planId, (Object)streamOperation.getDescription());
        for (StreamSession session : coordinator.getAllStreamSessions()) {
            session.init(future);
        }
        coordinator.connect(future);
        return future;
    }

    public static synchronized StreamResultFuture createFollower(int sessionIndex, TimeUUID planId, StreamOperation streamOperation, InetAddressAndPort from, StreamingChannel channel, int messagingVersion, TimeUUID pendingRepair, PreviewKind previewKind) {
        StreamResultFuture future = StreamManager.instance.getReceivingStream(planId);
        if (future == null) {
            logger.info("[Stream #{} ID#{}] Creating new streaming plan for {} from {} {}", new Object[]{planId, sessionIndex, streamOperation.getDescription(), from, channel.description()});
            future = new StreamResultFuture(planId, streamOperation, pendingRepair, previewKind);
            StreamManager.instance.registerFollower(future);
        }
        future.initInbound(from, channel, messagingVersion, sessionIndex);
        logger.info("[Stream #{}, ID#{}] Received streaming plan for {} from {} {}", new Object[]{planId, sessionIndex, streamOperation.getDescription(), from, channel.description()});
        return future;
    }

    private static StreamResultFuture createAndRegisterInitiator(TimeUUID planId, StreamOperation streamOperation, StreamCoordinator coordinator) {
        StreamResultFuture future = new StreamResultFuture(planId, streamOperation, coordinator);
        StreamManager.instance.registerInitiator(future);
        return future;
    }

    public StreamCoordinator getCoordinator() {
        return this.coordinator;
    }

    private void initInbound(InetAddressAndPort from, StreamingChannel channel, int messagingVersion, int sessionIndex) {
        StreamSession session = this.coordinator.getOrCreateInboundSession(from, channel, messagingVersion, sessionIndex);
        session.init(this);
    }

    public void addEventListener(StreamEventHandler listener) {
        this.addCallback((FutureCallback)listener);
        this.eventListeners.add(listener);
    }

    public StreamState getCurrentState() {
        return new StreamState(this.planId, this.streamOperation, this.coordinator.getAllSessionInfo());
    }

    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || this.getClass() != o.getClass()) {
            return false;
        }
        StreamResultFuture that = (StreamResultFuture)o;
        return this.planId.equals(that.planId);
    }

    public int hashCode() {
        return this.planId.hashCode();
    }

    void handleSessionPrepared(StreamSession session, StreamSession.PrepareDirection prepareDirection) {
        SessionInfo sessionInfo = session.getSessionInfo();
        logger.info("[Stream #{} ID#{}] Prepare completed. Receiving {} files({}), sending {} files({})", new Object[]{session.planId(), session.sessionIndex(), sessionInfo.getTotalFilesToReceive(), FBUtilities.prettyPrintMemory(sessionInfo.getTotalSizeToReceive()), sessionInfo.getTotalFilesToSend(), FBUtilities.prettyPrintMemory(sessionInfo.getTotalSizeToSend())});
        StreamEvent.SessionPreparedEvent event = new StreamEvent.SessionPreparedEvent(this.planId, sessionInfo, prepareDirection);
        this.coordinator.addSessionInfo(sessionInfo);
        this.fireStreamEvent(event);
    }

    void handleSessionComplete(StreamSession session) {
        logger.info("[Stream #{}] Session with {} is {}", new Object[]{session.planId(), session.peer, session.state().name().toLowerCase()});
        this.fireStreamEvent(new StreamEvent.SessionCompleteEvent(session));
        SessionInfo sessionInfo = session.getSessionInfo();
        this.coordinator.addSessionInfo(sessionInfo);
        this.maybeComplete();
    }

    public void handleProgress(ProgressInfo progress) {
        this.coordinator.updateProgress(progress);
        this.fireStreamEvent(new StreamEvent.ProgressEvent(this.planId, progress));
    }

    synchronized void fireStreamEvent(StreamEvent event) {
        long startNanos = Clock.Global.nanoTime();
        for (StreamEventHandler listener : this.eventListeners) {
            try {
                listener.handleStreamEvent(event);
            }
            catch (Throwable t) {
                logger.warn("Unexpected exception in listern while calling handleStreamEvent", t);
            }
        }
        long totalNanos = Clock.Global.nanoTime() - startNanos;
        if (totalNanos > this.slowEventsLogTimeoutNanos) {
            NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1L, TimeUnit.MINUTES, "Handling streaming events took longer than {}; took {}", () -> new Object[]{Duration.ofNanos(this.slowEventsLogTimeoutNanos), Duration.ofNanos(totalNanos)});
        }
    }

    private synchronized void maybeComplete() {
        if (this.finishedAllSessions()) {
            StreamState finalState = this.getCurrentState();
            if (finalState.hasFailedSession()) {
                logger.warn("[Stream #{}] Stream failed", (Object)this.planId);
                this.tryFailure(new StreamException(finalState, "Stream failed"));
            } else if (finalState.hasAbortedSession()) {
                logger.info("[Stream #{}] Stream aborted", (Object)this.planId);
                this.trySuccess(finalState);
            } else {
                logger.info("[Stream #{}] All sessions completed", (Object)this.planId);
                this.trySuccess(finalState);
            }
        }
    }

    public StreamSession getSession(InetAddressAndPort peer, int sessionIndex) {
        return this.coordinator.getSessionById(peer, sessionIndex);
    }

    private boolean finishedAllSessions() {
        return this.coordinator.getAllSessionInfo().stream().allMatch(s -> s.state.isFinalState());
    }
}

