/*
 * Decompiled with CFR 0.152.
 */
package uk.co.real_logic.artio.engine;

import io.aeron.Aeron;
import io.aeron.ChannelUri;
import io.aeron.ExclusivePublication;
import io.aeron.archive.client.AeronArchive;
import io.aeron.archive.client.ArchiveException;
import io.aeron.archive.client.RecordingDescriptorConsumer;
import io.aeron.archive.codecs.SourceLocation;
import io.aeron.archive.status.RecordingPos;
import io.aeron.driver.Configuration;
import java.io.File;
import java.nio.MappedByteBuffer;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.LongConsumer;
import org.agrona.ErrorHandler;
import org.agrona.IoUtil;
import org.agrona.collections.Long2LongHashMap;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.collections.LongHashSet;
import org.agrona.concurrent.IdleStrategy;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.status.CountersReader;
import uk.co.real_logic.artio.CommonConfiguration;
import uk.co.real_logic.artio.DebugLogger;
import uk.co.real_logic.artio.LogTag;
import uk.co.real_logic.artio.engine.EngineConfiguration;
import uk.co.real_logic.artio.engine.logger.RecordingIdLookup;
import uk.co.real_logic.artio.storage.messages.MessageHeaderDecoder;
import uk.co.real_logic.artio.storage.messages.MessageHeaderEncoder;
import uk.co.real_logic.artio.storage.messages.PreviousRecordingDecoder;
import uk.co.real_logic.artio.storage.messages.PreviousRecordingEncoder;
import uk.co.real_logic.artio.util.CharFormatter;

public class RecordingCoordinator
implements AutoCloseable,
RecordingDescriptorConsumer {
    private static volatile boolean saveOnShutdown = true;
    private static final String FILE_NAME = "recording_coordinator";
    private final CharFormatter loadRecordings = new CharFormatter("RecordingCoordinator.loadRecordingIds: inbound=%s,outbound=%s");
    private final CharFormatter recordingStarted = new CharFormatter("RecordingCoordinator.recordingStarted: recordingId=%s,direction=%s");
    private final IdleStrategy idleStrategy = CommonConfiguration.backoffIdleStrategy();
    private final SourceLocation outboundLocation;
    private final LongHashSet trackedRegistrationIds = new LongHashSet();
    private final Aeron aeron;
    private final AeronArchive archive;
    private final String channel;
    private final CountersReader counters;
    private final EngineConfiguration configuration;
    private final RecordingIdLookup framerInboundLookup;
    private final RecordingIdLookup framerOutboundLookup;
    private final RecordingIdLookup indexerInboundLookup;
    private final RecordingIdLookup indexerOutboundLookup;
    private final File recordingIdsFile;
    private final ErrorHandler errorHandler;
    private final RecordingIds inboundRecordingIds = new RecordingIds();
    private final RecordingIds outboundRecordingIds = new RecordingIds();
    private final Long2ObjectHashMap<LibraryExtendPosition> libraryIdToExtendPosition = new Long2ObjectHashMap();
    private Long2LongHashMap inboundAeronSessionIdToCompletionPosition;
    private Long2LongHashMap outboundAeronSessionIdToCompletionPosition;
    private boolean closed = false;
    private LibraryExtendPosition libraryExtendPosition;

    public static void saveOnShutdownTesting(boolean saveOnShutdown) {
        RecordingCoordinator.saveOnShutdown = saveOnShutdown;
    }

    public static File recordingIdsFile(EngineConfiguration configuration) {
        return new File(configuration.logFileDir(), FILE_NAME);
    }

    RecordingCoordinator(Aeron aeron, AeronArchive archive, EngineConfiguration configuration, IdleStrategy archiverIdleStrategy, ErrorHandler errorHandler) {
        this.aeron = aeron;
        this.archive = archive;
        this.configuration = configuration;
        this.channel = configuration.libraryAeronChannel();
        this.recordingIdsFile = RecordingCoordinator.recordingIdsFile(configuration);
        this.errorHandler = errorHandler;
        this.outboundLocation = this.channel.equals("aeron:ipc") ? SourceLocation.LOCAL : SourceLocation.REMOTE;
        this.loadRecordingIdsFile();
        if (configuration.logAnyMessages()) {
            this.counters = this.aeron.countersReader();
            this.framerInboundLookup = new RecordingIdLookup(archiverIdleStrategy, this.counters);
            this.framerOutboundLookup = new RecordingIdLookup(archiverIdleStrategy, this.counters);
            this.indexerInboundLookup = new RecordingIdLookup(archiverIdleStrategy, this.counters);
            this.indexerOutboundLookup = new RecordingIdLookup(archiverIdleStrategy, this.counters);
        } else {
            this.counters = null;
            this.framerInboundLookup = null;
            this.framerOutboundLookup = null;
            this.indexerInboundLookup = null;
            this.indexerOutboundLookup = null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void loadRecordingIdsFile() {
        if (this.recordingIdsFile.exists()) {
            MappedByteBuffer mappedBuffer = IoUtil.mapExistingFile(this.recordingIdsFile, FILE_NAME);
            UnsafeBuffer buffer = new UnsafeBuffer(mappedBuffer);
            try {
                MessageHeaderDecoder header = new MessageHeaderDecoder();
                PreviousRecordingDecoder previousRecording = new PreviousRecordingDecoder();
                header.wrap(buffer, 0);
                previousRecording.wrap(buffer, 8, header.blockLength(), header.version());
                for (PreviousRecordingDecoder.InboundRecordingsDecoder inboundRecording : previousRecording.inboundRecordings()) {
                    this.inboundRecordingIds.free.add(inboundRecording.recordingId());
                }
                for (PreviousRecordingDecoder.OutboundRecordingsDecoder outboundRecording : previousRecording.outboundRecordings()) {
                    this.outboundRecordingIds.free.add(outboundRecording.recordingId());
                }
                if (DebugLogger.isEnabled(LogTag.STATE_CLEANUP)) {
                    DebugLogger.log(LogTag.STATE_CLEANUP, this.loadRecordings.clear().with(this.inboundRecordingIds.toString()).with(this.outboundRecordingIds.toString()));
                }
            }
            finally {
                IoUtil.unmap(mappedBuffer);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void saveRecordingIdsFile() {
        try {
            int inboundSize = this.inboundRecordingIds.size();
            int outboundSize = this.outboundRecordingIds.size() + this.libraryIdToExtendPosition.size();
            File saveFile = File.createTempFile(FILE_NAME, "tmp", new File(this.configuration.logFileDir()));
            int requiredLength = 14 + PreviousRecordingEncoder.InboundRecordingsEncoder.recordingIdEncodingLength() * inboundSize + PreviousRecordingEncoder.OutboundRecordingsEncoder.recordingIdEncodingLength() * outboundSize;
            MappedByteBuffer mappedBuffer = IoUtil.mapExistingFile(saveFile, FILE_NAME, 0L, requiredLength);
            UnsafeBuffer buffer = new UnsafeBuffer(mappedBuffer);
            try {
                MessageHeaderEncoder header = new MessageHeaderEncoder();
                PreviousRecordingEncoder previousRecording = new PreviousRecordingEncoder();
                previousRecording.wrapAndApplyHeader(buffer, 0, header);
                PreviousRecordingEncoder.InboundRecordingsEncoder inbound = previousRecording.inboundRecordingsCount(inboundSize);
                this.inboundRecordingIds.forEach(id -> inbound.next().recordingId(id));
                PreviousRecordingEncoder.OutboundRecordingsEncoder outbound = previousRecording.outboundRecordingsCount(outboundSize);
                this.outboundRecordingIds.forEach(id -> outbound.next().recordingId(id));
                for (LibraryExtendPosition pos : this.libraryIdToExtendPosition.values()) {
                    outbound.next().recordingId(pos.recordingId);
                }
                mappedBuffer.force();
            }
            finally {
                IoUtil.unmap(mappedBuffer);
            }
            Files.move(saveFile.toPath(), this.recordingIdsFile.toPath(), StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
        }
        catch (Throwable e) {
            this.errorHandler.onError(e);
        }
    }

    public ExclusivePublication track(String aeronChannel, int streamId) {
        if (this.configuration.isRelevantStreamId(streamId)) {
            ExclusivePublication publication;
            boolean isInbound = streamId == this.configuration.inboundLibraryStream();
            RecordingIds recordingIds = isInbound ? this.inboundRecordingIds : this.outboundRecordingIds;
            RecordingIdLookup lookup = isInbound ? this.framerOutboundLookup : this.framerInboundLookup;
            LibraryExtendPosition libraryExtendPosition = this.acquireRecording(streamId, recordingIds);
            if (libraryExtendPosition != null) {
                ChannelUri channelUri = ChannelUri.parse(aeronChannel);
                channelUri.initialPosition(libraryExtendPosition.stopPosition, libraryExtendPosition.initialTermId, libraryExtendPosition.termBufferLength);
                RecordingCoordinator.setMtuLength(libraryExtendPosition.mtuLength, channelUri);
                String channel = channelUri.toString();
                publication = this.aeron.addExclusivePublication(channel, streamId);
                this.extendRecording(streamId, libraryExtendPosition, publication.sessionId());
            } else {
                publication = this.aeron.addExclusivePublication(aeronChannel, streamId);
                this.startRecording(streamId, publication.sessionId(), SourceLocation.LOCAL);
            }
            this.checkRecordingStart(publication.sessionId(), lookup, recordingIds.used, isInbound);
            return publication;
        }
        return this.aeron.addExclusivePublication(aeronChannel, streamId);
    }

    public static void setMtuLength(int mtuLength, ChannelUri channelUri) {
        channelUri.put("mtu", Integer.toString(mtuLength));
    }

    private void extendRecording(int streamId, LibraryExtendPosition libraryExtendPosition, int sessionId) {
        try {
            String recordingChannel = ChannelUri.addSessionId(this.channel, sessionId);
            long registrationId = this.archive.extendRecording(libraryExtendPosition.recordingId, recordingChannel, streamId, SourceLocation.LOCAL);
            this.trackedRegistrationIds.add(registrationId);
        }
        catch (ArchiveException e) {
            this.errorHandler.onError(e);
        }
    }

    private LibraryExtendPosition acquireRecording(int streamId, RecordingIds recordingIds) {
        this.libraryExtendPosition = null;
        LongHashSet.LongIterator it = recordingIds.free.iterator();
        if (it.hasNext()) {
            long recordingId = it.nextValue();
            it.remove();
            int count = this.archive.listRecording(recordingId, this);
            if (count != 1 || null == this.libraryExtendPosition) {
                this.errorHandler.onError(new IllegalStateException("Unable to reuse recordingId: " + recordingId + " (Perhaps you have deleted this recording id or some aeron archiver state?)"));
                if (this.libraryExtendPosition == null) {
                    return null;
                }
            } else if (this.libraryExtendPosition.streamId != streamId) {
                this.errorHandler.onError(new IllegalStateException(String.format("Unable to reuse recordingId: %d. Stream id is mismatch: actual: %d, expected: %d", recordingId, this.libraryExtendPosition.streamId, streamId)));
                this.libraryExtendPosition = null;
                return null;
            }
            while (this.libraryExtendPosition.stopPosition == -1L) {
                this.archive.tryStopRecordingByIdentity(recordingId);
                this.libraryExtendPosition.stopPosition = this.archive.getStopPosition(recordingId);
            }
        }
        return this.libraryExtendPosition;
    }

    public LibraryExtendPosition trackLibrary(int sessionId, int libraryId) {
        if (this.configuration.logOutboundMessages()) {
            int streamId = this.configuration.outboundLibraryStream();
            LibraryExtendPosition extendPosition = this.libraryIdToExtendPosition.get(libraryId);
            if (extendPosition != null) {
                if (sessionId != extendPosition.newSessionId) {
                    return extendPosition;
                }
                this.libraryIdToExtendPosition.remove(libraryId);
                this.checkRecordingStart(sessionId, this.framerOutboundLookup, this.outboundRecordingIds.used, false);
                return null;
            }
            extendPosition = this.acquireRecording(streamId, this.outboundRecordingIds);
            if (extendPosition != null) {
                this.extendRecording(streamId, extendPosition, extendPosition.newSessionId);
                this.saveRecordingIdsFile();
                this.libraryIdToExtendPosition.put(libraryId, extendPosition);
                return extendPosition;
            }
            if (this.startRecording(streamId, sessionId, this.outboundLocation)) {
                this.checkRecordingStart(sessionId, this.framerOutboundLookup, this.outboundRecordingIds.used, false);
            }
        }
        return null;
    }

    @Override
    public void onRecordingDescriptor(long controlSessionId, long correlationId, long recordingId, long startTimestamp, long stopTimestamp, long startPosition, long stopPosition, int initialTermId, int segmentFileLength, int termBufferLength, int mtuLength, int sessionId, int streamId, String strippedChannel, String originalChannel, String sourceIdentity) {
        int newSessionId = ThreadLocalRandom.current().nextInt(Configuration.publicationReservedSessionIdLow(), Configuration.publicationReservedSessionIdHigh());
        this.libraryExtendPosition = new LibraryExtendPosition(newSessionId, recordingId, streamId, stopPosition, initialTermId, termBufferLength, mtuLength);
    }

    private boolean startRecording(int streamId, int sessionId, SourceLocation local) {
        if (this.recordingAlreadyStarted(sessionId)) {
            return true;
        }
        try {
            String channel = ChannelUri.addSessionId(this.channel, sessionId);
            long registrationId = this.archive.startRecording(channel, streamId, local);
            this.trackedRegistrationIds.add(registrationId);
            return true;
        }
        catch (ArchiveException e) {
            this.errorHandler.onError(e);
            return false;
        }
    }

    private boolean recordingAlreadyStarted(int sessionId) {
        return RecordingPos.findCounterIdBySession(this.counters, sessionId) != -1;
    }

    private void checkRecordingStart(int sessionId, RecordingIdLookup lookup, LongHashSet recordingIds, boolean isInbound) {
        long recordingId = lookup.getRecordingId(sessionId);
        recordingIds.add(recordingId);
        this.saveRecordingIdsFile();
        if (DebugLogger.isEnabled(LogTag.STATE_CLEANUP)) {
            DebugLogger.log(LogTag.STATE_CLEANUP, this.recordingStarted.clear().with(recordingId).with(isInbound ? "inbound" : "outbound"));
        }
    }

    public void completionPositions(Long2LongHashMap inboundAeronSessionIdToCompletionPosition, Long2LongHashMap outboundAeronSessionIdToCompletionPosition) {
        this.inboundAeronSessionIdToCompletionPosition = inboundAeronSessionIdToCompletionPosition;
        this.outboundAeronSessionIdToCompletionPosition = outboundAeronSessionIdToCompletionPosition;
    }

    @Override
    public void close() {
        if (!this.closed) {
            this.awaitRecordingsCompletion();
            this.shutdownArchiver();
            this.closed = true;
        }
    }

    private void awaitRecordingsCompletion() {
        if (this.configuration.logInboundMessages()) {
            this.awaitRecordingsCompletion(this.inboundAeronSessionIdToCompletionPosition);
        }
        if (this.configuration.logOutboundMessages()) {
            this.awaitRecordingsCompletion(this.outboundAeronSessionIdToCompletionPosition);
        }
        if (saveOnShutdown) {
            this.saveRecordingIdsFile();
        }
    }

    private void awaitRecordingsCompletion(Long2LongHashMap aeronSessionIdToCompletionPosition) {
        if (aeronSessionIdToCompletionPosition == null) {
            throw new IllegalStateException("Unknown completionPositions when shutting down the RecordingCoordinator");
        }
        ArrayList<CompletingRecording> completingRecordings = new ArrayList<CompletingRecording>();
        aeronSessionIdToCompletionPosition.longForEach((sessionId, completionPosition) -> {
            int counterId = RecordingPos.findCounterIdBySession(this.counters, (int)sessionId);
            if (counterId != -1) {
                CompletingRecording recording = new CompletingRecording(completionPosition, counterId);
                completingRecordings.add(recording);
            }
        });
        while (!completingRecordings.isEmpty()) {
            completingRecordings.removeIf(CompletingRecording::hasRecordingCompleted);
            this.idleStrategy.idle();
        }
        this.idleStrategy.reset();
    }

    private void shutdownArchiver() {
        LongHashSet.LongIterator it = this.trackedRegistrationIds.iterator();
        while (it.hasNext()) {
            long registrationId = it.nextValue();
            this.archive.stopRecording(registrationId);
        }
        if (this.configuration.logAnyMessages()) {
            this.archive.close();
        }
    }

    public void forEachRecording(LongConsumer recordingIdConsumer) {
        this.inboundRecordingIds.forEach(recordingIdConsumer);
        this.outboundRecordingIds.forEach(recordingIdConsumer);
    }

    RecordingIdLookup indexerInboundRecordingIdLookup() {
        return this.indexerInboundLookup;
    }

    RecordingIdLookup indexerOutboundRecordingIdLookup() {
        return this.indexerOutboundLookup;
    }

    public RecordingIdLookup framerInboundLookup() {
        return this.framerInboundLookup;
    }

    public RecordingIdLookup framerOutboundLookup() {
        return this.framerOutboundLookup;
    }

    public static class LibraryExtendPosition {
        public final int newSessionId;
        public final long recordingId;
        public final int streamId;
        public final int initialTermId;
        public final int termBufferLength;
        public final int mtuLength;
        public long stopPosition;

        LibraryExtendPosition(int newSessionId, long recordingId, int streamId, long stopPosition, int initialTermId, int termBufferLength, int mtuLength) {
            this.newSessionId = newSessionId;
            this.recordingId = recordingId;
            this.streamId = streamId;
            this.stopPosition = stopPosition;
            this.initialTermId = initialTermId;
            this.termBufferLength = termBufferLength;
            this.mtuLength = mtuLength;
        }

        public String toString() {
            return "LibraryExtendPosition{newSessionId=" + this.newSessionId + ", recordingId=" + this.recordingId + ", streamId=" + this.streamId + ", stopPosition=" + this.stopPosition + ", initialTermId=" + this.initialTermId + ", termBufferLength=" + this.termBufferLength + ", mtuLength=" + this.mtuLength + '}';
        }
    }

    static final class RecordingIds {
        private final LongHashSet free = new LongHashSet();
        private final LongHashSet used = new LongHashSet();

        RecordingIds() {
        }

        int size() {
            return this.free.size() + this.used.size();
        }

        void forEach(LongConsumer recordingIdConsumer) {
            this.forEach(this.free, recordingIdConsumer);
            this.forEach(this.used, recordingIdConsumer);
        }

        private void forEach(LongHashSet set, LongConsumer recordingIdConsumer) {
            LongHashSet.LongIterator it = set.iterator();
            while (it.hasNext()) {
                recordingIdConsumer.accept(it.nextValue());
            }
        }

        public String toString() {
            return "RecordingIds{free=" + this.free + ", used=" + this.used + '}';
        }
    }

    private class CompletingRecording {
        private final long completedPosition;
        private final long recordingId;
        private final int counterId;

        CompletingRecording(long completedPosition, int counterId) {
            this.completedPosition = completedPosition;
            this.counterId = counterId;
            this.recordingId = RecordingPos.getRecordingId(RecordingCoordinator.this.counters, this.counterId);
        }

        boolean hasRecordingCompleted() {
            long recordedPosition = RecordingCoordinator.this.counters.getCounterValue(this.counterId);
            if (recordedPosition >= this.completedPosition) {
                return true;
            }
            if (!RecordingPos.isActive(RecordingCoordinator.this.counters, this.counterId, this.recordingId)) {
                throw new IllegalStateException("recording has stopped unexpectedly: " + this.recordingId);
            }
            return false;
        }
    }
}

