package io.confluent.ksql.api.client.impl;

import com.google.common.collect.ImmutableList;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.api.client.ColumnType;
import io.confluent.ksql.api.client.Row;
import io.confluent.ksql.api.client.StreamedQueryResult;
import io.confluent.ksql.api.client.exception.KsqlClientException;
import io.confluent.ksql.reactive.BufferedPublisher;
import io.vertx.core.Context;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Subscriber;

/* loaded from: input_file:io/confluent/ksql/api/client/impl/StreamedQueryResultImpl.class */
/**
 * Client-side result of a streamed query.
 *
 * <p>Rows can be consumed in exactly one of two ways: by attaching a
 * {@link Subscriber} through {@link #subscribe(Subscriber)}, or by pulling rows
 * with {@link #poll()} / {@link #poll(Duration)}. The two modes are mutually
 * exclusive; mixing them raises {@link IllegalStateException}.
 */
public class StreamedQueryResultImpl extends BufferedPublisher<Row> implements StreamedQueryResult {
    private static final Logger log = LoggerFactory.getLogger(StreamedQueryResultImpl.class);
    private final String queryId;
    private final ImmutableList<String> columnNames;
    private final ImmutableList<ColumnType> columnTypes;
    // Internal subscriber that backs the poll() API.
    private final PollableSubscriber pollableSubscriber;
    // True once poll() has attached the internal pollable subscriber.
    private volatile boolean polling;
    // True once a caller-supplied subscriber has been attached; guarded by "this".
    private boolean subscribing;
    private final AtomicReference<String> continuationToken;
    private final String sql;
    private final Map<String, Object> properties;
    private final ClientImpl client;

    /**
     * Creates a streamed query result.
     *
     * @param context           context handed to the underlying publisher
     * @param queryId           ID reported by {@link #queryID()}
     * @param columnNames       result column names; copied into an immutable list
     * @param columnTypes       result column types; copied into an immutable list
     * @param continuationToken shared holder for the continuation token (stored by reference,
     *                          not copied)
     * @param sql               statement to re-issue from
     *                          {@link #continueFromLastContinuationToken()}
     * @param properties        properties to re-issue the statement with (stored by reference)
     * @param client            client used to re-issue the statement
     */
    public StreamedQueryResultImpl(Context context, String queryId, List<String> columnNames, List<ColumnType> columnTypes, AtomicReference<String> continuationToken, String sql, Map<String, Object> properties, ClientImpl client) {
        super(context);
        this.queryId = queryId;
        this.columnNames = ImmutableList.copyOf(columnNames);
        this.columnTypes = ImmutableList.copyOf(columnTypes);
        this.pollableSubscriber = new PollableSubscriber(this.ctx, this::handleErrorWhilePolling);
        this.continuationToken = continuationToken;
        this.sql = sql;
        this.properties = properties;
        this.client = client;
    }

    /** Returns the immutable list of result column names. */
    @Override
    @SuppressFBWarnings(value = {"EI_EXPOSE_REP"}, justification = "columnNames is ImmutableList")
    public List<String> columnNames() {
        return this.columnNames;
    }

    /** Returns the immutable list of result column types. */
    @Override
    @SuppressFBWarnings(value = {"EI_EXPOSE_REP"}, justification = "columnTypes is ImmutableList")
    public List<ColumnType> columnTypes() {
        return this.columnTypes;
    }

    /** Returns the query ID supplied at construction time. */
    @Override
    public String queryID() {
        return this.queryId;
    }

    /**
     * Attaches a subscriber to this result.
     *
     * @param subscriber the subscriber to attach
     * @throws IllegalStateException if this result is already being polled
     */
    public void subscribe(Subscriber<? super Row> subscriber) {
        // Fast-fail without taking the monitor: poll(Duration) holds it while it waits for a
        // row, so a caller mixing the two modes should not have to wait just to get the error.
        if (this.polling) {
            throw new IllegalStateException("Cannot set subscriber if polling");
        }
        synchronized (this) {
            // Re-check under the lock: a concurrent poll() may have switched this result
            // into polling mode between the check above and acquiring the monitor.
            if (this.polling) {
                throw new IllegalStateException("Cannot set subscriber if polling");
            }
            this.subscribing = true;
            super.subscribe(subscriber);
        }
    }

    /**
     * Polls for a row with a zero timeout.
     *
     * @return whatever the internal pollable subscriber returns for a zero timeout
     * @throws IllegalStateException if a subscriber has been set or this result has failed
     */
    @Override
    public Row poll() {
        return poll(Duration.ZERO);
    }

    /**
     * Polls for a row, passing {@code duration} through to the internal pollable subscriber.
     *
     * @param duration timeout handed to the internal pollable subscriber
     * @return whatever the internal pollable subscriber returns
     * @throws IllegalStateException if a subscriber has been set or this result has failed
     */
    @Override
    public Row poll(Duration duration) {
        return poll(duration, null);
    }

    /**
     * Shared implementation of the poll variants.
     *
     * @param duration timeout handed to the internal pollable subscriber
     * @param runnable optional hook run after the state checks and before the internal
     *                 subscriber is attached; may be {@code null}
     */
    private synchronized Row poll(Duration duration, Runnable runnable) {
        if (this.subscribing) {
            throw new IllegalStateException("Cannot poll if subscriber has been set");
        }
        if (isFailed()) {
            throw new IllegalStateException("Cannot poll on StreamedQueryResult that has failed. Check logs for failure reason.");
        }
        if (runnable != null) {
            runnable.run();
        }
        if (!this.polling) {
            // First poll: attach the internal subscriber. subscribe() marks the result as
            // "subscribing", which is reset here because the internal subscriber must not
            // block later polls.
            subscribe(this.pollableSubscriber);
            this.subscribing = false;
            this.polling = true;
        }
        return this.pollableSubscriber.poll(duration);
    }

    /** Delegates to the superclass completion state. */
    @Override
    public boolean isComplete() {
        return super.isComplete();
    }

    /** Delegates to the superclass failure state. */
    @Override
    public boolean isFailed() {
        return super.isFailed();
    }

    /**
     * Forwards {@code exc} to {@code sendError}.
     *
     * @param exc the error to forward
     */
    public void handleError(Exception exc) {
        sendError(exc);
    }

    /** Error callback handed to the internal pollable subscriber; logs the failure. */
    private void handleErrorWhilePolling(Throwable error) {
        // Pass the throwable as well so the stack trace is logged, not just toString().
        log.error("Unexpected error while polling: " + error, error);
    }

    /**
     * Reports whether a continuation token is currently held.
     *
     * @return {@code true} if the held token is neither {@code null} nor empty
     */
    public boolean hasContinuationToken() {
        final String token = this.continuationToken.get();
        // A null token means nothing was saved, same as the empty string.
        return token != null && !token.isEmpty();
    }

    /** Returns the shared continuation-token holder itself (not a copy). */
    @SuppressFBWarnings({"EI_EXPOSE_REP"})
    public AtomicReference<String> getContinuationToken() {
        return this.continuationToken;
    }

    /**
     * Re-issues the original statement, with the original properties, through the client.
     *
     * @return the future returned by the client's {@code streamQuery}
     * @throws KsqlClientException if no continuation token is held
     */
    @Override
    public CompletableFuture<StreamedQueryResult> continueFromLastContinuationToken() {
        if (hasContinuationToken()) {
            return this.client.streamQuery(this.sql, this.properties);
        }
        throw new KsqlClientException("Can only continue queries that have saved a continuation token.");
    }

    /**
     * Polls {@code streamedQueryResult} with a zero timeout, running {@code runnable} after the
     * state checks and before the internal subscriber is attached.
     *
     * @param streamedQueryResult the result to poll; must be a {@code StreamedQueryResultImpl}
     * @param runnable            hook to run during the poll; may be {@code null}
     * @return the polled row
     * @throws IllegalArgumentException if the result is not a {@code StreamedQueryResultImpl}
     */
    public static Row pollWithCallback(StreamedQueryResult streamedQueryResult, Runnable runnable) {
        if (streamedQueryResult instanceof StreamedQueryResultImpl) {
            return ((StreamedQueryResultImpl) streamedQueryResult).poll(Duration.ZERO, runnable);
        }
        throw new IllegalArgumentException("Can only poll with callback on StreamedQueryResultImpl");
    }
}
