/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.elasticsearch6.shaded.org.elasticsearch.index.translog;

import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongSupplier;
import org.apache.flink.elasticsearch6.shaded.org.apache.lucene.store.AlreadyClosedException;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.Assertions;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.bytes.BytesArray;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.bytes.BytesReference;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.collect.Tuple;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.unit.ByteSizeValue;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.core.internal.io.IOUtils;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.index.seqno.SequenceNumbers;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.index.shard.ShardId;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.index.translog.BaseTranslogReader;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.index.translog.BufferedChecksumStreamInput;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.index.translog.ChannelFactory;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.index.translog.Checkpoint;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.index.translog.TragicExceptionHolder;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.index.translog.Translog;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.index.translog.TranslogException;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.index.translog.TranslogHeader;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.index.translog.TranslogReader;
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.index.translog.TranslogSnapshot;

/**
 * Appends serialized operations to a single translog generation file and keeps the
 * on-disk checkpoint file in step with what has been durably written.
 *
 * <p>Writes go through a buffered stream on top of the file channel. State that is
 * mutated by {@link #add} (offset, operation counter, min/max sequence number) is
 * guarded by this instance's monitor; the fsync + checkpoint write performed by
 * {@link #syncUpTo(long)} is additionally serialized by a dedicated {@code syncLock}.
 * Whenever both locks are taken, {@code syncLock} is acquired first.
 *
 * <p>Any exception raised while writing, flushing or syncing is recorded in the
 * {@link TragicExceptionHolder} and the writer is closed before the exception is
 * rethrown.
 *
 * <p>NOTE(review): this source was produced by a decompiler (CFR); the numeric
 * sentinels {@code -1L} and {@code -2L} used below presumably correspond to named
 * constants in {@link SequenceNumbers} - confirm against the upstream sources.
 */
public class TranslogWriter
extends BaseTranslogReader
implements Closeable {
    private final ShardId shardId;
    private final ChannelFactory channelFactory;
    /** The checkpoint most recently written to disk by {@link #syncUpTo(long)} (or the initial one). */
    private volatile Checkpoint lastSyncedCheckpoint;
    /** Number of operations added so far; only modified while holding this instance's monitor. */
    private volatile int operationCounter;
    private final TragicExceptionHolder tragedy;
    /** Buffered stream over the file channel; must never be closed (see {@link BufferedChannelOutputStream#close()}). */
    private final OutputStream outputStream;
    /** Offset just past the last byte handed to {@link #outputStream} (buffered bytes included). */
    private volatile long totalOffset;
    private volatile long minSeqNo;
    private volatile long maxSeqNo;
    private final LongSupplier globalCheckpointSupplier;
    private final LongSupplier minTranslogGenerationSupplier;
    protected final AtomicBoolean closed = new AtomicBoolean(false);
    /** Serializes the flush + fsync + checkpoint-write sequence. */
    private final Object syncLock = new Object();
    /**
     * Assertion-only bookkeeping: for every sequence number seen, the bytes written and a
     * stack capture of where it was added. {@code null} when assertions are disabled.
     */
    private final Map<Long, Tuple<BytesReference, Exception>> seenSequenceNumbers;

    /**
     * Creates a writer positioned at {@code initialCheckpoint.offset} of an already opened channel.
     *
     * @param channelFactory                used later to (re)write the checkpoint file
     * @param shardId                       shard this translog belongs to (used in error reporting)
     * @param initialCheckpoint             checkpoint describing the freshly created, empty generation
     * @param channel                       open channel of the translog file
     * @param path                          path of the translog file
     * @param bufferSize                    size of the write buffer
     * @param globalCheckpointSupplier      supplies the current global checkpoint
     * @param minTranslogGenerationSupplier supplies the current minimum translog generation
     * @param header                        header already written to {@code channel}
     * @param tragedy                       holder that records fatal exceptions
     * @throws IOException if the channel position cannot be read (assertions only)
     */
    private TranslogWriter(ChannelFactory channelFactory, ShardId shardId, Checkpoint initialCheckpoint, FileChannel channel, Path path, ByteSizeValue bufferSize, LongSupplier globalCheckpointSupplier, LongSupplier minTranslogGenerationSupplier, TranslogHeader header, TragicExceptionHolder tragedy) throws IOException {
        super(initialCheckpoint.generation, channel, path, header);
        assert (initialCheckpoint.offset == channel.position()) : "initial checkpoint offset [" + initialCheckpoint.offset + "] is different than current channel position [" + channel.position() + "]";
        this.shardId = shardId;
        this.channelFactory = channelFactory;
        this.minTranslogGenerationSupplier = minTranslogGenerationSupplier;
        this.outputStream = new BufferedChannelOutputStream(Channels.newOutputStream(channel), bufferSize.bytesAsInt());
        this.lastSyncedCheckpoint = initialCheckpoint;
        this.totalOffset = initialCheckpoint.offset;
        // A new writer must start from an empty generation: no min/max seqNo recorded yet
        // (-1, presumably the "no operations performed" sentinel - confirm) ...
        assert (initialCheckpoint.minSeqNo == -1L) : initialCheckpoint.minSeqNo;
        this.minSeqNo = initialCheckpoint.minSeqNo;
        assert (initialCheckpoint.maxSeqNo == -1L) : initialCheckpoint.maxSeqNo;
        this.maxSeqNo = initialCheckpoint.maxSeqNo;
        // ... and nothing trimmed (-2, presumably the "unassigned" sentinel - confirm).
        assert (initialCheckpoint.trimmedAboveSeqNo == -2L) : initialCheckpoint.trimmedAboveSeqNo;
        this.globalCheckpointSupplier = globalCheckpointSupplier;
        // Only pay for the duplicate-seqNo bookkeeping when assertions are on.
        this.seenSequenceNumbers = Assertions.ENABLED ? new HashMap<>() : null;
        this.tragedy = tragedy;
    }

    /**
     * Opens {@code file}, writes the translog header and an initial (empty) checkpoint, and
     * returns a writer for the new generation.
     *
     * <p>If anything fails after the channel has been opened, the channel is closed (suppressing
     * any close failure) and the original exception is rethrown.
     *
     * @throws IOException if opening the file, writing the header or writing the checkpoint fails
     */
    public static TranslogWriter create(ShardId shardId, String translogUUID, long fileGeneration, Path file, ChannelFactory channelFactory, ByteSizeValue bufferSize, long initialMinTranslogGen, long initialGlobalCheckpoint, LongSupplier globalCheckpointSupplier, LongSupplier minTranslogGenerationSupplier, long primaryTerm, TragicExceptionHolder tragedy) throws IOException {
        FileChannel channel = channelFactory.open(file);
        try {
            TranslogHeader header = new TranslogHeader(translogUUID, primaryTerm);
            header.write(channel);
            Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(header.sizeInBytes(), fileGeneration, initialGlobalCheckpoint, initialMinTranslogGen);
            TranslogWriter.writeCheckpoint(channelFactory, file.getParent(), checkpoint);
            // With assertions enabled, wrap the supplier so that a global checkpoint moving
            // below its initial value is detected on every read.
            LongSupplier writerGlobalCheckpointSupplier = Assertions.ENABLED ? () -> {
                long gcp = globalCheckpointSupplier.getAsLong();
                assert (gcp >= initialGlobalCheckpoint) : "global checkpoint [" + gcp + "] lower than initial gcp [" + initialGlobalCheckpoint + "]";
                return gcp;
            } : globalCheckpointSupplier;
            return new TranslogWriter(channelFactory, shardId, checkpoint, channel, file, bufferSize, writerGlobalCheckpointSupplier, minTranslogGenerationSupplier, header, tragedy);
        }
        catch (Exception exception) {
            IOUtils.closeWhileHandlingException(channel);
            throw exception;
        }
    }

    /**
     * Records {@code ex} as the tragic exception and closes this writer. A failure while
     * closing is attached to {@code ex} as a suppressed exception instead of being thrown.
     */
    private synchronized void closeWithTragicEvent(Exception ex) {
        this.tragedy.setTragicException(ex);
        try {
            this.close();
        }
        catch (IOException | RuntimeException e) {
            ex.addSuppressed(e);
        }
    }

    /**
     * Appends the given bytes to the translog and updates the operation counter and the
     * min/max sequence numbers.
     *
     * @param data  serialized operation to append
     * @param seqNo sequence number of the operation
     * @return the location (generation, start offset, length) of the written bytes
     * @throws IOException            if writing fails; the writer is closed with a tragic event first
     * @throws AlreadyClosedException if the writer is already closed
     */
    public synchronized Translog.Location add(BytesReference data, long seqNo) throws IOException {
        this.ensureOpen();
        final long offset = this.totalOffset;
        try {
            data.writeTo(this.outputStream);
        }
        catch (Exception ex) {
            this.closeWithTragicEvent(ex);
            throw ex;
        }
        this.totalOffset += (long)data.length();
        // An unset min/max seqNo (-1) is only legal before the first operation was added.
        assert (this.minSeqNo != -1L || this.operationCounter == 0);
        assert (this.maxSeqNo != -1L || this.operationCounter == 0);
        this.minSeqNo = SequenceNumbers.min(this.minSeqNo, seqNo);
        this.maxSeqNo = SequenceNumbers.max(this.maxSeqNo, seqNo);
        ++this.operationCounter;
        assert (this.assertNoSeqNumberConflict(seqNo, data));
        return new Translog.Location(this.generation, offset, data.length());
    }

    /**
     * Assertion helper: verifies that a sequence number is not added twice with different
     * content within this generation. Must only be called when assertions are enabled
     * (otherwise {@link #seenSequenceNumbers} is {@code null}).
     *
     * @return always {@code true}, so the call can be used inside an {@code assert}
     * @throws AssertionError if the same seqNo was previously added with a different operation
     */
    private synchronized boolean assertNoSeqNumberConflict(long seqNo, BytesReference data) throws IOException {
        // -2 is skipped: presumably the "unassigned seqNo" sentinel - confirm.
        if (seqNo == -2L) {
            return true;
        }
        // Stored values are never null, so a null lookup means "not seen yet".
        final Tuple<BytesReference, Exception> previous = this.seenSequenceNumbers.get(seqNo);
        if (previous == null) {
            // Copy the bytes (the caller may reuse its buffer) and capture the current stack
            // so that a later conflict can report where the first write came from.
            this.seenSequenceNumbers.put(seqNo, new Tuple<>(new BytesArray(data.toBytesRef(), true), new RuntimeException("stack capture previous op")));
            return true;
        }
        if (previous.v1().equals(data)) {
            return true;
        }
        // Bytes differ: deserialize both and compare the fields that define the operation.
        final Translog.Operation newOp = Translog.readOperation(new BufferedChecksumStreamInput(data.streamInput(), "assertion"));
        final Translog.Operation prvOp = Translog.readOperation(new BufferedChecksumStreamInput(previous.v1().streamInput(), "assertion"));
        final boolean sameOp;
        if (prvOp instanceof Translog.Index && newOp instanceof Translog.Index) {
            Translog.Index o1 = (Translog.Index)prvOp;
            Translog.Index o2 = (Translog.Index)newOp;
            sameOp = Objects.equals(o1.id(), o2.id()) && Objects.equals(o1.type(), o2.type()) && Objects.equals(o1.source(), o2.source()) && Objects.equals(o1.routing(), o2.routing()) && o1.primaryTerm() == o2.primaryTerm() && o1.seqNo() == o2.seqNo() && o1.version() == o2.version();
        } else if (prvOp instanceof Translog.Delete && newOp instanceof Translog.Delete) {
            Translog.Delete o1 = (Translog.Delete)prvOp;
            Translog.Delete o2 = (Translog.Delete)newOp;
            sameOp = Objects.equals(o1.id(), o2.id()) && Objects.equals(o1.type(), o2.type()) && o1.primaryTerm() == o2.primaryTerm() && o1.seqNo() == o2.seqNo() && o1.version() == o2.version();
        } else {
            sameOp = false;
        }
        if (!sameOp) {
            throw new AssertionError("seqNo [" + seqNo + "] was processed twice in generation [" + this.generation + "], with different data. prvOp [" + prvOp + "], newOp [" + newOp + "]", previous.v2());
        }
        return true;
    }

    /**
     * Assertion helper: verifies that no recorded operation with a sequence number above
     * {@code aboveSeqNo} has a primary term lower than {@code belowTerm}. Relies on
     * {@link #seenSequenceNumbers}, which is {@code null} when assertions are disabled.
     *
     * @return always {@code true}, so the call can be used inside an {@code assert}
     * @throws AssertionError   if an offending operation is found
     * @throws RuntimeException wrapping an {@link IOException} raised while re-reading a recorded operation
     */
    synchronized boolean assertNoSeqAbove(long belowTerm, long aboveSeqNo) {
        this.seenSequenceNumbers.entrySet().stream().filter(e -> e.getKey() > aboveSeqNo).forEach(e -> {
            final Translog.Operation op;
            try {
                op = Translog.readOperation(new BufferedChecksumStreamInput(e.getValue().v1().streamInput(), "assertion"));
            }
            catch (IOException ex) {
                throw new RuntimeException(ex);
            }
            long seqNo = op.seqNo();
            long primaryTerm = op.primaryTerm();
            if (primaryTerm < belowTerm) {
                throw new AssertionError("current should not have any operations with seq#:primaryTerm [" + seqNo + ":" + primaryTerm + "] > " + aboveSeqNo + ":" + belowTerm);
            }
        });
        return true;
    }

    /**
     * Syncs everything written so far; equivalent to {@code syncUpTo(Long.MAX_VALUE)}.
     *
     * @throws IOException if flushing, fsyncing or writing the checkpoint fails
     */
    public void sync() throws IOException {
        this.syncUpTo(Long.MAX_VALUE);
    }

    /**
     * @return {@code true} if the current offset, global checkpoint or minimum translog
     *         generation differs from what the last synced checkpoint recorded
     */
    public boolean syncNeeded() {
        final Checkpoint synced = this.lastSyncedCheckpoint;
        return this.totalOffset != synced.offset
            || this.globalCheckpointSupplier.getAsLong() != synced.globalCheckpoint
            || this.minTranslogGenerationSupplier.getAsLong() != synced.minTranslogGeneration;
    }

    /** @return the number of operations added to this writer so far */
    @Override
    public int totalOperations() {
        return this.operationCounter;
    }

    /**
     * Builds a checkpoint from the writer's current in-memory state and the current values of
     * the global-checkpoint and min-generation suppliers. The result is not necessarily synced
     * to disk; see {@link #getLastSyncedCheckpoint()} for that.
     */
    @Override
    synchronized Checkpoint getCheckpoint() {
        // The trailing -2 is the trimmedAboveSeqNo slot (cf. the constructor assertion).
        return new Checkpoint(this.totalOffset, this.operationCounter, this.generation, this.minSeqNo, this.maxSeqNo, this.globalCheckpointSupplier.getAsLong(), this.minTranslogGenerationSupplier.getAsLong(), -2L);
    }

    /** @return the total number of bytes handed to this writer, including bytes still buffered */
    @Override
    public long sizeInBytes() {
        return this.totalOffset;
    }

    /**
     * Syncs all pending data, marks this writer closed and returns a reader over the same
     * channel, path and header, using the last synced checkpoint. The channel itself is not
     * closed here; it is passed to the returned reader.
     *
     * @throws IOException            if the final sync fails; the writer is closed with a tragic event first
     * @throws AlreadyClosedException if the writer had already been closed
     */
    public TranslogReader closeIntoReader() throws IOException {
        // Lock order: syncLock first, then this - same order as syncUpTo.
        synchronized (this.syncLock) {
            synchronized (this) {
                try {
                    this.sync();
                }
                catch (Exception ex) {
                    this.closeWithTragicEvent(ex);
                    throw ex;
                }
                if (this.closed.compareAndSet(false, true)) {
                    return new TranslogReader(this.getLastSyncedCheckpoint(), this.channel, this.path, this.header);
                }
                throw new AlreadyClosedException("translog [" + this.getGeneration() + "] is already closed (path [" + this.path + "])", this.tragedy.get());
            }
        }
    }

    /**
     * Syncs pending data and then creates a snapshot via the superclass.
     *
     * <p>NOTE(review): the decompiler reported "Removed try catching itself - possible
     * behaviour change" for this method; verify against the upstream sources.
     *
     * @throws TranslogException      if syncing fails with an {@link IOException}
     * @throws AlreadyClosedException if the writer is closed
     */
    @Override
    public TranslogSnapshot newSnapshot() {
        // Lock order: syncLock first, then this - same order as syncUpTo.
        synchronized (this.syncLock) {
            synchronized (this) {
                this.ensureOpen();
                try {
                    this.sync();
                }
                catch (IOException e) {
                    throw new TranslogException(this.shardId, "exception while syncing before creating a snapshot", e);
                }
                return super.newSnapshot();
            }
        }
    }

    /** @return the channel's current position, i.e. how many bytes have left the write buffer */
    private long getWrittenOffset() throws IOException {
        return this.channel.position();
    }

    /**
     * Makes sure everything up to {@code offset} is durable: flushes the write buffer, forces
     * the channel and writes a new checkpoint file. Does nothing if the last synced checkpoint
     * already covers {@code offset} or if nothing changed since the last sync.
     *
     * <p>NOTE(review): the decompiler reported "Removed try catching itself - possible
     * behaviour change" for this method; verify against the upstream sources.
     *
     * @param offset the offset that must be covered by the synced checkpoint
     * @return {@code true} if this call performed a sync, {@code false} if none was required
     * @throws IOException            if flushing, forcing or writing the checkpoint fails; the
     *                                writer is closed with a tragic event first
     * @throws AlreadyClosedException if a sync is required but the writer is closed
     */
    public boolean syncUpTo(long offset) throws IOException {
        // Cheap unlocked pre-check; repeated under syncLock because another thread may have
        // completed a sync while this one was waiting for the lock.
        if (this.lastSyncedCheckpoint.offset < offset && this.syncNeeded()) {
            synchronized (this.syncLock) {
                if (this.lastSyncedCheckpoint.offset < offset && this.syncNeeded()) {
                    Checkpoint checkpointToSync;
                    // Hold the writer monitor only for the buffer flush and the state capture,
                    // so that add() is not blocked during the fsync below.
                    synchronized (this) {
                        this.ensureOpen();
                        try {
                            this.outputStream.flush();
                            checkpointToSync = this.getCheckpoint();
                        }
                        catch (Exception ex) {
                            this.closeWithTragicEvent(ex);
                            throw ex;
                        }
                    }
                    try {
                        // Data first, checkpoint second: the checkpoint must never describe
                        // bytes that have not been forced to disk.
                        this.channel.force(false);
                        TranslogWriter.writeCheckpoint(this.channelFactory, this.path.getParent(), checkpointToSync);
                    }
                    catch (Exception ex) {
                        this.closeWithTragicEvent(ex);
                        throw ex;
                    }
                    assert (this.lastSyncedCheckpoint.offset <= checkpointToSync.offset) : "illegal state: " + this.lastSyncedCheckpoint.offset + " <= " + checkpointToSync.offset;
                    this.lastSyncedCheckpoint = checkpointToSync;
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * Reads {@code targetBuffer.remaining()} bytes starting at {@code position}. If the
     * requested range extends past what has reached the channel, the write buffer is flushed
     * first.
     *
     * <p>NOTE(review): the decompiler reported "Removed try catching itself - possible
     * behaviour change" for this method; verify against the upstream sources.
     *
     * @throws IOException if flushing fails (the writer is then closed with a tragic event)
     *                     or if reading from the channel fails
     */
    @Override
    protected void readBytes(ByteBuffer targetBuffer, long position) throws IOException {
        try {
            // Only take the lock and flush when the requested range is not on the channel yet;
            // re-check under the lock since a concurrent flush may have made it available.
            if (position + (long)targetBuffer.remaining() > this.getWrittenOffset()) {
                synchronized (this) {
                    if (position + (long)targetBuffer.remaining() > this.getWrittenOffset()) {
                        this.outputStream.flush();
                    }
                }
            }
        }
        catch (Exception ex) {
            this.closeWithTragicEvent(ex);
            throw ex;
        }
        org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.io.Channels.readFromFileChannelWithEofException(this.channel, position, targetBuffer);
    }

    /**
     * Writes {@code checkpoint} to the {@code translog.ckp} file inside the given directory.
     *
     * @param translogFile despite its name, the directory that contains the translog files
     *                     (callers pass the parent of the translog file path)
     */
    private static void writeCheckpoint(ChannelFactory channelFactory, Path translogFile, Checkpoint checkpoint) throws IOException {
        Checkpoint.write(channelFactory, translogFile.resolve("translog.ckp"), checkpoint, StandardOpenOption.WRITE);
    }

    /** @return the checkpoint that was last written to disk (or the initial checkpoint) */
    Checkpoint getLastSyncedCheckpoint() {
        return this.lastSyncedCheckpoint;
    }

    /**
     * @throws AlreadyClosedException if this writer is closed; the recorded tragic exception
     *                                (if any) is passed along as the cause
     */
    protected final void ensureOpen() {
        if (this.isClosed()) {
            throw new AlreadyClosedException("translog [" + this.getGeneration() + "] is already closed", this.tragedy.get());
        }
    }

    /**
     * Closes the underlying channel the first time it is called; later calls are no-ops.
     * Buffered but unflushed bytes are not flushed by this method.
     */
    @Override
    public final void close() throws IOException {
        if (this.closed.compareAndSet(false, true)) {
            this.channel.close();
        }
    }

    /** @return {@code true} once {@link #close()} or {@link #closeIntoReader()} has marked this writer closed */
    protected final boolean isClosed() {
        return this.closed.get();
    }

    /**
     * Write buffer in front of the translog channel. Flushing an empty buffer is a no-op;
     * a failed flush closes the enclosing writer with a tragic event. The stream itself must
     * never be closed - the channel's lifecycle is owned by the enclosing writer.
     */
    private final class BufferedChannelOutputStream
    extends BufferedOutputStream {
        BufferedChannelOutputStream(OutputStream out, int size) throws IOException {
            super(out, size);
        }

        @Override
        public synchronized void flush() throws IOException {
            // Nothing buffered: skip the open check as well, so flushing an empty buffer on a
            // closed writer does not fail.
            if (this.count > 0) {
                try {
                    TranslogWriter.this.ensureOpen();
                    super.flush();
                }
                catch (Exception ex) {
                    TranslogWriter.this.closeWithTragicEvent(ex);
                    throw ex;
                }
            }
        }

        @Override
        public void close() throws IOException {
            throw new IllegalStateException("never close this stream");
        }
    }
}

