package org.apache.inlong.sort.standalone.source.sortsdk;

import com.google.common.base.Preconditions;
import java.util.List;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.inlong.sdk.sort.api.ReadCallback;
import org.apache.inlong.sdk.sort.api.SortClient;
import org.apache.inlong.sdk.sort.entity.InLongMessage;
import org.apache.inlong.sdk.sort.entity.MessageRecord;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.source.sortsdk.SortSdkSourceContext;
import org.apache.inlong.sort.standalone.source.sortsdk.SubscribeFetchResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback.class */
public class FetchCallback implements ReadCallback {
    private static final Logger LOG = LoggerFactory.getLogger(FetchCallback.class);
    private final String sortId;
    private final ChannelProcessor channelProcessor;
    private final SortSdkSourceContext context;
    private SortClient client;

    /* loaded from: input_file:org/apache/inlong/sort/standalone/source/sortsdk/FetchCallback$Factory.class */
    public static class Factory {
        public static FetchCallback create(@NotBlank(message = "sortId should not be null or empty.") String str, @NotNull(message = "channelProcessor should not be null.") ChannelProcessor channelProcessor, @NotNull(message = "context should not be null") SortSdkSourceContext sortSdkSourceContext) {
            return new FetchCallback(str, channelProcessor, sortSdkSourceContext);
        }
    }

    private FetchCallback(String str, ChannelProcessor channelProcessor, SortSdkSourceContext sortSdkSourceContext) {
        this.sortId = str;
        this.channelProcessor = channelProcessor;
        this.context = sortSdkSourceContext;
    }

    public void setClient(@NotNull SortClient sortClient) {
        this.client = sortClient;
    }

    public void onFinished(MessageRecord messageRecord) {
        try {
            Preconditions.checkState(messageRecord != null, "Fetched msg is null.");
            for (InLongMessage inLongMessage : messageRecord.getMsgs()) {
                SubscribeFetchResult create = SubscribeFetchResult.Factory.create(this.sortId, messageRecord.getMsgKey(), messageRecord.getOffset(), inLongMessage.getMsgHeader(), messageRecord.getRecTime(), inLongMessage.getData());
                ProfileEvent profileEvent = new ProfileEvent(create.getBody(), create.getHeaders());
                this.channelProcessor.processEvent(profileEvent);
                this.context.reportToMetric(profileEvent, this.sortId, "-", SortSdkSourceContext.FetchResult.SUCCESS);
            }
            this.client.ack(messageRecord.getMsgKey(), messageRecord.getMsgKey());
        } catch (NullPointerException e) {
            LOG.error("Got a null pointer exception for sortId " + this.sortId, e);
            this.context.reportToMetric((ProfileEvent) null, this.sortId, "-", SortSdkSourceContext.FetchResult.FAILURE);
        } catch (Exception e2) {
            LOG.error("Ack failed for sortId " + this.sortId, e2);
        }
    }

    public void onFinishedBatch(List<MessageRecord> list) {
    }
}
