package software.amazon.awssdk.awscore.eventstream;

import java.io.ByteArrayInputStream;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import software.amazon.awssdk.annotations.SdkProtectedApi;
import software.amazon.awssdk.core.SdkResponse;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.core.http.HttpResponseHandler;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.SdkExecutionAttribute;
import software.amazon.awssdk.http.AbortableInputStream;
import software.amazon.awssdk.http.SdkHttpFullResponse;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.Validate;
import software.amazon.eventstream.HeaderValue;
import software.amazon.eventstream.Message;
import software.amazon.eventstream.MessageDecoder;

@SdkProtectedApi
/* loaded from: input_file:software/amazon/awssdk/awscore/eventstream/EventStreamAsyncResponseTransformer.class */
public final class EventStreamAsyncResponseTransformer<ResponseT, EventT> implements AsyncResponseTransformer<SdkResponse, Void> {
    private static final Logger log = Logger.loggerFor((Class<?>) EventStreamAsyncResponseTransformer.class);
    private final EventStreamResponseHandler<ResponseT, EventT> eventStreamResponseHandler;
    private final HttpResponseHandler<? extends ResponseT> initialResponseHandler;
    private final HttpResponseHandler<? extends EventT> eventResponseHandler;
    private final HttpResponseHandler<? extends Throwable> exceptionResponseHandler;
    private final Supplier<ExecutionAttributes> attributesFactory;
    private final CompletableFuture<Void> future;
    private final AtomicBoolean exceptionsMayBeSent;
    private volatile CompletableFuture<Void> transformFuture;
    private volatile String requestId;
    private volatile String extendedRequestId;

    /* loaded from: input_file:software/amazon/awssdk/awscore/eventstream/EventStreamAsyncResponseTransformer$Builder.class */
    public static final class Builder<ResponseT, EventT> {
        private EventStreamResponseHandler<ResponseT, EventT> eventStreamResponseHandler;
        private HttpResponseHandler<? extends ResponseT> initialResponseHandler;
        private HttpResponseHandler<? extends EventT> eventResponseHandler;
        private HttpResponseHandler<? extends Throwable> exceptionResponseHandler;
        private CompletableFuture<Void> future;
        private String serviceName;

        private Builder() {
        }

        public Builder<ResponseT, EventT> eventStreamResponseHandler(EventStreamResponseHandler<ResponseT, EventT> eventStreamResponseHandler) {
            this.eventStreamResponseHandler = eventStreamResponseHandler;
            return this;
        }

        public Builder<ResponseT, EventT> initialResponseHandler(HttpResponseHandler<? extends ResponseT> httpResponseHandler) {
            this.initialResponseHandler = httpResponseHandler;
            return this;
        }

        public Builder<ResponseT, EventT> eventResponseHandler(HttpResponseHandler<? extends EventT> httpResponseHandler) {
            this.eventResponseHandler = httpResponseHandler;
            return this;
        }

        public Builder<ResponseT, EventT> exceptionResponseHandler(HttpResponseHandler<? extends Throwable> httpResponseHandler) {
            this.exceptionResponseHandler = httpResponseHandler;
            return this;
        }

        @Deprecated
        public Builder<ResponseT, EventT> executor(Executor executor) {
            return this;
        }

        public Builder<ResponseT, EventT> future(CompletableFuture<Void> completableFuture) {
            this.future = completableFuture;
            return this;
        }

        public Builder<ResponseT, EventT> serviceName(String str) {
            this.serviceName = str;
            return this;
        }

        public EventStreamAsyncResponseTransformer<ResponseT, EventT> build() {
            return new EventStreamAsyncResponseTransformer<>(this.eventStreamResponseHandler, this.initialResponseHandler, this.eventResponseHandler, this.exceptionResponseHandler, this.future, this.serviceName);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:software/amazon/awssdk/awscore/eventstream/EventStreamAsyncResponseTransformer$SynchronousMessageDecoder.class */
    public static final class SynchronousMessageDecoder {
        private final MessageDecoder decoder;

        private SynchronousMessageDecoder() {
            this.decoder = new MessageDecoder();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Iterable<Message> decode(ByteBuffer byteBuffer) {
            this.decoder.feed(byteBuffer);
            return this.decoder.getDecodedMessages();
        }
    }

    private EventStreamAsyncResponseTransformer(EventStreamResponseHandler<ResponseT, EventT> eventStreamResponseHandler, HttpResponseHandler<? extends ResponseT> httpResponseHandler, HttpResponseHandler<? extends EventT> httpResponseHandler2, HttpResponseHandler<? extends Throwable> httpResponseHandler3, CompletableFuture<Void> completableFuture, String str) {
        this.exceptionsMayBeSent = new AtomicBoolean(true);
        this.requestId = null;
        this.extendedRequestId = null;
        this.eventStreamResponseHandler = eventStreamResponseHandler;
        this.initialResponseHandler = httpResponseHandler;
        this.eventResponseHandler = httpResponseHandler2;
        this.exceptionResponseHandler = httpResponseHandler3;
        this.future = completableFuture;
        this.attributesFactory = () -> {
            return new ExecutionAttributes().putAttribute(SdkExecutionAttribute.SERVICE_NAME, str);
        };
    }

    public static <ResponseT, EventT> Builder<ResponseT, EventT> builder() {
        return new Builder<>();
    }

    @Override // software.amazon.awssdk.core.async.AsyncResponseTransformer
    public CompletableFuture<Void> prepare() {
        this.transformFuture = new CompletableFuture<>();
        return this.transformFuture;
    }

    @Override // software.amazon.awssdk.core.async.AsyncResponseTransformer
    public void onResponse(SdkResponse sdkResponse) {
        if (sdkResponse == null || sdkResponse.sdkHttpResponse() == null) {
            return;
        }
        this.requestId = sdkResponse.sdkHttpResponse().firstMatchingHeader(HttpResponseHandler.X_AMZN_REQUEST_ID_HEADERS).orElse(null);
        this.extendedRequestId = sdkResponse.sdkHttpResponse().firstMatchingHeader(HttpResponseHandler.X_AMZ_ID_2_HEADER).orElse(null);
        log.debug(() -> {
            return getLogPrefix() + "Received HTTP response headers: " + sdkResponse;
        });
    }

    @Override // software.amazon.awssdk.core.async.AsyncResponseTransformer
    public void onStream(SdkPublisher<ByteBuffer> sdkPublisher) {
        Validate.isTrue(this.transformFuture != null, "onStream() invoked without prepare().", new Object[0]);
        this.exceptionsMayBeSent.set(true);
        SynchronousMessageDecoder synchronousMessageDecoder = new SynchronousMessageDecoder();
        EventStreamResponseHandler<ResponseT, EventT> eventStreamResponseHandler = this.eventStreamResponseHandler;
        Objects.requireNonNull(synchronousMessageDecoder);
        eventStreamResponseHandler.onEventStream(sdkPublisher.flatMapIterable(byteBuffer -> {
            return synchronousMessageDecoder.decode(byteBuffer);
        }).flatMapIterable(this::transformMessage).doAfterOnComplete(this::handleOnStreamComplete).doAfterOnError(this::handleOnStreamError).doAfterOnCancel(this::handleOnStreamCancel));
    }

    @Override // software.amazon.awssdk.core.async.AsyncResponseTransformer
    public void exceptionOccurred(Throwable th) {
        if (this.exceptionsMayBeSent.compareAndSet(true, false)) {
            try {
                this.eventStreamResponseHandler.exceptionOccurred(th);
            } catch (RuntimeException e) {
                log.warn(() -> {
                    return "Exception raised by exceptionOccurred. Ignoring.";
                }, e);
            }
            this.transformFuture.completeExceptionally(th);
        }
    }

    private void handleOnStreamComplete() {
        log.trace(() -> {
            return getLogPrefix() + "Event stream completed successfully.";
        });
        this.exceptionsMayBeSent.set(false);
        this.eventStreamResponseHandler.complete();
        this.transformFuture.complete(null);
        this.future.complete(null);
    }

    private void handleOnStreamError(Throwable th) {
        log.trace(() -> {
            return getLogPrefix() + "Event stream failed.";
        }, th);
        exceptionOccurred(th);
    }

    private void handleOnStreamCancel() {
        log.trace(() -> {
            return getLogPrefix() + "Event stream cancelled.";
        });
        this.exceptionsMayBeSent.set(false);
        this.transformFuture.complete(null);
        this.future.complete(null);
    }

    private Iterable<EventT> transformMessage(Message message) {
        try {
            try {
                if (isEvent(message)) {
                    return transformEventMessage(message);
                }
                if (isError(message) || isException(message)) {
                    throw transformErrorMessage(message);
                }
                log.debug(() -> {
                    return getLogPrefix() + "Decoded a message of an unknown type, it will be dropped: " + message;
                });
                return Collections.emptyList();
            } catch (Error | SdkException e) {
                throw e;
            }
        } catch (Throwable th) {
            throw SdkClientException.builder().cause(th).mo986build();
        }
    }

    private Iterable<EventT> transformEventMessage(Message message) throws Exception {
        SdkHttpFullResponse adaptMessageToResponse = adaptMessageToResponse(message, false);
        if (!message.getHeaders().get(":event-type").getString().equals("initial-response")) {
            EventT handle = this.eventResponseHandler.handle(adaptMessageToResponse, this.attributesFactory.get());
            log.debug(() -> {
                return getLogPrefix() + "Decoded event: " + handle;
            });
            return Collections.singleton(handle);
        }
        ResponseT handle2 = this.initialResponseHandler.handle(adaptMessageToResponse, this.attributesFactory.get());
        this.eventStreamResponseHandler.responseReceived(handle2);
        log.debug(() -> {
            return getLogPrefix() + "Decoded initial response: " + handle2;
        });
        return Collections.emptyList();
    }

    private Throwable transformErrorMessage(Message message) throws Exception {
        Throwable handle = this.exceptionResponseHandler.handle(adaptMessageToResponse(message, true), this.attributesFactory.get());
        log.debug(() -> {
            return getLogPrefix() + "Decoded error or exception: " + handle;
        }, handle);
        return handle;
    }

    private String getLogPrefix() {
        if (this.requestId == null) {
            return "";
        }
        StringBuilder sb = new StringBuilder();
        sb.append("(");
        sb.append("RequestId: ").append(this.requestId);
        if (this.extendedRequestId != null) {
            sb.append(", ExtendedRequestId: ").append(this.extendedRequestId);
        }
        sb.append(") ");
        return sb.toString();
    }

    private SdkHttpFullResponse adaptMessageToResponse(Message message, boolean z) {
        Map<String, List<String>> map = (Map) message.getHeaders().entrySet().stream().collect(HashMap::new, (hashMap, entry) -> {
            hashMap.put((String) entry.getKey(), Collections.singletonList(((HeaderValue) entry.getValue()).getString()));
        }, (v0, v1) -> {
            v0.putAll(v1);
        });
        if (this.requestId != null) {
            map.put(HttpResponseHandler.X_AMZN_REQUEST_ID_HEADER, Collections.singletonList(this.requestId));
        }
        if (this.extendedRequestId != null) {
            map.put(HttpResponseHandler.X_AMZ_ID_2_HEADER, Collections.singletonList(this.extendedRequestId));
        }
        SdkHttpFullResponse.Builder headers = SdkHttpFullResponse.builder().content(AbortableInputStream.create(new ByteArrayInputStream(message.getPayload()))).headers(map);
        if (!z) {
            headers.statusCode(200);
        }
        return headers.mo986build();
    }

    private boolean isEvent(Message message) {
        return "event".equals(message.getHeaders().get(":message-type").getString());
    }

    private boolean isError(Message message) {
        return "error".equals(message.getHeaders().get(":message-type").getString());
    }

    private boolean isException(Message message) {
        return "exception".equals(message.getHeaders().get(":message-type").getString());
    }
}
