package com.couchbase.client.dcp.conductor;

import com.couchbase.client.core.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.dcp.ControlEventHandler;
import com.couchbase.client.dcp.core.event.EventBus;
import com.couchbase.client.dcp.events.StreamEndEvent;
import com.couchbase.client.dcp.message.DcpStreamEndMessage;
import com.couchbase.client.dcp.message.StreamEndReason;
import com.couchbase.client.dcp.transport.netty.ChannelFlowController;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/couchbase/client/dcp/conductor/DcpChannelControlHandler.class */
/**
 * {@link ControlEventHandler} that intercepts DCP stream-end messages for a single
 * {@link DcpChannel} and forwards every other control event to the handler configured
 * in the channel's environment.
 */
public class DcpChannelControlHandler implements ControlEventHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(DcpChannelControlHandler.class);

    /** Channel whose stream state is updated when a stream-end message arrives. */
    private final DcpChannel dcpChannel;

    /** Delegate that receives every control event that is not a stream-end message. */
    private final ControlEventHandler controlEventHandler;

    /** Bus used to publish {@link StreamEndEvent}s when persistence polling is disabled. */
    private final EventBus eventBus;

    /**
     * Creates a handler bound to the given channel. The delegate handler and the event bus
     * are both read from {@code dcpChannel.env}.
     *
     * @param dcpChannel the channel this handler belongs to
     */
    public DcpChannelControlHandler(DcpChannel dcpChannel) {
        this.dcpChannel = dcpChannel;
        this.controlEventHandler = dcpChannel.env.controlEventHandler();
        this.eventBus = dcpChannel.env.eventBus();
    }

    /**
     * Dispatches a control event: stream-end messages are handled (and consumed) here,
     * anything else is passed unchanged to the delegate handler.
     *
     * @param channelFlowController flow controller used to acknowledge the buffer
     * @param byteBuf the raw control message
     */
    @Override // com.couchbase.client.dcp.ControlEventHandler
    public void onEvent(ChannelFlowController channelFlowController, ByteBuf byteBuf) {
        if (DcpStreamEndMessage.is(byteBuf)) {
            filterDcpStreamEndMessage(channelFlowController, byteBuf);
        } else {
            this.controlEventHandler.onEvent(channelFlowController, byteBuf);
        }
    }

    /**
     * Handles a stream-end message: emits a {@link StreamEndEvent}, marks the vbucket's stream
     * as closed on the channel and, when the reason is not {@link StreamEndReason#OK}, calls
     * {@code conductor.maybeMovePartition} for that vbucket.
     *
     * <p>The buffer is always acknowledged and released, whether or not processing fails.
     * The release happens in its own {@code finally} so that a failing acknowledgement
     * cannot leak the buffer.
     *
     * @param channelFlowController flow controller used to acknowledge the buffer
     * @param byteBuf the stream-end message; consumed by this method
     */
    private void filterDcpStreamEndMessage(ChannelFlowController channelFlowController, ByteBuf byteBuf) {
        try {
            int vbucket = DcpStreamEndMessage.vbucket(byteBuf);
            StreamEndReason reason = DcpStreamEndMessage.reason(byteBuf);
            LOGGER.debug("Server closed Stream on vbid {} with reason {}", vbucket, reason);

            StreamEndEvent streamEndEvent = new StreamEndEvent(vbucket, reason);
            if (this.dcpChannel.env.persistencePollingEnabled()) {
                // With persistence polling the event goes through the stream event buffer
                // instead of being published directly on the bus.
                this.dcpChannel.env.streamEventBuffer().onStreamEnd(streamEndEvent);
            } else {
                this.eventBus.publish(streamEndEvent);
            }

            this.dcpChannel.streamIsOpen.set(vbucket, false);
            if (reason != StreamEndReason.OK) {
                this.dcpChannel.conductor.maybeMovePartition(vbucket);
            }
        } finally {
            try {
                channelFlowController.ack(byteBuf);
            } finally {
                // Must run even if ack() throws, otherwise the buffer is never released.
                byteBuf.release();
            }
        }
    }
}
