/*
 * Decompiled with CFR 0.152.
 */
package nakadi;

import java.util.Objects;
import java.util.function.BiConsumer;
import nakadi.Cursor;
import nakadi.CursorCommitResultCollection;
import nakadi.InvalidException;
import nakadi.MetricCollector;
import nakadi.NakadiClient;
import nakadi.NakadiException;
import nakadi.NetworkException;
import nakadi.Problem;
import nakadi.RateLimitException;
import nakadi.StreamCursorContext;
import nakadi.SubscriptionResource;
import nakadi.Unstable;
import nakadi.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Commits (checkpoints) subscription stream cursors through the client's
 * {@link SubscriptionResource} and records the outcome as metrics.
 *
 * <p>After a successful commit the returned {@link CursorCommitResultCollection} is
 * handed to a configurable result consumer. The default consumer logs at debug level
 * when the result has no items and at warn level otherwise.
 *
 * <p>Two flags control whether {@link InvalidException} and {@link NetworkException}
 * raised during a commit are propagated to the caller or only logged. Both default to
 * {@code false} (propagate).
 */
@Unstable
public class SubscriptionOffsetCheckpointer {
    // NOTE(review): the logger is named after NakadiClient rather than this class;
    // presumably so all client components share one logger name — confirm before changing.
    private static final Logger logger = LoggerFactory.getLogger(NakadiClient.class.getSimpleName());
    private final NakadiClient client;
    private volatile boolean suppressInvalidSessionException;
    private volatile boolean suppressNetworkException;
    private BiConsumer<CursorCommitResultCollection, StreamCursorContext> resultCollectionConsumer;

    /**
     * Creates a checkpointer that commits cursors via the given client and installs the
     * default logging result consumer.
     *
     * @param client the client used to reach the subscription resource and the metric collector
     */
    public SubscriptionOffsetCheckpointer(NakadiClient client) {
        this.client = client;
        this.resultCollectionConsumer = (ccr, context) -> {
            if (ccr.items().isEmpty()) {
                logger.debug("subscription_checkpoint server_accepted_updated_cursor {}", this.cursorTrackingKey(context));
            } else {
                logger.warn("subscription_checkpoint server_ok_indicated_stale_cursor {}", ccr);
            }
        };
    }

    /**
     * Replaces the consumer that receives the commit result after each successful checkpoint.
     *
     * @param resultCollectionConsumer the consumer to invoke with the commit result and the
     *     cursor context; validated via {@code NakadiException.throwNonNull}
     * @return this checkpointer, for chaining
     */
    public SubscriptionOffsetCheckpointer withCursorCommitResultConsumer(BiConsumer<CursorCommitResultCollection, StreamCursorContext> resultCollectionConsumer) {
        NakadiException.throwNonNull(resultCollectionConsumer, "Please provide a non-null result consumer");
        this.resultCollectionConsumer = resultCollectionConsumer;
        return this;
    }

    /**
     * Sets whether an {@link InvalidException} raised while checkpointing is logged and
     * swallowed ({@code true}) or rethrown to the caller ({@code false}).
     *
     * @param suppressInvalidSessions {@code true} to suppress, {@code false} to propagate
     * @return this checkpointer, for chaining
     */
    public SubscriptionOffsetCheckpointer suppressInvalidSessionException(boolean suppressInvalidSessions) {
        this.suppressInvalidSessionException = suppressInvalidSessions;
        return this;
    }

    /**
     * Sets whether a {@link NetworkException} raised while checkpointing is logged and
     * swallowed ({@code true}) or rethrown to the caller ({@code false}).
     *
     * @param suppressNetworkException {@code true} to suppress, {@code false} to propagate
     * @return this checkpointer, for chaining
     */
    public SubscriptionOffsetCheckpointer suppressNetworkException(boolean suppressNetworkException) {
        this.suppressNetworkException = suppressNetworkException;
        return this;
    }

    /**
     * Commits the cursor carried by {@code context}, using the currently configured
     * suppression flags.
     *
     * @param context the stream context and cursor to commit
     * @throws RateLimitException always rethrown after being logged
     * @throws InvalidException unless invalid-session suppression is enabled
     * @throws NetworkException unless network suppression is enabled
     * @throws NakadiException for any other failure; non-Nakadi exceptions are wrapped
     */
    public void checkpoint(StreamCursorContext context) {
        this.checkpointInner(context, this.suppressInvalidSessionException, this.suppressNetworkException);
    }

    /**
     * Performs the commit, marks the matching meter, and applies the suppression policy.
     *
     * @param context the stream context and cursor to commit
     * @param suppressInvalidSessionException when {@code true}, an {@link InvalidException} is logged and not rethrown
     * @param suppressNetworkException when {@code true}, a {@link NetworkException} is logged and not rethrown
     */
    private void checkpointInner(StreamCursorContext context, boolean suppressInvalidSessionException, boolean suppressNetworkException) {
        SubscriptionResource resource = this.client.resources().subscriptions();
        try {
            CursorCommitResultCollection ccr = this.checkpointInner(context, resource);
            if (ccr.items().isEmpty()) {
                this.client.metricCollector().mark(MetricCollector.Meter.sessionCheckpointAcceptedCursor, 1L);
            } else {
                this.client.metricCollector().mark(MetricCollector.Meter.sessionCheckpointOkIndicatedStaleCursor, 1L);
            }
            this.resultCollectionConsumer.accept(ccr, context);
        } catch (RateLimitException e) {
            logger.warn("subscription_checkpoint_err " + e.getMessage(), e);
            throw e;
        } catch (InvalidException e) {
            // The meter is marked regardless of whether the exception is suppressed.
            this.client.metricCollector().mark(MetricCollector.Meter.sessionCheckpointMismatch, 1L);
            if (suppressInvalidSessionException) {
                logger.info("suppressed_invalid_checkpoint_err {}", e.getMessage());
                // Honour the flag: previously the exception was logged as suppressed and
                // then rethrown anyway.
                return;
            }
            throw e;
        } catch (NetworkException e) {
            // The meter is marked regardless of whether the exception is suppressed.
            this.client.metricCollector().mark(MetricCollector.Meter.sessionCheckpointNetworkException, 1L);
            if (suppressNetworkException) {
                logger.info("suppressed_network_checkpoint_err {} {}", this.cursorTrackingKey(context), e.getMessage());
                // Honour the flag: previously the exception was logged as suppressed and
                // then rethrown anyway.
                return;
            }
            throw e;
        } catch (NakadiException e) {
            // Rethrow as-is so the generic handler below does not re-wrap it.
            throw e;
        } catch (Exception e) {
            throw new NakadiException(Problem.localProblem(e.getMessage(), ""), e);
        }
    }

    /**
     * Issues the commit call against the given resource. Package-visible so tests can
     * override or observe the call.
     *
     * @param context supplies the context and cursor arguments passed to the resource
     * @param resource the subscription resource to call
     * @return the commit result returned by {@code resource.checkpoint}
     */
    @VisibleForTesting
    CursorCommitResultCollection checkpointInner(StreamCursorContext context, SubscriptionResource resource) {
        return resource.checkpoint(context.context(), context.cursor());
    }

    /**
     * Builds a log-friendly key of the form {@code eventType-partition-offset} for the
     * context's cursor, using {@code "unknown-event-type"} when the cursor has no event type.
     * If the context has no cursor, a warning is logged and {@code context.toString()} is returned.
     */
    private String cursorTrackingKey(StreamCursorContext context) {
        Cursor cursor = context.cursor();
        if (cursor != null) {
            String partition = cursor.partition();
            String offset = cursor.offset();
            return cursor.eventType().orElse("unknown-event-type") + "-" + partition + "-" + offset;
        }
        logger.warn("unexpected empty cursor {}", context);
        return context.toString();
    }

    /** Returns the current invalid-session suppression flag. */
    @VisibleForTesting
    boolean suppressInvalidSessionException() {
        return this.suppressInvalidSessionException;
    }

    /** Returns the current network-exception suppression flag. */
    @VisibleForTesting
    boolean suppressNetworkException() {
        return this.suppressNetworkException;
    }

    /** Hashes the client and both suppression flags; the result consumer is not included. */
    @Override
    public int hashCode() {
        return Objects.hash(this.client, this.suppressInvalidSessionException, this.suppressNetworkException);
    }

    /**
     * Two checkpointers are equal when they are of the same class and have equal clients
     * and equal suppression flags; the result consumer is not compared.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || this.getClass() != o.getClass()) {
            return false;
        }
        SubscriptionOffsetCheckpointer that = (SubscriptionOffsetCheckpointer)o;
        return this.suppressInvalidSessionException == that.suppressInvalidSessionException && this.suppressNetworkException == that.suppressNetworkException && Objects.equals(this.client, that.client);
    }

    @Override
    public String toString() {
        return "SubscriptionOffsetCheckpointer{client=" + this.client + ", suppressInvalidSessionException=" + this.suppressInvalidSessionException + ", suppressNetworkException=" + this.suppressNetworkException + '}';
    }
}

