package org.apache.jackrabbit.oak.plugins.segment.standby.client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.concurrent.EventExecutorGroup;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.jackrabbit.oak.plugins.segment.RecordId;
import org.apache.jackrabbit.oak.plugins.segment.Segment;
import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeBuilder;
import org.apache.jackrabbit.oak.plugins.segment.SegmentNodeState;
import org.apache.jackrabbit.oak.plugins.segment.SegmentNotFoundException;
import org.apache.jackrabbit.oak.plugins.segment.standby.codec.Messages;
import org.apache.jackrabbit.oak.plugins.segment.standby.codec.SegmentReply;
import org.apache.jackrabbit.oak.plugins.segment.standby.store.RemoteSegmentLoader;
import org.apache.jackrabbit.oak.plugins.segment.standby.store.StandbyStore;
import org.apache.jackrabbit.oak.spi.state.ApplyDiff;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/jackrabbit/oak/plugins/segment/standby/client/SegmentLoaderHandler.class */
public class SegmentLoaderHandler extends ChannelInboundHandlerAdapter implements RemoteSegmentLoader {
    private static final Logger log = LoggerFactory.getLogger(SegmentLoaderHandler.class);
    private final StandbyStore store;
    private final String clientID;
    private final RecordId head;
    private final EventExecutorGroup preloaderExecutor;
    private final EventExecutorGroup loaderExecutor;
    private ChannelHandlerContext ctx;
    private int timeoutMs = 5000;
    final BlockingQueue<Segment> segment = new LinkedBlockingQueue();

    public SegmentLoaderHandler(StandbyStore standbyStore, RecordId recordId, EventExecutorGroup eventExecutorGroup, EventExecutorGroup eventExecutorGroup2, String str) {
        this.store = standbyStore;
        this.head = recordId;
        this.preloaderExecutor = eventExecutorGroup;
        this.loaderExecutor = eventExecutorGroup2;
        this.clientID = str;
    }

    public void channelActive(ChannelHandlerContext channelHandlerContext) {
        this.ctx = channelHandlerContext;
        initSync();
    }

    public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (obj instanceof SegmentReply) {
            this.segment.offer(((SegmentReply) obj).getSegment());
        }
    }

    private void initSync() {
        log.debug("new head id " + this.head);
        long currentTimeMillis = System.currentTimeMillis();
        try {
            this.store.setLoader(this);
            SegmentNodeState head = this.store.getHead();
            SegmentNodeBuilder builder = head.builder();
            SegmentNodeState segmentNodeState = new SegmentNodeState(this.head);
            while (true) {
                try {
                    segmentNodeState.compareAgainstBaseState(head, new ApplyDiff(builder));
                    log.debug("updated head state successfully: {} in {}ms.", Boolean.valueOf(this.store.setHead(head, builder.getNodeState())), Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
                    close();
                    return;
                } catch (SegmentNotFoundException e) {
                    String segmentId = e.getSegmentId();
                    Segment readSegment = readSegment(e.getSegmentId());
                    if (readSegment == null) {
                        log.warn("can't read locally corrupt segment " + segmentId + " from primary");
                        throw e;
                    }
                    log.debug("did reread locally corrupt segment " + segmentId + " with size " + readSegment.size());
                    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(readSegment.size());
                    try {
                        readSegment.writeTo(byteArrayOutputStream);
                        this.store.writeSegment(readSegment.getSegmentId(), byteArrayOutputStream.toByteArray(), 0, readSegment.size());
                    } catch (IOException e2) {
                        log.error("can't wrap segment to output stream", e2);
                        throw e;
                    }
                }
            }
        } catch (Throwable th) {
            close();
            throw th;
        }
    }

    @Override // org.apache.jackrabbit.oak.plugins.segment.standby.store.RemoteSegmentLoader
    public Segment readSegment(String str) {
        this.ctx.writeAndFlush(Messages.newGetSegmentReq(this.clientID, str));
        return getSegment();
    }

    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        log.warn("Closing channel. Got exception: " + th);
        channelHandlerContext.close();
    }

    public Segment getSegment() {
        Segment poll;
        boolean z = false;
        while (true) {
            try {
                poll = this.segment.poll(this.timeoutMs, TimeUnit.MILLISECONDS);
                break;
            } catch (InterruptedException e) {
                z = true;
            } catch (Throwable th) {
                if (z) {
                    Thread.currentThread().interrupt();
                }
                throw th;
            }
        }
        if (z) {
            Thread.currentThread().interrupt();
        }
        return poll;
    }

    @Override // org.apache.jackrabbit.oak.plugins.segment.standby.store.RemoteSegmentLoader
    public void close() {
        this.ctx.close();
        if (this.preloaderExecutor != null && !this.preloaderExecutor.isShuttingDown()) {
            this.preloaderExecutor.shutdownGracefully(1L, 2L, TimeUnit.SECONDS).syncUninterruptibly();
        }
        if (this.loaderExecutor == null || this.loaderExecutor.isShuttingDown()) {
            return;
        }
        this.loaderExecutor.shutdownGracefully(1L, 2L, TimeUnit.SECONDS).syncUninterruptibly();
    }

    @Override // org.apache.jackrabbit.oak.plugins.segment.standby.store.RemoteSegmentLoader
    public boolean isClosed() {
        return this.loaderExecutor != null && (this.loaderExecutor.isShuttingDown() || this.loaderExecutor.isShutdown());
    }
}
