/*
 * Decompiled with CFR 0.152.
 */
package nakadi;

import java.util.concurrent.TimeUnit;
import nakadi.MetricCollector;
import nakadi.NakadiClient;
import nakadi.RetryPolicy;
import nakadi.StreamProcessorManaged;
import nakadi.shadow.io.reactivex.Flowable;
import nakadi.shadow.io.reactivex.functions.Function;
import nakadi.shadow.org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamConnectionRetryFlowable
implements Function<Flowable<? extends Throwable>, Publisher<Object>> {
    private static final Logger logger = LoggerFactory.getLogger((String)NakadiClient.class.getSimpleName());
    static int DEFAULT_INITIAL_DELAY_SECONDS = 1;
    static int DEFAULT_MAX_DELAY_SECONDS = 8;
    static int DEFAULT_MIN_DELAY_SECONDS = 1;
    static int DEFAULT_MAX_ATTEMPTS = Integer.MAX_VALUE;
    static TimeUnit DEFAULT_TIME_UNIT = TimeUnit.SECONDS;
    private final RetryPolicy backoff;
    private final Function<Throwable, Boolean> isRetryable;
    private MetricCollector metricCollector;
    private final StreamProcessorManaged streamProcessor;

    StreamConnectionRetryFlowable(RetryPolicy backoff, Function<Throwable, Boolean> isRetryable, MetricCollector metricCollector, StreamProcessorManaged streamProcessor) {
        this.backoff = backoff;
        this.isRetryable = isRetryable;
        this.metricCollector = metricCollector;
        this.streamProcessor = streamProcessor;
    }

    @Override
    public Publisher<Object> apply(Flowable<? extends Throwable> flowable) throws Exception {
        return flowable.flatMap(throwable -> {
            if (!this.streamProcessor.running()) {
                logger.debug("stream_retry_not_retryable msg=processor_already_disposing dummy_delay=10ms thread={} err={}", (Object)Thread.currentThread().getName(), (Object)throwable.getMessage());
                return Flowable.timer(10L, TimeUnit.MILLISECONDS);
            }
            if (!this.isRetryable.apply((Throwable)throwable).booleanValue()) {
                logger.warn(String.format("stream_retry_not_retryable thread=%s propagating %s, %s", Thread.currentThread().getName(), throwable.getClass().getSimpleName(), throwable.getMessage()));
                return Flowable.error(throwable);
            }
            if (this.backoff.isFinished()) {
                logger.warn(String.format("stream_retry failed after %d attempts, propagating error %s, %s", this.backoff.workingAttempts(), throwable.getClass().getSimpleName(), throwable.getMessage()));
                this.streamProcessor.retryAttemptsFinished(true);
                this.streamProcessor.failedProcessorException((Throwable)throwable);
                return Flowable.error(throwable);
            }
            long delay = this.backoff.nextBackoffMillis();
            if (delay == -1L) {
                logger.warn(String.format("stream_retry being stopped after %d attempts, propagating error %s, %s", this.backoff.workingAttempts(), throwable.getClass().getSimpleName(), throwable.getMessage()));
                this.streamProcessor.failedProcessorException((Throwable)throwable);
                this.streamProcessor.retryAttemptsFinished(true);
                return Flowable.error(throwable);
            }
            logger.info("stream_retry_will_sleep sleep={} attempt={}/{} thread={} error={}", new Object[]{delay, this.backoff.workingAttempts(), this.backoff.maxAttempts(), Thread.currentThread().getName(), throwable.getMessage()});
            this.metricCollector.mark(MetricCollector.Meter.consumerRetry);
            return Flowable.timer(delay, TimeUnit.MILLISECONDS);
        });
    }
}

