/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.service.paxos.cleanup;

import java.lang.ref.WeakReference;
import java.util.Collection;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import net.nmoncho.shaded.com.google.common.base.Preconditions;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
import org.apache.cassandra.gms.IFailureDetectionEventListener;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.RequestCallbackWithFailure;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupException;
import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupRequest;
import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupResponse;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.concurrent.AsyncFuture;

/**
 * A future that completes once a paxos cleanup request has been answered successfully by every
 * endpoint it was created with, or fails as soon as one of them reports a failure, becomes
 * unavailable while still queued, or the session times out.
 *
 * <p>Endpoints are contacted strictly one at a time: the next request is only sent after the
 * previous endpoint's response has been delivered through
 * {@link #finishSession(InetAddressAndPort, PaxosCleanupResponse)}.
 *
 * <p>Mutable session state ({@code inProgress}, {@code timeout}) is only read or written while
 * holding this object's monitor.
 */
public class PaxosCleanupSession
extends AsyncFuture<Void>
implements Runnable,
IEndpointStateChangeSubscriber,
IFailureDetectionEventListener,
RequestCallbackWithFailure<Void> {
    /** Sessions that have been started with {@link #run()} and have not completed yet, keyed by session id. */
    private static final Map<UUID, PaxosCleanupSession> sessions = new ConcurrentHashMap<UUID, PaxosCleanupSession>();

    /**
     * How long a session may go without sending a new cleanup request before it is failed.
     * Defaults to two hours; overridable (in seconds) with the system property
     * {@code cassandra.paxos_cleanup_session_timeout_seconds}.
     */
    static final long TIMEOUT_NANOS;

    static {
        int defaultSeconds = (int) TimeUnit.HOURS.toSeconds(2L);
        long timeoutSeconds = Integer.getInteger("cassandra.paxos_cleanup_session_timeout_seconds", defaultSeconds).intValue();
        TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(timeoutSeconds);
    }

    /** Randomly generated id of this session; sent with every request and used to route responses back. */
    private final UUID session = UUID.randomUUID();
    private final TableId tableId;
    private final Collection<Range<Token>> ranges;
    /** Endpoints that have not been sent a cleanup request yet. */
    private final Queue<InetAddressAndPort> pendingCleanups = new ConcurrentLinkedQueue<InetAddressAndPort>();
    /** Endpoint whose response is currently awaited, or {@code null} when no request is outstanding. */
    private InetAddressAndPort inProgress = null;
    /** Time the most recent cleanup request was sent; the timeout is measured from this instant. */
    private volatile long lastMessageSentNanos = Clock.Global.nanoTime();
    /** Handle of the currently scheduled timeout check, or {@code null} if none is scheduled. */
    private ScheduledFuture<?> timeout;

    /**
     * @param endpoints endpoints to run the cleanup on, in the order they will be contacted
     * @param tableId   table the cleanup applies to
     * @param ranges    token ranges the cleanup applies to
     */
    PaxosCleanupSession(Collection<InetAddressAndPort> endpoints, TableId tableId, Collection<Range<Token>> ranges) {
        this.tableId = tableId;
        this.ranges = ranges;
        this.pendingCleanups.addAll(endpoints);
    }

    /**
     * Registers {@code session} so that responses can be routed to it.
     *
     * @throws IllegalStateException if a session with the same id is already registered
     */
    private static void setSession(PaxosCleanupSession session) {
        // putIfAbsent performs the "not yet registered" check and the insert as one atomic step.
        Preconditions.checkState(sessions.putIfAbsent(session.session, session) == null);
    }

    /**
     * Unregisters {@code session}.
     *
     * @throws IllegalStateException if the session was not registered
     */
    private static void removeSession(PaxosCleanupSession session) {
        // remove's return value tells us atomically whether the session was registered.
        Preconditions.checkState(sessions.remove(session.session) != null);
    }

    /**
     * Registers the session, sends the first cleanup request (or completes immediately when there
     * are no endpoints) and, if the session is still running, schedules the timeout check.
     */
    @Override
    public void run() {
        PaxosCleanupSession.setSession(this);
        // Hold the monitor across start + scheduling so that a completion racing with us either
        // happens before (isDone() is true, nothing is scheduled) or after (it sees and cancels
        // the timeout handle) - never in between.
        synchronized (this) {
            this.startNextOrFinish();
            if (!this.isDone()) {
                this.timeout = TimeoutTask.schedule(this);
            }
        }
    }

    /** Sends a cleanup request for this session to {@code endpoint}, with this object as callback. */
    private void startCleanup(InetAddressAndPort endpoint) {
        this.lastMessageSentNanos = Clock.Global.nanoTime();
        PaxosCleanupRequest request = new PaxosCleanupRequest(this.session, this.tableId, this.ranges);
        Message<PaxosCleanupRequest> msg = Message.out(Verb.PAXOS2_CLEANUP_REQ, request);
        MessagingService.instance().sendWithCallback(msg, endpoint, this);
    }

    /**
     * Takes the next pending endpoint and sends it a cleanup request; when no endpoint is left,
     * unregisters the session, completes it successfully and cancels the timeout check.
     *
     * @throws IllegalStateException if a request is still outstanding
     */
    private synchronized void startNextOrFinish() {
        InetAddressAndPort endpoint = this.pendingCleanups.poll();
        if (endpoint == null) {
            Preconditions.checkState(this.inProgress == null, "Unable to complete paxos cleanup session %s, still waiting on %s", this.session, this.inProgress);
        } else {
            Preconditions.checkState(this.inProgress == null, "Unable to start paxos cleanup on %s for %s, still waiting on response from %s", endpoint, this.session, this.inProgress);
        }
        this.inProgress = endpoint;
        if (endpoint != null) {
            this.startCleanup(endpoint);
        } else {
            PaxosCleanupSession.removeSession(this);
            this.trySuccess(null);
            if (this.timeout != null) {
                this.timeout.cancel(true);
            }
        }
    }

    /**
     * Fails the session with a {@link PaxosCleanupException} carrying {@code message}, unregisters
     * it and cancels the timeout check. Does nothing if the session has already completed.
     */
    private synchronized void fail(String message) {
        if (this.isDone()) {
            return;
        }
        PaxosCleanupSession.removeSession(this);
        this.tryFailure(new PaxosCleanupException(message));
        if (this.timeout != null) {
            this.timeout.cancel(true);
        }
    }

    /**
     * Handles the cleanup response of the endpoint currently in progress: moves on to the next
     * endpoint on success, fails the session otherwise.
     *
     * @throws IllegalArgumentException if {@code from} is not the endpoint currently awaited
     */
    private synchronized void finish(InetAddressAndPort from, PaxosCleanupResponse finished) {
        Preconditions.checkArgument(from.equals(this.inProgress), "Received unexpected cleanup complete response from %s for session %s. Expected %s", from, this.session, this.inProgress);
        this.inProgress = null;
        if (finished.wasSuccessful) {
            this.startNextOrFinish();
        } else {
            this.fail(String.format("Paxos cleanup session %s failed on %s with message: %s", this.session, from, finished.message));
        }
    }

    /**
     * Routes a cleanup response to the registered session it belongs to. Responses for sessions
     * that are not (or no longer) registered are ignored.
     */
    public static void finishSession(InetAddressAndPort from, PaxosCleanupResponse response) {
        PaxosCleanupSession session = sessions.get(response.session);
        if (session != null) {
            session.finish(from, response);
        }
    }

    /**
     * Fails the session if {@code unavailable} is still waiting in the pending queue.
     *
     * <p>The endpoint currently in progress has already been removed from the queue, so it is not
     * matched here. NOTE(review): presumably a failure of that endpoint is expected to surface
     * through {@link #onFailure} instead - confirm.
     */
    private synchronized void maybeKillSession(InetAddressAndPort unavailable, String reason) {
        if (!this.pendingCleanups.contains(unavailable)) {
            return;
        }
        this.fail(String.format("Paxos cleanup session %s failed after %s %s", this.session, unavailable, reason));
    }

    // Endpoint state changes that do not affect the session are ignored.

    @Override
    public void onJoin(InetAddressAndPort endpoint, EndpointState epState) {
    }

    @Override
    public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {
    }

    @Override
    public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value) {
    }

    @Override
    public void onAlive(InetAddressAndPort endpoint, EndpointState state) {
    }

    // Endpoint state changes that may make a still-pending endpoint unreachable.

    @Override
    public void onDead(InetAddressAndPort endpoint, EndpointState state) {
        this.maybeKillSession(endpoint, "marked dead");
    }

    @Override
    public void onRemove(InetAddressAndPort endpoint) {
        this.maybeKillSession(endpoint, "removed from ring");
    }

    @Override
    public void onRestart(InetAddressAndPort endpoint, EndpointState state) {
        this.maybeKillSession(endpoint, "restarted");
    }

    @Override
    public void convict(InetAddressAndPort ep, double phi) {
        this.maybeKillSession(ep, "convicted by failure detector");
    }

    /** Fails the session when a cleanup request could not be delivered or was rejected. */
    @Override
    public void onFailure(InetAddressAndPort from, RequestFailureReason reason) {
        this.fail(from.toString() + ' ' + reason + " for cleanup request for paxos cleanup session  " + this.session);
    }

    /**
     * Intentionally empty: the session only advances through
     * {@link #finishSession(InetAddressAndPort, PaxosCleanupResponse)}, not on the direct response
     * to the request.
     */
    @Override
    public void onResponse(Message<Void> msg) {
    }

    /**
     * Periodic check that fails a session once {@link #TIMEOUT_NANOS} have elapsed since its last
     * request was sent. Holds the session only weakly so that a scheduled check does not keep an
     * otherwise unreachable session alive.
     */
    private static class TimeoutTask
    implements Runnable {
        private final WeakReference<PaxosCleanupSession> ref;

        TimeoutTask(PaxosCleanupSession session) {
            this.ref = new WeakReference<PaxosCleanupSession>(session);
        }

        @Override
        public void run() {
            PaxosCleanupSession session = this.ref.get();
            if (session == null) {
                return;
            }
            // The session's timeout handle is guarded by the session monitor.
            synchronized (session) {
                if (session.isDone()) {
                    return;
                }
                long remaining = session.lastMessageSentNanos + TIMEOUT_NANOS - Clock.Global.nanoTime();
                if (remaining > 0L) {
                    // A request was sent since this check was scheduled: check again when that
                    // request's deadline is due, and remember the new handle so that completing
                    // the session cancels the check that is actually pending.
                    session.timeout = this.schedule(remaining);
                } else {
                    // The stored handle refers to the task running right now; drop it so that
                    // fail() does not cancel (and thereby interrupt) the thread executing it.
                    session.timeout = null;
                    session.fail(String.format("Paxos cleanup session %s timed out", session.session));
                }
            }
        }

        /** Schedules this task to run once after {@code delayNanos} nanoseconds. */
        ScheduledFuture<?> schedule(long delayNanos) {
            return ScheduledExecutors.scheduledTasks.scheduleTimeoutWithDelay(this, delayNanos, TimeUnit.NANOSECONDS);
        }

        /** Schedules the first timeout check for {@code session}, {@link #TIMEOUT_NANOS} from now. */
        private static ScheduledFuture<?> schedule(PaxosCleanupSession session) {
            return new TimeoutTask(session).schedule(TIMEOUT_NANOS);
        }
    }
}

