package io.confluent.parallelconsumer.vertx;

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.PollContext;
import io.confluent.parallelconsumer.PollContextInternal;
import io.confluent.parallelconsumer.internal.DrainingCloseable;
import io.confluent.parallelconsumer.internal.ExternalEngine;
import io.confluent.parallelconsumer.internal.UserFunctions;
import io.confluent.parallelconsumer.state.WorkContainer;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.web.client.HttpRequest;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.client.WebClientOptions;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniMaps;

/* loaded from: input_file:io/confluent/parallelconsumer/vertx/VertxParallelEoSStreamProcessor.class */
public class VertxParallelEoSStreamProcessor<K, V> extends ExternalEngine<K, V> implements VertxParallelStreamProcessor<K, V> {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(VertxParallelEoSStreamProcessor.class);
    /** Work-type label stamped on every work container handled by {@code addVertxHooks}. */
    private static final String VERTX_TYPE = "vert.x-type";
    /** Vert.x instance, either supplied by the caller or created in the constructor; closed in {@code close}. */
    private final Vertx vertx;
    /** Web client handed to user functions in {@code vertxHttpWebClient}; closed in {@code close}. */
    private final WebClient webClient;
    /** Optional callback run from the {@code onComplete} handler of each hooked future; set via {@code addVertxOnCompleteHook}. */
    private Optional<Runnable> onVertxCompleteHook;

    /* loaded from: input_file:io/confluent/parallelconsumer/vertx/VertxParallelEoSStreamProcessor$RequestInfo.class */
    /**
     * Describes the target of an HTTP GET request: host, port, context path and query parameters.
     * <p>
     * Host, port and context path are fixed at construction time; the query parameter map can be
     * replaced later through {@link #setParams(Map)}. No validation or copying of the arguments is
     * performed, so the parameter map reference is stored and returned as given (including
     * {@code null}).
     */
    public static class RequestInfo {
        /** Port used by the constructors that do not take an explicit port. */
        public static final int DEFAULT_PORT = 8080;

        private final String host;
        private final int port;
        private final String contextPath;
        private Map<String, String> params;

        /**
         * Full constructor; all other constructors delegate here.
         *
         * @param str  host name
         * @param i    port number
         * @param str2 context path
         * @param map  query parameters, stored by reference
         */
        public RequestInfo(String str, int i, String str2, Map<String, String> map) {
            this.host = str;
            this.port = i;
            this.contextPath = str2;
            this.params = map;
        }

        /**
         * Creates a request description on {@link #DEFAULT_PORT}.
         *
         * @param str  host name
         * @param str2 context path
         * @param map  query parameters, stored by reference
         */
        public RequestInfo(String str, String str2, Map<String, String> map) {
            this(str, DEFAULT_PORT, str2, map);
        }

        /**
         * Creates a request description on {@link #DEFAULT_PORT} with an empty parameter map.
         *
         * @param str  host name
         * @param str2 context path
         */
        public RequestInfo(String str, String str2) {
            this(str, str2, UniMaps.of());
        }

        /** @return the host name given at construction */
        public String getHost() {
            return host;
        }

        /** @return the port given at construction, or {@link #DEFAULT_PORT} if none was given */
        public int getPort() {
            return port;
        }

        /** @return the context path given at construction */
        public String getContextPath() {
            return contextPath;
        }

        /** @return the current query parameter map, exactly as last supplied */
        public Map<String, String> getParams() {
            return params;
        }

        /**
         * Replaces the query parameter map.
         *
         * @param map the new parameters, stored by reference
         */
        public void setParams(Map<String, String> map) {
            this.params = map;
        }
    }

    /**
     * Creates a processor backed by a freshly created default {@link Vertx} instance and a web
     * client built by the three-argument constructor.
     * <p>
     * NOTE(review): because a non-null {@code Vertx.vertx()} is passed, the worker-pool-sized
     * {@link VertxOptions} prepared by the three-argument constructor is not applied on this
     * path - confirm that this is intended.
     *
     * @param parallelConsumerOptions options forwarded to the superclass and used to size the web client pools
     */
    public VertxParallelEoSStreamProcessor(ParallelConsumerOptions parallelConsumerOptions) {
        this(Vertx.vertx(), null, parallelConsumerOptions);
    }

    /**
     * Creates a processor using the supplied Vert.x components, creating defaults for any that
     * are {@code null}.
     *
     * @param vertx                   Vert.x instance to use; when {@code null} one is created with its worker
     *                                pool size set to the number of available processors
     * @param webClient               web client to use; when {@code null} one is created on the effective Vert.x
     *                                instance with its max pool size and HTTP/2 max pool size both set to the
     *                                configured max concurrency
     * @param parallelConsumerOptions options forwarded to the superclass; its max concurrency sizes the default
     *                                web client pools
     */
    public VertxParallelEoSStreamProcessor(Vertx vertx, WebClient webClient, ParallelConsumerOptions parallelConsumerOptions) {
        super(parallelConsumerOptions);
        this.onVertxCompleteHook = Optional.empty();

        // Defaults are prepared up front, whether or not they end up being used.
        final int processorCount = Runtime.getRuntime().availableProcessors();
        final VertxOptions defaultVertxOptions = new VertxOptions().setWorkerPoolSize(processorCount);
        final int concurrency = parallelConsumerOptions.getMaxConcurrency();
        final WebClientOptions defaultClientOptions = new WebClientOptions().setMaxPoolSize(concurrency).setHttp2MaxPoolSize(concurrency);

        final Vertx effectiveVertx;
        if (vertx != null) {
            effectiveVertx = vertx;
        } else {
            effectiveVertx = Vertx.vertx(defaultVertxOptions);
        }
        this.vertx = effectiveVertx;

        if (webClient != null) {
            this.webClient = webClient;
        } else {
            this.webClient = WebClient.create(effectiveVertx, defaultClientOptions);
        }
    }

    /**
     * Builds the worker pool by delegating to the superclass with a fixed size of one; the
     * requested size is ignored.
     * <p>
     * NOTE(review): presumably a single thread suffices because the actual work runs inside
     * Vert.x rather than on this pool - confirm against the superclass.
     *
     * @param i requested pool size (unused)
     * @return the executor created by the superclass for a pool size of 1
     */
    protected ThreadPoolExecutor setupWorkerPool(int i) {
        final int fixedPoolSize = 1;
        return super.setupWorkerPool(fixedPoolSize);
    }

    /**
     * Builds and sends an HTTP GET request for each poll context, based on a {@link RequestInfo}
     * produced by the user function.
     * <p>
     * The request targets the port, host and context path from the {@link RequestInfo}; every
     * entry of its parameter map is added as a query parameter. A {@code null} parameter map is
     * treated as "no query parameters". The actual sending and callback wiring is delegated to
     * {@link #vertxHttpRequest(BiFunction, Consumer, Consumer)}.
     *
     * @param function  user function mapping a poll context to the request description
     * @param consumer  forwarded unchanged to {@code vertxHttpRequest}
     * @param consumer2 forwarded unchanged to {@code vertxHttpRequest}
     */
    @Override // io.confluent.parallelconsumer.vertx.VertxParallelStreamProcessor
    public void vertxHttpReqInfo(Function<PollContext<K, V>, RequestInfo> function, Consumer<Future<HttpResponse<Buffer>>> consumer, Consumer<AsyncResult<HttpResponse<Buffer>>> consumer2) {
        vertxHttpRequest((webClient, pollContext) -> {
            RequestInfo requestInfo = (RequestInfo) UserFunctions.carefullyRun(function, pollContext);
            HttpRequest<Buffer> httpRequest = webClient.get(requestInfo.getPort(), requestInfo.getHost(), requestInfo.getContextPath());
            // RequestInfo accepts a null map (constructor or setParams), so guard before iterating.
            Map<String, String> queryParams = requestInfo.getParams();
            if (queryParams != null) {
                for (Map.Entry<String, String> entry : queryParams.entrySet()) {
                    httpRequest = httpRequest.addQueryParam(entry.getKey(), entry.getValue());
                }
            }
            return httpRequest;
        }, consumer, consumer2);
    }

    /**
     * Sends the HTTP request produced by the user function for each poll context.
     * <p>
     * The user function receives the web client and the poll context and returns a request, which
     * is sent immediately. {@code consumer2} is invoked with the result when the send future
     * completes. The future itself is returned to
     * {@link #vertxHttpWebClient(BiFunction, Consumer)}, which also receives {@code consumer}.
     *
     * @param biFunction user function creating the request to send
     * @param consumer   forwarded unchanged to {@code vertxHttpWebClient}
     * @param consumer2  called with the asynchronous result once the send future completes
     */
    @Override // io.confluent.parallelconsumer.vertx.VertxParallelStreamProcessor
    public void vertxHttpRequest(BiFunction<WebClient, PollContext<K, V>, HttpRequest<Buffer>> biFunction, Consumer<Future<HttpResponse<Buffer>>> consumer, Consumer<AsyncResult<HttpResponse<Buffer>>> consumer2) {
        vertxHttpWebClient((webClient, pollContext) -> {
            // Parameterized types keep the completion handler's argument typed as AsyncResult<HttpResponse<Buffer>>.
            HttpRequest<Buffer> request = (HttpRequest<Buffer>) UserFunctions.carefullyRun(biFunction, webClient, pollContext);
            Future<HttpResponse<Buffer>> send = request.send();
            send.onComplete(asyncResult -> {
                consumer2.accept(asyncResult);
            });
            return send;
        }, consumer);
    }

    /**
     * Registers a user function that uses the web client directly and returns the future of its
     * HTTP response.
     * <p>
     * For each poll context handed over by the superclass' supervisor loop, the user function is
     * run with this processor's web client. The resulting future is passed to {@code consumer},
     * then wired to the work containers via {@code addVertxHooks}, and finally returned to the
     * supervisor loop as a single-element list. The supervisor loop's own callback is a no-op.
     *
     * @param biFunction user function producing the response future from the web client and poll context
     * @param consumer   called with the future right after the user function returned it
     */
    @Override // io.confluent.parallelconsumer.vertx.VertxParallelStreamProcessor
    public void vertxHttpWebClient(BiFunction<WebClient, PollContext<K, V>, Future<HttpResponse<Buffer>>> biFunction, Consumer<Future<HttpResponse<Buffer>>> consumer) {
        super.supervisorLoop(pollContextInternal -> {
            log.trace("Consumed a record ({}), executing void function...", pollContextInternal);
            // Keep the full response type: the consumer expects Future<HttpResponse<Buffer>>, not Future<?>.
            Future<HttpResponse<Buffer>> future = (Future<HttpResponse<Buffer>>) UserFunctions.carefullyRun(biFunction, this.webClient, pollContextInternal.getPollContext());
            consumer.accept(future);
            addVertxHooks(pollContextInternal, future);
            return UniLists.of(future);
        }, future -> {
            // Intentionally empty: completion is handled by the hooks attached to the future.
        });
    }

    /**
     * Attaches success, failure and completion handlers to the given future for every work
     * container of the poll context.
     * <p>
     * Each container is tagged with the Vert.x work type. On success or failure the container is
     * told the outcome and then added to the mailbox. On completion the optional plugin hook is
     * run; since handlers are registered once per container, the hook runs once per container.
     *
     * @param pollContextInternal context whose work containers are tied to the future
     * @param future              the asynchronous user work to observe
     */
    private void addVertxHooks(PollContextInternal<K, V> pollContextInternal, Future<?> future) {
        pollContextInternal.streamWorkContainers().forEach(container -> {
            container.setWorkType(VERTX_TYPE);

            // Outcome handlers: record the result on the container, then hand it back via the mailbox.
            future.onSuccess(ignoredResult -> {
                log.debug("Vert.x Vertical success");
                container.onUserFunctionSuccess();
                addToMailbox(container);
            });
            future.onFailure(cause -> {
                log.error("Vert.x Vertical fail: {}", cause.getMessage());
                container.onUserFunctionFailure(cause);
                addToMailbox(container);
            });

            // Plugin callback, run regardless of outcome.
            future.onComplete(ignoredOutcome -> {
                log.trace("Running plugin hook");
                this.onVertxCompleteHook.ifPresent(Runnable::run);
            });
        });
    }

    /**
     * Registers a user function that returns an arbitrary Vert.x future for each poll context.
     * <p>
     * The returned future is wired to the work containers via {@code addVertxHooks} and handed
     * back to the superclass' supervisor loop as a single-element list. The supervisor loop's own
     * callback is a no-op.
     *
     * @param function user function producing the future for a poll context
     */
    @Override // io.confluent.parallelconsumer.vertx.VertxParallelStreamProcessor
    public void vertxFuture(Function<PollContext<K, V>, Future<?>> function) {
        super.supervisorLoop(context -> {
            log.trace("Consumed a record ({}), executing void function...", context);
            Future<?> pending = (Future<?>) UserFunctions.carefullyRun(function, context.getPollContext());
            addVertxHooks(context, pending);
            return UniLists.of(pending);
        }, ignored -> {
            // Intentionally empty: completion is handled by the hooks attached to the future.
        });
    }

    /**
     * Batch variant of registering a future-returning user function.
     * <p>
     * Runs the user function on the poll context, wires the returned future to all work
     * containers of that context via {@code addVertxHooks}, and returns it to the superclass'
     * supervisor loop as a single-element list. The supervisor loop's own callback is a no-op.
     *
     * @param function user function producing one future for the poll context
     */
    @Override // io.confluent.parallelconsumer.vertx.VertxParallelStreamProcessor
    public void batchVertxFuture(Function<PollContext<K, V>, Future<?>> function) {
        super.supervisorLoop(context -> {
            Future<?> pending = (Future<?>) UserFunctions.carefullyRun(function, context.getPollContext());
            addVertxHooks(context, pending);
            return UniLists.of(pending);
        }, ignored -> {
            // Intentionally empty: completion is handled by the hooks attached to the future.
        });
    }

    /**
     * Installs the callback that is run whenever a hooked future completes, replacing any
     * previously installed one.
     *
     * @param runnable the callback; must not be {@code null} ({@link Optional#of} rejects it)
     */
    public void addVertxOnCompleteHook(Runnable runnable) {
        final Optional<Runnable> hook = Optional.of(runnable);
        this.onVertxCompleteHook = hook;
    }

    /**
     * Handles the successful return of the user function.
     * <p>
     * When the result list represents asynchronous future work (see
     * {@link #isAsyncFutureWork(List)}), only a debug message is logged here. Otherwise the
     * superclass handling is applied.
     *
     * @param workContainer the work the user function was run for
     * @param list          the results returned by the user function wrapper
     */
    protected void onUserFunctionSuccess(WorkContainer<K, V> workContainer, List<?> list) {
        if (!isAsyncFutureWork(list)) {
            super.onUserFunctionSuccess(workContainer, list);
            return;
        }
        log.debug("Vertx creation function success, user's function success");
    }

    /**
     * Decides whether successfully dispatched work is added to the mailbox right away.
     * <p>
     * When the result list represents asynchronous future work (see
     * {@link #isAsyncFutureWork(List)}), nothing is added here and a debug message is logged.
     * Otherwise the superclass handling is applied.
     *
     * @param workContainer the work the user function was run for
     * @param list          the results returned by the user function wrapper
     */
    protected void addToMailBoxOnUserFunctionSuccess(WorkContainer<K, V> workContainer, List<?> list) {
        if (!isAsyncFutureWork(list)) {
            super.addToMailBoxOnUserFunctionSuccess(workContainer, list);
            return;
        }
        log.debug("User function success but not adding vertx vertical to mailbox yet");
    }

    /**
     * Tells whether a result list represents asynchronous Vert.x work.
     * <p>
     * Only the first element is inspected: the answer is {@code true} when the list is non-empty
     * and that element is a {@link Future}.
     *
     * @param list the results returned by the user function wrapper
     * @return {@code true} if the first element exists and is a {@link Future}, {@code false} otherwise
     */
    protected boolean isAsyncFutureWork(List<?> list) {
        final Iterator<?> results = list.iterator();
        return results.hasNext() && results.next() instanceof Future;
    }

    /**
     * Closes the processor, then the web client and the Vert.x instance, waiting for Vert.x to
     * finish closing.
     * <p>
     * After the superclass close returns, the Vert.x close future is polled every 100 ms. The
     * wait is bounded by a timer started with {@code duration} at that point.
     * <p>
     * Two checked exceptions can propagate from this method although it declares none; both are
     * rethrown without wrapping:
     * <ul>
     *   <li>{@link TimeoutException} when Vert.x has not finished closing before the timer expires;</li>
     *   <li>{@link InterruptedException} when the waiting thread is interrupted; the thread's
     *       interrupt status is restored before rethrowing.</li>
     * </ul>
     *
     * @param duration     passed to the superclass close and used as the limit for the Vert.x close wait
     * @param drainingMode passed to the superclass close
     */
    public void close(Duration duration, DrainingCloseable.DrainingMode drainingMode) {
        log.info("Vert.x async consumer closing...");
        super.close(duration, drainingMode);
        this.webClient.close();
        Future<?> close = this.vertx.close();
        Timer timer = Time.SYSTEM.timer(duration);
        while (!close.isComplete()) {
            log.trace("Waiting on close to complete");
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                // sleep() cleared the flag; restore it so callers further up can still observe the interrupt.
                Thread.currentThread().interrupt();
                throw rethrowUnchecked(e);
            }
            timer.update();
            if (timer.isExpired()) {
                throw rethrowUnchecked(new TimeoutException("Waiting for system to close"));
            }
        }
    }

    /**
     * Throws the given throwable as-is without requiring a {@code throws} declaration at the call
     * site (the thrown-type variable is inferred as an unchecked exception).
     *
     * @param throwable the throwable to propagate unchanged
     * @return never returns; the return type only lets callers write {@code throw rethrowUnchecked(...)}
     */
    @SuppressWarnings("unchecked")
    private static <E extends Throwable> RuntimeException rethrowUnchecked(Throwable throwable) throws E {
        throw (E) throwable;
    }
}
