package io.aeron.archive;

import io.aeron.Aeron;
import io.aeron.ChannelUri;
import io.aeron.CommonContext;
import io.aeron.Image;
import io.aeron.Subscription;
import io.aeron.archive.client.AeronArchive;
import io.aeron.archive.client.ArchiveException;
import io.aeron.archive.client.ControlResponsePoller;
import io.aeron.archive.client.RecordingDescriptorConsumer;
import io.aeron.archive.client.RecordingDescriptorPoller;
import io.aeron.archive.client.ReplayMerge;
import io.aeron.archive.client.ReplayParams;
import io.aeron.archive.codecs.ControlResponseCode;
import io.aeron.archive.codecs.RecordingSignal;
import io.aeron.archive.codecs.SourceLocation;
import io.aeron.exceptions.AeronException;
import io.aeron.exceptions.TimeoutException;
import java.util.concurrent.TimeUnit;
import org.agrona.CloseHelper;
import org.agrona.Strings;
import org.agrona.concurrent.CachedEpochClock;
import org.agrona.concurrent.CountedErrorHandler;
import uk.co.real_logic.artio.dictionary.SessionConstants;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/aeron/archive/ReplicationSession.class */
public class ReplicationSession implements Session, RecordingDescriptorConsumer {
    private static final int REPLAY_REMOVE_THRESHOLD = 0;
    private static final int RETRY_ATTEMPTS = 3;
    private final int replicationSessionId;
    private long replayPosition;
    private final long dstStopPosition;
    private final boolean isDestinationRecordingEmpty;
    private long timeOfLastActionMs;
    private final long actionTimeoutMs;
    private final long replicationId;
    private final long channelTagId;
    private final long subscriptionTagId;
    private final long srcRecordingId;
    private long dstRecordingId;
    private int replayStreamId;
    private int replaySessionId;
    private boolean isLiveAdded;
    private final boolean isTagged;
    private final String replicationChannel;
    private final String liveDestination;
    private String replayDestination;
    private final CachedEpochClock epochClock;
    private final ArchiveConductor conductor;
    private final ControlSession controlSession;
    private final ControlResponseProxy controlResponseProxy;
    private final Catalog catalog;
    private final int fileIoMaxLength;
    private final Aeron aeron;
    private final AeronArchive.Context context;
    private AeronArchive.AsyncConnect asyncConnect;
    private AeronArchive srcArchive;
    private Subscription recordingSubscription;
    private Image image;
    private long activeCorrelationId = -1;
    private long srcReplaySessionId = -1;
    private long srcStopPosition = -1;
    private long srcRecordingPosition = -1;
    private int retryAttempts = 3;
    private State state = State.CONNECT;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/aeron/archive/ReplicationSession$State.class */
    public enum State {
        CONNECT,
        REPLICATE_DESCRIPTOR,
        SRC_RECORDING_POSITION,
        EXTEND,
        REPLAY,
        AWAIT_IMAGE,
        REPLICATE,
        CATCHUP,
        ATTEMPT_LIVE_JOIN,
        DONE
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ReplicationSession(long j, long j2, long j3, long j4, long j5, long j6, String str, String str2, int i, int i2, RecordingSummary recordingSummary, AeronArchive.Context context, CachedEpochClock cachedEpochClock, Catalog catalog, ControlResponseProxy controlResponseProxy, ControlSession controlSession) {
        this.replayPosition = -1L;
        this.replicationId = j5;
        this.srcRecordingId = j;
        this.dstRecordingId = j2;
        this.liveDestination = Strings.isEmpty(str) ? null : str;
        this.replicationChannel = str2;
        this.fileIoMaxLength = i;
        this.replicationSessionId = i2;
        this.aeron = context.aeron();
        this.context = context;
        this.catalog = catalog;
        this.controlResponseProxy = controlResponseProxy;
        this.epochClock = cachedEpochClock;
        this.conductor = controlSession.archiveConductor();
        this.controlSession = controlSession;
        this.actionTimeoutMs = TimeUnit.NANOSECONDS.toMillis(context.messageTimeoutNs());
        this.dstStopPosition = j6;
        this.isTagged = (-1 == j3 && -1 == j4) ? false : true;
        this.channelTagId = -1 == j3 ? j5 : j3;
        this.subscriptionTagId = -1 == j4 ? j5 : j4;
        if (null == recordingSummary) {
            this.isDestinationRecordingEmpty = false;
            return;
        }
        this.replayPosition = recordingSummary.stopPosition;
        this.replayStreamId = recordingSummary.streamId;
        this.isDestinationRecordingEmpty = recordingSummary.startPosition == recordingSummary.stopPosition;
    }

    @Override // io.aeron.archive.Session
    public long sessionId() {
        return this.replicationId;
    }

    @Override // io.aeron.archive.Session
    public boolean isDone() {
        return this.state == State.DONE;
    }

    @Override // io.aeron.archive.Session
    public void abort() {
        state(State.DONE);
    }

    @Override // io.aeron.archive.Session
    public void close() {
        ArchiveConductor archiveConductor = this.controlSession.archiveConductor();
        CountedErrorHandler countedErrorHandler = archiveConductor.context().countedErrorHandler();
        stopRecording();
        stopReplaySession(countedErrorHandler);
        CloseHelper.close(countedErrorHandler, this.asyncConnect);
        CloseHelper.close(countedErrorHandler, this.srcArchive);
        archiveConductor.removeReplicationSession(this);
        signal(-1L, RecordingSignal.REPLICATE_END);
    }

    @Override // io.aeron.archive.Session
    public int doWork() {
        int i = 0;
        try {
            if (null != this.recordingSubscription && this.recordingSubscription.isClosed()) {
                state(State.DONE);
                return 1;
            }
            switch (this.state) {
                case CONNECT:
                    i = 0 + connect();
                    break;
                case REPLICATE_DESCRIPTOR:
                    i = 0 + replicateDescriptor();
                    break;
                case SRC_RECORDING_POSITION:
                    i = 0 + srcRecordingPosition();
                    break;
                case EXTEND:
                    i = 0 + extend();
                    break;
                case REPLAY:
                    i = 0 + replay();
                    break;
                case AWAIT_IMAGE:
                    i = 0 + awaitImage();
                    break;
                case REPLICATE:
                    i = 0 + replicate();
                    break;
                case CATCHUP:
                    i = 0 + catchup();
                    break;
                case ATTEMPT_LIVE_JOIN:
                    i = 0 + attemptLiveJoin();
                    break;
            }
            return i;
        } catch (Exception e) {
            state(State.DONE);
            error(e.getMessage(), 0);
            throw e;
        }
    }

    @Override // io.aeron.archive.client.RecordingDescriptorConsumer
    public void onRecordingDescriptor(long j, long j2, long j3, long j4, long j5, long j6, long j7, int i, int i2, int i3, int i4, int i5, int i6, String str, String str2, String str3) {
        this.srcStopPosition = j7;
        this.replayStreamId = i6;
        if (null != this.liveDestination || -1 == this.replicationSessionId) {
            this.replaySessionId = i5;
        } else {
            this.replaySessionId = this.replicationSessionId;
        }
        if (-1 != this.fileIoMaxLength && this.fileIoMaxLength < i4) {
            state(State.DONE);
            error("Replication fileIoMaxLength is less than than the recording mtuLength", 0);
            return;
        }
        if (-1 == this.dstRecordingId) {
            this.replayPosition = j6;
            this.dstRecordingId = this.catalog.addNewRecording(j6, j6, j4, j4, i, i2, i3, i4, i5, i6, str, str2, str3);
            signal(j6, RecordingSignal.REPLICATE);
        } else if (this.isDestinationRecordingEmpty) {
            this.replayPosition = j6;
            this.catalog.replaceRecording(this.dstRecordingId, j6, j6, j4, j4, i, i2, i3, i4, i5, i6, str, str2, str3);
        }
        State state = State.EXTEND;
        if (null != this.liveDestination) {
            if (-1 != j7) {
                state(State.DONE);
                error("cannot live merge without active source recording", 0);
                return;
            }
            state = State.SRC_RECORDING_POSITION;
        }
        if (j6 == j7 || (-1 != this.dstRecordingId && j7 == this.catalog.stopPosition(this.dstRecordingId))) {
            signal(j7, RecordingSignal.SYNC);
            state = State.DONE;
        }
        state(state);
    }

    private int connect() {
        int i = 0;
        if (null == this.asyncConnect) {
            this.asyncConnect = AeronArchive.asyncConnect(this.context);
            i = 0 + 1;
        } else {
            int step = this.asyncConnect.step();
            try {
                AeronArchive poll = this.asyncConnect.poll();
                if (null != poll) {
                    this.srcArchive = poll;
                    this.asyncConnect = null;
                    state(State.REPLICATE_DESCRIPTOR);
                    i = 0 + 1;
                } else if (this.asyncConnect.step() != step) {
                    i = 0 + 1;
                }
            } catch (AeronException e) {
                state(State.DONE);
                error("Replication connection failed=" + e.getMessage(), 14);
            }
        }
        return i;
    }

    private int replicateDescriptor() {
        int i = 0;
        if (-1 == this.activeCorrelationId) {
            long nextCorrelationId = this.aeron.nextCorrelationId();
            if (this.srcArchive.archiveProxy().listRecording(this.srcRecordingId, nextCorrelationId, this.srcArchive.controlSessionId())) {
                i = 0 + trackAction(nextCorrelationId);
                this.srcArchive.recordingDescriptorPoller().reset(nextCorrelationId, 1, this);
            } else if (this.epochClock.time() >= this.timeOfLastActionMs + this.actionTimeoutMs) {
                throw new TimeoutException("failed to list remote recording descriptor");
            }
        } else {
            RecordingDescriptorPoller recordingDescriptorPoller = this.srcArchive.recordingDescriptorPoller();
            int poll = recordingDescriptorPoller.poll();
            if (recordingDescriptorPoller.isDispatchComplete() && recordingDescriptorPoller.remainingRecordCount() > 0) {
                state(State.DONE);
                error("unknown src recording id " + this.srcRecordingId, 5);
            }
            if (0 == poll && this.epochClock.time() >= this.timeOfLastActionMs + this.actionTimeoutMs) {
                throw new TimeoutException("failed to fetch remote recording descriptor");
            }
            i = 0 + poll;
        }
        return i;
    }

    private int srcRecordingPosition() {
        int i = 0;
        if (-1 == this.activeCorrelationId) {
            long nextCorrelationId = this.aeron.nextCorrelationId();
            if (this.srcArchive.archiveProxy().getRecordingPosition(this.srcRecordingId, nextCorrelationId, this.srcArchive.controlSessionId())) {
                i = 0 + trackAction(nextCorrelationId);
            } else if (this.epochClock.time() >= this.timeOfLastActionMs + this.actionTimeoutMs) {
                throw new TimeoutException("failed to send recording position request");
            }
        } else {
            ControlResponsePoller controlResponsePoller = this.srcArchive.controlResponsePoller();
            i = 0 + controlResponsePoller.poll();
            if (hasResponse(controlResponsePoller)) {
                this.srcRecordingPosition = controlResponsePoller.relevantId();
                if (-1 == this.srcRecordingPosition && null != this.liveDestination) {
                    throw new ArchiveException("cannot live merge without active source recording");
                }
                state(State.EXTEND);
            } else if (this.epochClock.time() >= this.timeOfLastActionMs + this.actionTimeoutMs) {
                throw new TimeoutException("failed to get recording position");
            }
        }
        return i;
    }

    private int extend() {
        boolean z = this.isTagged || null != this.liveDestination;
        ChannelUri parse = ChannelUri.parse(this.replicationChannel);
        String str = parse.get(CommonContext.ENDPOINT_PARAM_NAME);
        parse.put(CommonContext.SESSION_ID_PARAM_NAME, Integer.toString(this.replaySessionId));
        parse.put(CommonContext.REJOIN_PARAM_NAME, "false");
        if (z) {
            parse.remove(CommonContext.ENDPOINT_PARAM_NAME);
            parse.put(CommonContext.TAGS_PARAM_NAME, this.channelTagId + "," + this.subscriptionTagId);
            parse.put(CommonContext.MDC_CONTROL_MODE_PARAM_NAME, CommonContext.MDC_CONTROL_MODE_MANUAL);
        }
        this.recordingSubscription = this.conductor.extendRecording(this.replicationId, this.dstRecordingId, this.replayStreamId, SourceLocation.REMOTE, true, parse.toString(), this.controlSession);
        if (null == this.recordingSubscription) {
            state(State.DONE);
            return 1;
        }
        if (z) {
            this.replayDestination = ChannelUri.createDestinationUri(this.replicationChannel, str);
            this.recordingSubscription.asyncAddDestination(this.replayDestination);
        }
        state(State.REPLAY);
        return 1;
    }

    private int replay() {
        int i = 0;
        if (-1 == this.activeCorrelationId) {
            String resolvedEndpoint = this.recordingSubscription.resolvedEndpoint();
            if (null == resolvedEndpoint) {
                if (this.epochClock.time() >= this.timeOfLastActionMs + this.actionTimeoutMs) {
                    throw new TimeoutException("failed to resolve subscription endpoint: channel=" + this.recordingSubscription.channel());
                }
                return 0;
            }
            ChannelUri parse = ChannelUri.parse(this.replicationChannel);
            parse.put(CommonContext.SESSION_ID_PARAM_NAME, Integer.toString(this.replaySessionId));
            if (null != parse.get(CommonContext.ENDPOINT_PARAM_NAME)) {
                parse.replaceEndpointWildcardPort(resolvedEndpoint);
            }
            if (null != this.liveDestination) {
                parse.put(CommonContext.LINGER_PARAM_NAME, SessionConstants.HEARTBEAT_MESSAGE_TYPE_STR);
                parse.put(CommonContext.EOS_PARAM_NAME, "false");
            }
            long nextCorrelationId = this.aeron.nextCorrelationId();
            if (this.srcArchive.archiveProxy().replay(this.srcRecordingId, parse.toString(), this.replayStreamId, new ReplayParams().position(this.replayPosition).length(-1 == this.dstStopPosition ? -1L : this.dstStopPosition - this.replayPosition).fileIoMaxLength(this.fileIoMaxLength), nextCorrelationId, this.srcArchive.controlSessionId())) {
                i = 0 + trackAction(nextCorrelationId);
            } else if (this.epochClock.time() >= this.timeOfLastActionMs + this.actionTimeoutMs) {
                throw new TimeoutException("failed to send replay request");
            }
        } else {
            ControlResponsePoller controlResponsePoller = this.srcArchive.controlResponsePoller();
            i = 0 + controlResponsePoller.poll();
            if (hasResponse(controlResponsePoller)) {
                this.srcReplaySessionId = controlResponsePoller.relevantId();
                state(State.AWAIT_IMAGE);
            } else if (this.epochClock.time() >= this.timeOfLastActionMs + this.actionTimeoutMs) {
                throw new TimeoutException("failed get acknowledgement of replay request to: " + this.replicationChannel);
            }
        }
        return i;
    }

    private int awaitImage() {
        int i = 0;
        Image imageBySessionId = this.recordingSubscription.imageBySessionId(this.replaySessionId);
        if (null != imageBySessionId) {
            this.image = imageBySessionId;
            state(null == this.liveDestination ? State.REPLICATE : State.CATCHUP);
            i = 0 + 1;
        } else if (this.epochClock.time() >= this.timeOfLastActionMs + this.actionTimeoutMs) {
            throw new TimeoutException("failed get replay image for sessionId " + this.replaySessionId + " on channel " + this.recordingSubscription.channel());
        }
        return i;
    }

    private int replicate() {
        int i = 0;
        boolean isClosed = this.image.isClosed();
        boolean isEndOfStream = this.image.isEndOfStream();
        long position = this.image.position();
        boolean z = -1 != this.srcStopPosition && position >= this.srcStopPosition;
        if (z || ((-1 != this.dstStopPosition && position >= this.dstStopPosition) || isEndOfStream || isClosed)) {
            logReplicationSessionDone(this.controlSession.sessionId(), this.replicationId, this.srcRecordingId, this.replayPosition, this.srcStopPosition, this.dstRecordingId, this.dstStopPosition, position, isClosed, isEndOfStream, z);
            if (z) {
                signal(position, RecordingSignal.SYNC);
            }
            this.srcReplaySessionId = -1L;
            state(State.DONE);
            i = 0 + 1;
        }
        return i;
    }

    private int catchup() {
        int i = 0;
        if (this.image.position() >= this.srcRecordingPosition) {
            state(State.ATTEMPT_LIVE_JOIN);
            i = 0 + 1;
        } else if (this.image.isClosed()) {
            throw new ArchiveException("replication image closed unexpectedly");
        }
        return i;
    }

    private int attemptLiveJoin() {
        int i = 0;
        if (-1 == this.activeCorrelationId) {
            long nextCorrelationId = this.aeron.nextCorrelationId();
            if (this.srcArchive.archiveProxy().getRecordingPosition(this.srcRecordingId, nextCorrelationId, this.srcArchive.controlSessionId())) {
                i = 0 + trackAction(nextCorrelationId);
            } else if (this.epochClock.time() >= this.timeOfLastActionMs + this.actionTimeoutMs) {
                throw new TimeoutException("failed to send recording position request");
            }
        } else {
            ControlResponsePoller controlResponsePoller = this.srcArchive.controlResponsePoller();
            i = 0 + controlResponsePoller.poll();
            if (hasResponse(controlResponsePoller)) {
                trackAction(-1L);
                this.retryAttempts = 3;
                this.srcRecordingPosition = controlResponsePoller.relevantId();
                if (-1 == this.srcRecordingPosition && null != this.liveDestination) {
                    throw new ArchiveException("cannot live merge without active source recording");
                }
                long position = this.image.position();
                if (shouldAddLiveDestination(position)) {
                    this.recordingSubscription.asyncAddDestination(this.liveDestination);
                    this.isLiveAdded = true;
                } else if (shouldStopReplay(position)) {
                    this.recordingSubscription.asyncRemoveDestination(this.replayDestination);
                    this.replayDestination = null;
                    this.recordingSubscription = null;
                    signal(position, RecordingSignal.MERGE);
                    state(State.DONE);
                }
                i++;
            } else {
                if (this.image.isClosed()) {
                    throw new ArchiveException("replication image closed unexpectedly");
                }
                if (this.epochClock.time() >= this.timeOfLastActionMs + this.actionTimeoutMs) {
                    int i2 = this.retryAttempts - 1;
                    this.retryAttempts = i2;
                    if (i2 == 0) {
                        throw new TimeoutException("failed to get recording position");
                    }
                    trackAction(-1L);
                }
            }
        }
        return i;
    }

    private boolean hasResponse(ControlResponsePoller controlResponsePoller) {
        if (!controlResponsePoller.isPollComplete() || controlResponsePoller.controlSessionId() != this.srcArchive.controlSessionId()) {
            return false;
        }
        ControlResponseCode code = controlResponsePoller.code();
        if (ControlResponseCode.ERROR == code) {
            throw new ArchiveException(controlResponsePoller.errorMessage(), (int) controlResponsePoller.relevantId());
        }
        return controlResponsePoller.correlationId() == this.activeCorrelationId && ControlResponseCode.OK == code;
    }

    private void error(String str, int i) {
        if (this.controlSession.controlPublication().isConnected()) {
            this.controlSession.sendErrorResponse(this.replicationId, i, str, this.controlResponseProxy);
        }
    }

    private void signal(long j, RecordingSignal recordingSignal) {
        this.controlSession.sendSignal(this.replicationId, this.dstRecordingId, null != this.recordingSubscription ? this.recordingSubscription.registrationId() : -1L, j, recordingSignal);
    }

    private void stopReplaySession(CountedErrorHandler countedErrorHandler) {
        if (-1 != this.srcReplaySessionId) {
            try {
                this.srcArchive.archiveProxy().stopReplay(this.srcReplaySessionId, this.aeron.nextCorrelationId(), this.srcArchive.controlSessionId());
            } catch (Exception e) {
                countedErrorHandler.onError(e);
            }
            this.srcReplaySessionId = -1L;
        }
    }

    private void stopRecording() {
        if (null != this.recordingSubscription) {
            this.conductor.stopRecordingSubscription(this.recordingSubscription.registrationId());
            this.recordingSubscription = null;
        }
    }

    private boolean shouldAddLiveDestination(long j) {
        return !this.isLiveAdded && this.srcRecordingPosition - j <= ((long) Math.min(this.image.termBufferLength() >> 2, ReplayMerge.LIVE_ADD_MAX_WINDOW));
    }

    private boolean shouldStopReplay(long j) {
        return this.isLiveAdded && this.srcRecordingPosition - j <= 0 && this.image.activeTransportCount() >= 2;
    }

    private int trackAction(long j) {
        this.timeOfLastActionMs = this.epochClock.time();
        this.activeCorrelationId = j;
        return 1;
    }

    private void state(State state) {
        logStateChange(this.state, state, this.replicationId, null != this.image ? this.image.position() : -1L);
        this.state = state;
        this.activeCorrelationId = -1L;
        this.timeOfLastActionMs = this.epochClock.time();
    }

    private void logStateChange(State state, State state2, long j, long j2) {
    }

    private void logReplicationSessionDone(long j, long j2, long j3, long j4, long j5, long j6, long j7, long j8, boolean z, boolean z2, boolean z3) {
    }
}
