package io.confluent.ksql.api.client.impl;

import io.confluent.ksql.api.client.AcksPublisher;
import io.confluent.ksql.api.client.KsqlObject;
import io.confluent.ksql.api.client.exception.KsqlClientException;
import io.vertx.core.Context;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.parsetools.RecordParser;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.reactivestreams.Publisher;

/* loaded from: input_file:io/confluent/ksql/api/client/impl/StreamInsertsResponseHandler.class */
public class StreamInsertsResponseHandler extends ResponseHandler<CompletableFuture<AcksPublisher>> {
    private static final Logger log = LoggerFactory.getLogger(StreamInsertsResponseHandler.class);
    private final AcksPublisherImpl acksPublisher;
    private boolean paused;

    /* JADX INFO: Access modifiers changed from: package-private */
    public StreamInsertsResponseHandler(Context context, RecordParser recordParser, CompletableFuture<AcksPublisher> completableFuture, HttpClientRequest httpClientRequest, Publisher<KsqlObject> publisher) {
        super(context, recordParser, completableFuture);
        Objects.requireNonNull(httpClientRequest);
        publisher.subscribe(new StreamInsertsSubscriber(context, httpClientRequest));
        this.acksPublisher = new AcksPublisherImpl(context);
        completableFuture.complete(this.acksPublisher);
    }

    @Override // io.confluent.ksql.api.client.impl.ResponseHandler
    protected void doHandleBodyBuffer(Buffer buffer) {
        JsonObject jsonObject = new JsonObject(buffer);
        long longValue = jsonObject.getLong("seq").longValue();
        String string = jsonObject.getString("status");
        if (!"ok".equals(string)) {
            if (!"error".equals(string)) {
                throw new IllegalStateException("Unrecognized status response from /inserts-stream: " + string);
            }
            this.acksPublisher.handleError(new KsqlClientException(String.format("Received error from /inserts-stream. Inserts sequence number: %d. Error code: %d. Message: %s", Long.valueOf(longValue), jsonObject.getInteger("error_code"), jsonObject.getString("message"))));
        } else {
            if (!this.acksPublisher.accept(new InsertAckImpl(longValue)) || this.paused) {
                return;
            }
            this.recordParser.pause();
            this.acksPublisher.drainHandler(this::publisherReceptive);
            this.paused = true;
        }
    }

    @Override // io.confluent.ksql.api.client.impl.ResponseHandler
    protected void doHandleException(Throwable th) {
        log.error(th);
        this.acksPublisher.handleError(new Exception(th));
    }

    @Override // io.confluent.ksql.api.client.impl.ResponseHandler
    protected void doHandleBodyEnd() {
        this.acksPublisher.complete();
    }

    private void publisherReceptive() {
        checkContext();
        this.paused = false;
        this.recordParser.resume();
    }

    @Override // io.confluent.ksql.api.client.impl.ResponseHandler
    public /* bridge */ /* synthetic */ void handleBodyEnd(Void r4) {
        super.handleBodyEnd(r4);
    }

    @Override // io.confluent.ksql.api.client.impl.ResponseHandler
    public /* bridge */ /* synthetic */ void handleException(Throwable th) {
        super.handleException(th);
    }

    @Override // io.confluent.ksql.api.client.impl.ResponseHandler
    public /* bridge */ /* synthetic */ void handleBodyBuffer(Buffer buffer) {
        super.handleBodyBuffer(buffer);
    }
}
