/*
 * Decompiled with CFR 0.152.
 */
package nakadi;

import java.io.BufferedReader;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import nakadi.ExceptionSupport;
import nakadi.ExecutorServiceSupport;
import nakadi.ExponentialRetry;
import nakadi.JsonBatchSupport;
import nakadi.LoggingStreamOffsetObserver;
import nakadi.MetricCollector;
import nakadi.NakadiClient;
import nakadi.NakadiException;
import nakadi.NonRetryableNakadiException;
import nakadi.Problem;
import nakadi.Response;
import nakadi.ResponseSupport;
import nakadi.StreamBatchRecord;
import nakadi.StreamBatchRecordBufferingSubscriber;
import nakadi.StreamBatchRecordSubscriber;
import nakadi.StreamConfiguration;
import nakadi.StreamConnectionRestart;
import nakadi.StreamConnectionRetryFlowable;
import nakadi.StreamObserver;
import nakadi.StreamObserverProvider;
import nakadi.StreamOffsetObserver;
import nakadi.StreamProcessorManaged;
import nakadi.StreamProcessorRequestFactory;
import nakadi.Subscription;
import nakadi.SubscriptionOffsetCheckpointer;
import nakadi.SubscriptionOffsetObserver;
import nakadi.TypeLiteral;
import nakadi.Unstable;
import nakadi.VisibleForTesting;
import nakadi.shadow.com.google.common.util.concurrent.ThreadFactoryBuilder;
import nakadi.shadow.io.reactivex.Flowable;
import nakadi.shadow.io.reactivex.FlowableTransformer;
import nakadi.shadow.io.reactivex.Scheduler;
import nakadi.shadow.io.reactivex.exceptions.UndeliverableException;
import nakadi.shadow.io.reactivex.functions.Consumer;
import nakadi.shadow.io.reactivex.functions.Function;
import nakadi.shadow.io.reactivex.functions.Predicate;
import nakadi.shadow.io.reactivex.plugins.RxJavaPlugins;
import nakadi.shadow.io.reactivex.schedulers.Schedulers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamProcessor
implements StreamProcessorManaged {
    private static final Logger logger = LoggerFactory.getLogger((String)NakadiClient.class.getSimpleName());
    static final String X_NAKADI_STREAM_ID = "X-Nakadi-StreamId";
    private static final int DEFAULT_HALF_OPEN_CONNECTION_GRACE_SECONDS = 90;
    static final int DEFAULT_BACKPRESSURE_BUFFER_SIZE = 128;
    private static final int START_AWAIT_TIMEOUT_SECONDS = 63;
    private static final TimeUnit START_AWAIT_TIMEOUT_UNIT = TimeUnit.SECONDS;
    private final NakadiClient client;
    private final StreamConfiguration streamConfiguration;
    private final StreamObserverProvider streamObserverProvider;
    private final StreamOffsetObserver streamOffsetObserver;
    private final JsonBatchSupport jsonBatchSupport;
    private final long maxRetryDelay;
    private final int maxRetryAttempts;
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private final AtomicBoolean stopping = new AtomicBoolean(false);
    private final AtomicBoolean retryAttemptsFinished = new AtomicBoolean(false);
    private final CountDownLatch startLatch;
    private final StreamProcessorRequestFactory streamProcessorRequestFactory;
    private final int batchBufferCount;
    private volatile Throwable failedProcessorException;
    private StreamBatchRecordSubscriber subscriber;
    private final ExecutorService monoIoExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("nakadi-java-io-%d").setUncaughtExceptionHandler((t, e) -> this.handleUncaught(t, e, "stream_processor_err_io")).build());
    private final Scheduler monoIoScheduler = Schedulers.from(this.monoIoExecutor);
    private final ExecutorService monoComputeExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("nakadi-java-compute-%d").setUncaughtExceptionHandler((t, e) -> this.handleUncaught(t, e, "stream_processor_err_compute")).build());
    private final Scheduler monoComputeScheduler = Schedulers.from(this.monoComputeExecutor);
    private volatile StreamObserver streamObserver;
    private volatile int currentStreamResponseCode;

    @VisibleForTesting
    StreamProcessor(NakadiClient client, StreamProcessorRequestFactory streamProcessorRequestFactory) {
        this.client = client;
        this.streamConfiguration = null;
        this.streamObserverProvider = null;
        this.streamOffsetObserver = null;
        this.jsonBatchSupport = new JsonBatchSupport(client.jsonSupport());
        this.maxRetryDelay = StreamConnectionRetryFlowable.DEFAULT_MAX_DELAY_SECONDS;
        this.maxRetryAttempts = StreamConnectionRetryFlowable.DEFAULT_MAX_ATTEMPTS;
        this.batchBufferCount = 128;
        this.startLatch = new CountDownLatch(1);
        this.streamProcessorRequestFactory = streamProcessorRequestFactory;
    }

    private StreamProcessor(Builder builder) {
        this.streamConfiguration = builder.streamConfiguration;
        this.client = builder.client;
        this.streamObserverProvider = builder.streamObserverProvider;
        this.streamOffsetObserver = builder.streamOffsetObserver;
        this.jsonBatchSupport = new JsonBatchSupport(this.client.jsonSupport());
        this.maxRetryDelay = this.streamConfiguration.maxRetryDelaySeconds();
        this.maxRetryAttempts = this.streamConfiguration.maxRetryAttempts();
        this.batchBufferCount = this.streamConfiguration.batchBufferCount();
        this.startLatch = new CountDownLatch(1);
        this.streamProcessorRequestFactory = builder.streamProcessorRequestFactory;
    }

    public static Builder newBuilder(NakadiClient client) {
        return new Builder().client(client);
    }

    @Override
    public void start() throws IllegalStateException {
        if (this.inShutdown()) {
            throw new IllegalStateException("processor has already been stopped and cannot be restarted");
        }
        this.setupRxErrorHandler();
        if (!this.started.getAndSet(true)) {
            this.startStreaming();
        }
        this.waitingOnStart();
    }

    @Override
    public void stop() {
        if (this.inShutdown()) {
            logger.debug("stream_processor op=stop msg=stopping_processor_already_requested_short_circuiting");
            return;
        }
        logger.info("stream_processor op=stop msg=stopping_processor_requested");
        if (this.startLatch.getCount() == 1L) {
            logger.warn("stream_processor stop called before start completed");
        }
        if (this.started.getAndSet(false)) {
            this.stopStreaming();
        }
    }

    @Override
    public boolean running() {
        return !this.stopping() && !this.stopped() && this.started();
    }

    public Optional<Throwable> failedProcessorException() {
        return Optional.ofNullable(this.failedProcessorException);
    }

    @Override
    public void retryAttemptsFinished(boolean completed) {
        this.retryAttemptsFinished.getAndSet(completed);
    }

    @Override
    public void failedProcessorException(Throwable failedProcessorException) {
        this.failedProcessorException = failedProcessorException;
    }

    StreamProcessor currentStreamResponseCode(int currentStreamResponseCode) {
        this.currentStreamResponseCode = currentStreamResponseCode;
        return this;
    }

    boolean stopped() {
        return this.stopped.get();
    }

    boolean started() {
        return this.started.get();
    }

    private boolean stopping() {
        return this.stopped.get() || this.stopping.get();
    }

    private boolean inShutdown() {
        return this.stopped() || this.stopping();
    }

    private void startStreaming() {
        this.stream(this.streamConfiguration, this.streamObserverProvider);
        this.startLatch.countDown();
    }

    private void waitingOnStart() {
        logger.info("stream_processor op=waiting_on_start startup_wait_time={}s", (Object)63);
        try {
            boolean await = this.startLatch.await(63L, START_AWAIT_TIMEOUT_UNIT);
            logger.info("stream_processor op=has_started startup_within_allowed_time=" + await);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void stopStreaming() {
        this.stopping.getAndSet(true);
        if (this.failedProcessorException != null) {
            logger.warn("op=stream_processor_stop msg=failed_processor_exception_detected type={} err={}", (Object)this.failedProcessorException.getClass().getSimpleName(), (Object)this.failedProcessorException.getMessage());
        }
        if (this.streamObserver != null) {
            this.streamObserver.onStop();
        }
        this.subscriber.dispose();
        logger.debug("op=stream_processor_stop msg=stopping_executor name=monoIoScheduler");
        ExecutorServiceSupport.shutdown(this.monoIoExecutor);
        logger.debug("op=stream_processor_stop msg=stopping_executor name=monoComputeScheduler");
        ExecutorServiceSupport.shutdown(this.monoComputeExecutor);
        this.stopped.getAndSet(true);
    }

    private <T> void stream(StreamConfiguration sc, StreamObserverProvider<T> provider) {
        StreamObserver<T> observer;
        this.streamObserver = observer = provider.createStreamObserver();
        TypeLiteral<T> literal = provider.typeLiteral();
        Flowable<StreamBatchRecord<T>> observable = this.buildObservable(observer, sc, literal);
        Optional<Integer> maybeBuffering = observer.requestBuffer();
        if (maybeBuffering.isPresent()) {
            logger.info("op=create_subscriber type=buffering buffer={} config={}", (Object)sc);
            observable.observeOn(this.monoComputeScheduler).buffer(maybeBuffering.get()).subscribeWith(new StreamBatchRecordBufferingSubscriber<T>(observer, this.client.metricCollector()));
        } else {
            logger.info("op=create_subscriber type=regular config={}", (Object)sc);
            this.subscriber = new StreamBatchRecordSubscriber<T>(observer, this.client.metricCollector());
            observable.observeOn(this.monoComputeScheduler).subscribeWith(new StreamBatchRecordSubscriber<T>(observer, this.client.metricCollector()));
        }
    }

    private <T> Flowable<StreamBatchRecord<T>> buildObservable(StreamObserver<T> streamObserver, StreamConfiguration streamConfiguration, TypeLiteral<T> typeLiteral) {
        TimeUnit halfOpenUnit = TimeUnit.SECONDS;
        long halfOpenGrace = 90L;
        long batchFlushTimeoutSeconds = this.streamConfiguration.batchFlushTimeoutSeconds();
        long halfOpenKick = halfOpenUnit.toSeconds(batchFlushTimeoutSeconds + halfOpenGrace);
        logger.info("op=processor_configure, batch_flush_timeout={}, grace_period={}, kick_after={}{}", new Object[]{batchFlushTimeoutSeconds, halfOpenGrace, halfOpenKick, halfOpenUnit.name().toLowerCase()});
        Flowable flowable = Flowable.using(this.httpRequestFactory(streamConfiguration), this.streamConsumerFactory(typeLiteral, streamConfiguration), this.httpResponseDispose()).subscribeOn(this.monoIoScheduler).unsubscribeOn(this.monoIoScheduler).doOnSubscribe(subscription -> {
            if (this.inShutdown()) {
                logger.trace("stream_retry_not_resubscribing msg=stream processor is stopped");
                return;
            }
            streamObserver.onStart();
        }).doOnComplete(() -> {
            streamObserver.onCompleted();
            if (this.successfulResponseAndCustomStreamTimeout()) {
                logger.info("op=stop_processor_on_complete msg=stopping_assuming_server_closed_ok_stream_due_to_stream_timeout stream_timeout={}  server_response={}", (Object)streamConfiguration.streamTimeoutSeconds(), (Object)this.currentStreamResponseCode);
                this.stopStreaming();
            }
        }).doOnCancel(() -> {
            if (this.retryAttemptsFinished.get()) {
                logger.info("op=stop_processor msg=stopping_after_stream_retry_finished");
                this.stopStreaming();
            }
            if (this.successfulResponseAndCustomStreamTimeout()) {
                logger.info("op=stop_processor_on_cancel msg=stopping_assuming_server_closed_ok_stream_due_to_stream_timeout stream_timeout={}  server_response={}", (Object)streamConfiguration.streamTimeoutSeconds(), (Object)this.currentStreamResponseCode);
                streamObserver.onCompleted();
                this.stopStreaming();
            }
        }).timeout(halfOpenKick, halfOpenUnit).retryWhen(this.buildStreamConnectionRetryFlowable()).compose(this.buildRestartHandler());
        return Flowable.defer(() -> flowable);
    }

    private boolean successfulResponseAndCustomStreamTimeout() {
        return this.currentStreamResponseCode == 200 && this.hasCustomStreamTimeout();
    }

    @VisibleForTesting
    Callable<Response> httpRequestFactory(StreamConfiguration sc) {
        return this.streamProcessorRequestFactory.createCallable(sc, this);
    }

    private <T> Function<? super Response, Flowable<StreamBatchRecord<T>>> streamConsumerFactory(TypeLiteral<T> literal, StreamConfiguration sc) {
        return response -> {
            BufferedReader br = new BufferedReader(response.responseBody().asReader());
            return Flowable.fromIterable(br.lines()::iterator).doOnError(throwable -> ResponseSupport.closeQuietly(response)).map(r -> this.lineToStreamBatchRecord((String)r, literal, (Response)response, sc));
        };
    }

    private Consumer<? super Response> httpResponseDispose() {
        return ResponseSupport::closeQuietly;
    }

    private StreamConnectionRetryFlowable buildStreamConnectionRetryFlowable() {
        ExponentialRetry exponentialRetry = ExponentialRetry.newBuilder().initialInterval(StreamConnectionRetryFlowable.DEFAULT_INITIAL_DELAY_SECONDS, StreamConnectionRetryFlowable.DEFAULT_TIME_UNIT).maxInterval(this.maxRetryDelay, StreamConnectionRetryFlowable.DEFAULT_TIME_UNIT).maxAttempts(this.maxRetryAttempts).build();
        return new StreamConnectionRetryFlowable(exponentialRetry, this.buildRetryFunction(), this.client.metricCollector(), this);
    }

    private <T> FlowableTransformer<StreamBatchRecord<T>, StreamBatchRecord<T>> buildRestartHandler() {
        return new StreamConnectionRestart().repeatWhenWithDelayAndUntil(this.stopRepeatingPredicate(), StreamConnectionRestart.DEFAULT_DELAY_SECONDS, StreamConnectionRestart.DEFAULT_DELAY_UNIT, StreamConnectionRestart.DEFAULT_MAX_RESTARTS);
    }

    private Function<Throwable, Boolean> buildRetryFunction() {
        return ExceptionSupport::isConsumerStreamRetryable;
    }

    private Predicate<Long> stopRepeatingPredicate() {
        return attemptCount -> {
            if (this.streamConfiguration.streamLimit() != 0) {
                logger.debug("op=repeater msg=will not continue to restart, request for a bounded number of events detected stream_limit={} restarts={}", (Object)this.streamConfiguration.streamLimit(), attemptCount);
                return true;
            }
            this.client.metricCollector().mark(MetricCollector.Meter.streamRestart);
            return false;
        };
    }

    private boolean hasCustomStreamTimeout() {
        return this.streamConfiguration.streamTimeoutSeconds() != 0L;
    }

    private <T> StreamBatchRecord<T> lineToStreamBatchRecord(String line, TypeLiteral<T> typeLiteral, Response response, StreamConfiguration sc) {
        if (sc.isSubscriptionStream()) {
            String sessionId = response.headers().get(X_NAKADI_STREAM_ID).get(0);
            logger.debug("op=line_to_batch x_nakadi_stream_id={} line={}, response={}", new Object[]{sessionId, line, response});
            return this.jsonBatchSupport.lineToSubscriptionStreamBatchRecord(line, typeLiteral.type(), this.streamOffsetObserver(), sessionId, sc.subscriptionId());
        }
        logger.debug("op=line_to_batch line={}, response={}", (Object)line, (Object)response);
        return this.jsonBatchSupport.lineToEventStreamBatchRecord(line, typeLiteral.type(), this.streamOffsetObserver());
    }

    private void setupRxErrorHandler() {
        RxJavaPlugins.setErrorHandler(t -> {
            Throwable t0 = t;
            if (t instanceof UndeliverableException) {
                t0 = t.getCause();
            }
            if (this.failedProcessorException == null) {
                this.failedProcessorException = t0;
            }
            if (t0 instanceof RejectedExecutionException) {
                logger.debug("op=unhandled_rejected_execution action=continue {}", (Object)t0.getMessage());
            } else if (t0 instanceof NonRetryableNakadiException) {
                logger.error(String.format("op=unhandled_non_retryable_exception action=stopping type=NonRetryableNakadiException %s ", ((NonRetryableNakadiException)t0).problem()), t0);
                this.stopStreaming();
            } else if (t0 instanceof Error) {
                logger.error(String.format("op=unhandled_error action=stopping type=NonRetryableNakadiException %s ", t.getMessage()), t0);
                this.stopStreaming();
            } else {
                logger.error(String.format("unhandled_unknown_exception action=stopping type=%s %s", t0.getClass().getSimpleName(), t0.getMessage()), t0);
                this.stopStreaming();
            }
        });
    }

    private void handleUncaught(Thread t, Throwable e, String name) {
        if (ExceptionSupport.isInterruptedIOException(e)) {
            Thread.currentThread().interrupt();
            logger.warn(String.format("op=handle_exception action=interrupt_and_continue type=InterruptedIOException %s %s", name, t), e);
        } else {
            if (e instanceof NonRetryableNakadiException) {
                logger.error(String.format("op=handle_exception action=stopping type=NonRetryableNakadiException %s %s %s", name, t, ((NonRetryableNakadiException)e).problem()), e);
            } else {
                logger.error(String.format("op=handle_exception action=stopping type=%s %s %s", e.getClass().getSimpleName(), name, t), e);
            }
            this.failedProcessorException = e;
            this.stopStreaming();
        }
    }

    private int onBackPressureBufferSize() {
        return this.batchBufferCount;
    }

    @VisibleForTesting
    StreamOffsetObserver streamOffsetObserver() {
        return this.streamOffsetObserver;
    }

    @VisibleForTesting
    String findEventTypeNameForSubscription(StreamConfiguration sc) {
        Subscription sub = this.client.resources().subscriptions().find(sc.subscriptionId());
        return sub.eventTypes().get(0);
    }

    public static class Builder {
        private NakadiClient client;
        private StreamObserverProvider streamObserverProvider;
        private SubscriptionOffsetCheckpointer checkpointer;
        private StreamOffsetObserver streamOffsetObserver;
        private StreamConfiguration streamConfiguration;
        private StreamProcessorRequestFactory streamProcessorRequestFactory;

        public StreamProcessor build() {
            NakadiException.throwNonNull(this.streamConfiguration, "Please provide a stream configuration");
            if (this.streamConfiguration.isSubscriptionStream() && this.streamConfiguration.isEventTypeStream()) {
                throw new NakadiException(Problem.localProblem("Cannot be configured with both a subscriptionId and an eventTypeName", String.format("subscriptionId=%s eventTypeName=%s", this.streamConfiguration.subscriptionId(), this.streamConfiguration.eventTypeName())));
            }
            if (!this.streamConfiguration.isSubscriptionStream() && !this.streamConfiguration.isEventTypeStream()) {
                throw new NakadiException(Problem.localProblem("Please supply either a subscription id or an event type", ""));
            }
            NakadiException.throwNonNull(this.client, "Please provide a client");
            NakadiException.throwNonNull(this.streamObserverProvider, "Please provide a StreamObserverProvider");
            if (this.streamConfiguration.isSubscriptionStream() && this.streamOffsetObserver == null) {
                if (this.checkpointer == null) {
                    this.checkpointer = new SubscriptionOffsetCheckpointer(this.client).suppressInvalidSessionException(false);
                }
                this.streamOffsetObserver = new SubscriptionOffsetObserver(this.checkpointer);
            }
            if (this.streamConfiguration.isEventTypeStream() && this.streamOffsetObserver == null) {
                this.streamOffsetObserver = new LoggingStreamOffsetObserver();
            }
            if (this.streamProcessorRequestFactory == null) {
                this.streamProcessorRequestFactory = new StreamProcessorRequestFactory(this.client);
            }
            return new StreamProcessor(this);
        }

        public Builder client(NakadiClient client) {
            this.client = client;
            return this;
        }

        @Deprecated
        public Builder scope(String scope) {
            return this;
        }

        public Builder streamObserverFactory(StreamObserverProvider streamObserverProvider) {
            this.streamObserverProvider = streamObserverProvider;
            return this;
        }

        public Builder streamOffsetObserver(StreamOffsetObserver streamOffsetObserver) {
            this.streamOffsetObserver = streamOffsetObserver;
            return this;
        }

        public Builder streamConfiguration(StreamConfiguration streamConfiguration) {
            this.streamConfiguration = streamConfiguration;
            return this;
        }

        @Unstable
        public Builder checkpointer(SubscriptionOffsetCheckpointer checkpointer) {
            this.checkpointer = checkpointer;
            return this;
        }

        @VisibleForTesting
        Builder streamProcessorRequestFactory(StreamProcessorRequestFactory factory) {
            this.streamProcessorRequestFactory = factory;
            return this;
        }
    }
}

