package co.cask.cdap.messaging.client;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.dataset.lib.AbstractCloseableIterator;
import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.api.messaging.TopicAlreadyExistsException;
import co.cask.cdap.api.messaging.TopicNotFoundException;
import co.cask.cdap.common.ServiceUnavailableException;
import co.cask.cdap.common.discovery.EndpointStrategy;
import co.cask.cdap.common.discovery.RandomEndpointStrategy;
import co.cask.cdap.common.http.DefaultHttpRequestConfig;
import co.cask.cdap.messaging.MessageFetcher;
import co.cask.cdap.messaging.MessagingService;
import co.cask.cdap.messaging.RollbackDetail;
import co.cask.cdap.messaging.Schemas;
import co.cask.cdap.messaging.StoreRequest;
import co.cask.cdap.messaging.TopicMetadata;
import co.cask.cdap.messaging.data.RawMessage;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.TopicId;
import co.cask.common.http.HttpRequest;
import co.cask.common.http.HttpRequestConfig;
import co.cask.common.http.HttpRequests;
import co.cask.common.http.HttpResponse;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.io.ByteStreams;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import com.google.inject.Inject;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Type;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.tephra.TransactionCodec;
import org.apache.twill.discovery.Discoverable;
import org.apache.twill.discovery.DiscoveryServiceClient;

/* loaded from: input_file:co/cask/cdap/messaging/client/ClientMessagingService.class */
public final class ClientMessagingService implements MessagingService {
    // Maximum time, in seconds, that createURL waits for the endpoint strategy to pick an endpoint.
    private static final long DISCOVERY_PICK_TIMEOUT_SECS = 5;
    // HTTP settings shared by every request; also the source of the connect/read timeouts used when polling.
    private static final HttpRequestConfig HTTP_REQUEST_CONFIG = new DefaultHttpRequestConfig();
    // Encodes the transaction attached to a fetch request into bytes.
    private static final TransactionCodec TRANSACTION_CODEC = new TransactionCodec();
    // Serializes topic properties to JSON and parses JSON responses.
    private static final Gson GSON = new Gson();
    // Gson type used by getTopic to decode the topic properties response.
    private static final Type TOPIC_PROPERTY_TYPE = new TypeToken<Map<String, String>>() { // from class: co.cask.cdap.messaging.client.ClientMessagingService.1
    }.getType();
    // Gson type used by listTopics to decode the list of topic names.
    private static final Type TOPIC_LIST_TYPE = new TypeToken<List<String>>() { // from class: co.cask.cdap.messaging.client.ClientMessagingService.2
    }.getType();
    // Memoized supplier of the strategy that createURL uses to pick a "messaging.service" endpoint.
    private final Supplier<EndpointStrategy> endpointStrategy;

    /**
     * A {@link MessageFetcher} that polls messages of a single topic by POSTing an Avro-encoded
     * consume request to the {@code <topic path>/poll} endpoint and decoding the Avro array response lazily.
     */
    public final class ClientMessageFetcher extends MessageFetcher {
        private final TopicId topicId;
        private final DatumReader<GenericRecord> messageReader;
        // Reused across reads so that the datum reader can recycle the record instance.
        private GenericRecord messageRecord;

        private ClientMessageFetcher(TopicId topicId) {
            Schema messageSchema = Schemas.V1.ConsumeResponse.SCHEMA.getElementType();
            this.topicId = topicId;
            this.messageRecord = new GenericData.Record(messageSchema);
            this.messageReader = new GenericDatumReader<GenericRecord>(messageSchema);
        }

        /**
         * Sends the poll request and returns an iterator over the messages in the response.
         * The HTTP connection stays open while the iterator is in use; closing the iterator
         * disconnects it. If this method fails, the connection is disconnected before the
         * exception is propagated.
         *
         * @return an iterator over the fetched messages; it must be closed by the caller
         * @throws TopicNotFoundException if the server responds with 404
         * @throws IOException if the request fails or the server responds with an unexpected status
         */
        @Override // co.cask.cdap.messaging.MessageFetcher
        public CloseableIterator<RawMessage> fetch() throws IOException, TopicNotFoundException {
            GenericRecord request = new GenericData.Record(Schemas.V1.ConsumeRequest.SCHEMA);
            if (getStartOffset() != null) {
                request.put("startFrom", ByteBuffer.wrap(getStartOffset()));
            }
            // A start time, when present, takes precedence over a start offset.
            if (getStartTime() != null) {
                request.put("startFrom", getStartTime());
            }
            request.put("inclusive", Boolean.valueOf(isIncludeStart()));
            request.put("limit", Integer.valueOf(getLimit()));
            if (getTransaction() != null) {
                request.put("transaction", ByteBuffer.wrap(TRANSACTION_CODEC.encode(getTransaction())));
            }

            URL url = createURL(createTopicPath(topicId) + "/poll");
            final HttpURLConnection urlConn = (HttpURLConnection) url.openConnection();
            boolean succeeded = false;
            try {
                urlConn.setConnectTimeout(HTTP_REQUEST_CONFIG.getConnectTimeout());
                urlConn.setReadTimeout(HTTP_REQUEST_CONFIG.getReadTimeout());
                urlConn.setRequestMethod("POST");
                urlConn.setRequestProperty("Content-Type", "avro/binary");
                urlConn.setDoInput(true);
                urlConn.setDoOutput(true);

                BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(urlConn.getOutputStream(), null);
                new GenericDatumWriter<GenericRecord>(Schemas.V1.ConsumeRequest.SCHEMA).write(request, encoder);

                int responseCode = urlConn.getResponseCode();
                if (responseCode == HttpURLConnection.HTTP_NOT_FOUND) {
                    throw new TopicNotFoundException(topicId.getNamespace(), topicId.getTopic());
                }
                handleError(responseCode, new Supplier<String>() {
                    @Override
                    public String get() {
                        // Best effort only: the error body just enriches the exception message,
                        // so a failure to read it must not mask the original error.
                        try (InputStream errorStream = urlConn.getErrorStream()) {
                            if (errorStream == null) {
                                return "";
                            }
                            return new String(ByteStreams.toByteArray(errorStream), StandardCharsets.UTF_8);
                        } catch (IOException e) {
                            return "";
                        }
                    }
                }, "Failed to fetch message from topic " + topicId);
                verifyContentType(urlConn.getHeaderFields(), "avro/binary");

                final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(urlConn.getInputStream(), null);
                final long initialCount = decoder.readArrayStart();
                CloseableIterator<RawMessage> iterator = new AbstractCloseableIterator<RawMessage>() {
                    // Number of items left in the Avro array block currently being read.
                    private long itemCount = initialCount;

                    @Override
                    protected RawMessage computeNext() {
                        if (initialCount == 0) {
                            return endOfData();
                        }
                        try {
                            if (itemCount == 0) {
                                // Current block exhausted; a zero count for the next block ends the array.
                                itemCount = decoder.arrayNext();
                                if (itemCount == 0) {
                                    return endOfData();
                                }
                            }
                            itemCount--;
                            messageRecord = messageReader.read(messageRecord, decoder);
                            return new RawMessage(Bytes.toBytes((ByteBuffer) messageRecord.get("id")),
                                                  Bytes.toBytes((ByteBuffer) messageRecord.get("payload")));
                        } catch (IOException e) {
                            throw Throwables.propagate(e);
                        }
                    }

                    @Override
                    public void close() {
                        urlConn.disconnect();
                    }
                };
                succeeded = true;
                return iterator;
            } finally {
                // On success the returned iterator owns the connection; on any failure release it here.
                if (!succeeded) {
                    urlConn.disconnect();
                }
            }
        }
    }

    /**
     * A {@link ByteArrayOutputStream} that can hand out its written content as a {@link ByteBuffer}
     * wrapping the internal buffer directly, i.e. without copying the bytes.
     */
    public static final class ExposedByteArrayOutputStream extends ByteArrayOutputStream {
        private ExposedByteArrayOutputStream() {
        }

        /**
         * Returns a buffer over the bytes written so far, backed by this stream's internal array.
         */
        ByteBuffer toByteBuffer() {
            final int length = count;
            return ByteBuffer.wrap(buf, 0, length);
        }
    }

    /**
     * Creates a client that locates the messaging service through the given discovery client.
     * The endpoint strategy is created lazily on first use and then reused.
     *
     * @param discoveryServiceClient used to discover endpoints registered as "messaging.service"
     */
    @VisibleForTesting
    @Inject
    public ClientMessagingService(final DiscoveryServiceClient discoveryServiceClient) {
        this.endpointStrategy = Suppliers.memoize(new Supplier<EndpointStrategy>() {
            @Override
            public EndpointStrategy get() {
                return new RandomEndpointStrategy(discoveryServiceClient.discover("messaging.service"));
            }
        });
    }

    /**
     * Creates a topic by PUTting its properties, as JSON, to the topic path.
     *
     * @throws TopicAlreadyExistsException if the server responds with 409
     * @throws IOException if the request fails or the server responds with an unexpected status
     */
    @Override // co.cask.cdap.messaging.MessagingService
    public void createTopic(TopicMetadata topicMetadata) throws TopicAlreadyExistsException, IOException {
        TopicId topicId = topicMetadata.getTopicId();
        URL url = createURL(createTopicPath(topicId));
        String propertiesJson = GSON.toJson(topicMetadata.getProperties());
        HttpRequest request = HttpRequest.put(url).withBody(propertiesJson).build();

        HttpResponse response = HttpRequests.execute(request, HTTP_REQUEST_CONFIG);
        if (response.getResponseCode() == HttpURLConnection.HTTP_CONFLICT) {
            throw new TopicAlreadyExistsException(topicId.getNamespace(), topicId.getTopic());
        }
        handleError(response, "Failed to create topic " + topicId);
    }

    /**
     * Updates a topic by PUTting its properties, as JSON, to the topic's {@code /properties} path.
     *
     * @throws TopicNotFoundException if the server responds with 404
     * @throws IOException if the request fails or the server responds with an unexpected status
     */
    @Override // co.cask.cdap.messaging.MessagingService
    public void updateTopic(TopicMetadata topicMetadata) throws TopicNotFoundException, IOException {
        TopicId topicId = topicMetadata.getTopicId();
        URL url = createURL(createTopicPath(topicId) + "/properties");
        String propertiesJson = GSON.toJson(topicMetadata.getProperties());
        HttpRequest request = HttpRequest.put(url).withBody(propertiesJson).build();

        HttpResponse response = HttpRequests.execute(request, HTTP_REQUEST_CONFIG);
        if (response.getResponseCode() == HttpURLConnection.HTTP_NOT_FOUND) {
            throw new TopicNotFoundException(topicId.getNamespace(), topicId.getTopic());
        }
        handleError(response, "Failed to update topic " + topicId);
    }

    /**
     * Deletes a topic by sending a DELETE request to the topic path.
     *
     * @throws TopicNotFoundException if the server responds with 404
     * @throws IOException if the request fails or the server responds with an unexpected status
     */
    @Override // co.cask.cdap.messaging.MessagingService
    public void deleteTopic(TopicId topicId) throws TopicNotFoundException, IOException {
        HttpResponse response = HttpRequests.execute(HttpRequest.delete(createURL(createTopicPath(topicId))).build(), HTTP_REQUEST_CONFIG);
        if (response.getResponseCode() == HttpURLConnection.HTTP_NOT_FOUND) {
            throw new TopicNotFoundException(topicId.getNamespace(), topicId.getTopic());
        }
        // The message names the operation that actually failed (it previously said "update").
        handleError(response, "Failed to delete topic " + topicId);
    }

    /**
     * Fetches the properties of a topic with a GET request to the topic path.
     *
     * @return the topic metadata built from the given id and the JSON properties in the response
     * @throws TopicNotFoundException if the server responds with 404
     * @throws IOException if the request fails or the server responds with an unexpected status
     */
    @Override // co.cask.cdap.messaging.MessagingService
    public TopicMetadata getTopic(TopicId topicId) throws TopicNotFoundException, IOException {
        HttpResponse response = HttpRequests.execute(HttpRequest.get(createURL(createTopicPath(topicId))).build(), HTTP_REQUEST_CONFIG);
        if (response.getResponseCode() == HttpURLConnection.HTTP_NOT_FOUND) {
            throw new TopicNotFoundException(topicId.getNamespace(), topicId.getTopic());
        }
        // The message names the operation that actually failed (it previously said "update").
        handleError(response, "Failed to get metadata of topic " + topicId);
        Map<String, String> properties = GSON.fromJson(response.getResponseBodyAsString(), TOPIC_PROPERTY_TYPE);
        return new TopicMetadata(topicId, properties);
    }

    /**
     * Lists the topics of a namespace with a GET request to {@code <namespace>/topics}.
     *
     * @return an unmodifiable list of topic ids built from the topic names in the JSON response
     * @throws IOException if the request fails or the server responds with an unexpected status
     */
    @Override // co.cask.cdap.messaging.MessagingService
    public List<TopicId> listTopics(NamespaceId namespaceId) throws IOException {
        URL url = createURL(namespaceId.getNamespace() + "/topics");
        HttpResponse response = HttpRequests.execute(HttpRequest.get(url).build(), HTTP_REQUEST_CONFIG);
        handleError(response, "Failed to list topics in namespace " + namespaceId);

        List<String> topicNames = GSON.fromJson(response.getResponseBodyAsString(), TOPIC_LIST_TYPE);
        List<TopicId> topicIds = new ArrayList<TopicId>(topicNames.size());
        for (String topicName : topicNames) {
            topicIds.add(namespaceId.topic(topicName));
        }
        return Collections.unmodifiableList(topicIds);
    }

    /**
     * Returns a new fetcher bound to the given topic. No request is sent here; the HTTP call
     * happens when {@code fetch()} is invoked on the returned fetcher.
     */
    @Override // co.cask.cdap.messaging.MessagingService
    public MessageFetcher prepareFetch(TopicId topicId) throws TopicNotFoundException, IOException {
        return new ClientMessageFetcher(topicId);
    }

    /**
     * Publishes the messages of the given request through the topic's {@code /publish} endpoint.
     *
     * @return {@code null} when the response body is empty; otherwise a rollback detail wrapping
     *         the raw response body, after checking that the response is {@code avro/binary}
     * @throws TopicNotFoundException if the server responds with 404
     * @throws IOException if the request fails or the server responds with an unexpected status
     */
    @Override // co.cask.cdap.messaging.MessagingService
    @Nullable
    public RollbackDetail publish(StoreRequest storeRequest) throws TopicNotFoundException, IOException {
        HttpResponse response = performWriteRequest(storeRequest, true);
        byte[] body = response.getResponseBody();
        if (body.length > 0) {
            verifyContentType(response.getHeaders().asMap(), "avro/binary");
            return new ClientRollbackDetail(body);
        }
        return null;
    }

    /**
     * Sends the messages of the given request through the topic's {@code /store} endpoint
     * (the non-publish variant of the write request). The response is not inspected further.
     */
    @Override // co.cask.cdap.messaging.MessagingService
    public void storePayload(StoreRequest storeRequest) throws TopicNotFoundException, IOException {
        performWriteRequest(storeRequest, false);
    }

    /**
     * Rolls back previously published messages by POSTing the rollback detail, as Avro binary,
     * to the topic's {@code /rollback} endpoint. A {@link ClientRollbackDetail} is sent using its
     * already encoded bytes; any other implementation is encoded first.
     *
     * @throws TopicNotFoundException if the server responds with 404
     * @throws IOException if the request fails or the server responds with an unexpected status
     */
    @Override // co.cask.cdap.messaging.MessagingService
    public void rollback(TopicId topicId, RollbackDetail rollbackDetail) throws TopicNotFoundException, IOException {
        URL url = createURL(createTopicPath(topicId) + "/rollback");

        ByteBuffer requestBody;
        if (rollbackDetail instanceof ClientRollbackDetail) {
            requestBody = ByteBuffer.wrap(((ClientRollbackDetail) rollbackDetail).getEncoded());
        } else {
            requestBody = encodeRollbackDetail(rollbackDetail);
        }

        HttpRequest request = HttpRequest.post(url)
            .addHeader("Content-Type", "avro/binary")
            .withBody(requestBody)
            .build();
        HttpResponse response = HttpRequests.execute(request, HTTP_REQUEST_CONFIG);
        if (response.getResponseCode() == HttpURLConnection.HTTP_NOT_FOUND) {
            throw new TopicNotFoundException(topicId.getNamespace(), topicId.getTopic());
        }
        handleError(response, "Failed to rollback message in topic " + topicId + " with rollback detail " + rollbackDetail);
    }

    /**
     * Encodes the store request as an Avro publish request and POSTs it to the topic.
     *
     * @param storeRequest the messages to write, optionally with a transaction write pointer
     * @param z {@code true} to use the {@code /publish} endpoint, {@code false} to use {@code /store}
     * @return the HTTP response of a successful request
     * @throws TopicNotFoundException if the server responds with 404
     * @throws IOException if the request fails or the server responds with an unexpected status
     */
    private HttpResponse performWriteRequest(StoreRequest storeRequest, boolean z) throws IOException, TopicNotFoundException {
        GenericRecord requestRecord = new GenericData.Record(Schemas.V1.PublishRequest.SCHEMA);
        if (storeRequest.isTransactional()) {
            requestRecord.put("transactionWritePointer", Long.valueOf(storeRequest.getTransactionWritePointer()));
        }
        requestRecord.put("messages", convertPayloads(storeRequest));

        // Serialize the record into an in-memory buffer that becomes the request body.
        ExposedByteArrayOutputStream encoded = new ExposedByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(encoded, null);
        new GenericDatumWriter<GenericRecord>(Schemas.V1.PublishRequest.SCHEMA).write(requestRecord, encoder);

        String action;
        if (z) {
            action = "publish";
        } else {
            action = "store";
        }
        TopicId topicId = storeRequest.getTopicId();
        URL url = createURL(createTopicPath(topicId) + "/" + action);
        HttpRequest request = HttpRequest.post(url)
            .addHeader("Content-Type", "avro/binary")
            .withBody(encoded.toByteBuffer())
            .build();

        HttpResponse response = HttpRequests.execute(request, HTTP_REQUEST_CONFIG);
        if (response.getResponseCode() == HttpURLConnection.HTTP_NOT_FOUND) {
            throw new TopicNotFoundException(topicId.getNamespace(), topicId.getTopic());
        }
        handleError(response, "Failed to " + action + " message to topic " + topicId);
        return response;
    }

    /**
     * Builds the relative path of a topic in the form {@code <namespace>/topics/<topic>}.
     */
    public String createTopicPath(TopicId topicId) {
        String namespace = topicId.getNamespace();
        String topic = topicId.getTopic();
        return namespace + "/topics/" + topic;
    }

    /**
     * Resolves the given relative path against {@code http://<host>:<port>/v1/namespaces/} of a
     * discovered messaging service endpoint.
     *
     * @param str the path relative to the namespaces root
     * @return the absolute URL for the request
     * @throws ServiceUnavailableException if no endpoint is picked within the discovery timeout
     * @throws MalformedURLException if the resolved URI cannot be converted to a URL
     */
    public URL createURL(String str) throws MalformedURLException {
        EndpointStrategy strategy = this.endpointStrategy.get();
        Discoverable endpoint = strategy.pick(DISCOVERY_PICK_TIMEOUT_SECS, TimeUnit.SECONDS);
        if (endpoint == null) {
            throw new ServiceUnavailableException("No endpoint available for messaging service");
        }

        InetSocketAddress address = endpoint.getSocketAddress();
        String baseUri = String.format("http://%s:%d/v1/namespaces/", address.getHostName(), address.getPort());
        URI resolved = URI.create(baseUri).resolve(str);
        return resolved.toURL();
    }

    /**
     * Checks the status of the given response, using the response body as the failure reason.
     * The body is only read when an error message actually has to be built.
     *
     * @param httpResponse the response to check
     * @param str the message describing the operation that was attempted
     * @throws IOException if the response status indicates a failure
     */
    private void handleError(final HttpResponse httpResponse, String str) throws IOException {
        handleError(httpResponse.getResponseCode(), new Supplier<String>() {
            @Override
            public String get() {
                return httpResponse.getResponseBodyAsString();
            }
        }, str);
    }

    /**
     * Translates an HTTP status code into an exception; only status 200 is treated as success.
     *
     * @param i the HTTP response code
     * @param supplier provides the failure reason; it is only consulted when a message is needed
     * @param str the message describing the operation that was attempted
     * @throws IllegalArgumentException if the status is 400
     * @throws ServiceUnavailableException if the status is 503
     * @throws IOException for any other status that is not 200
     */
    public void handleError(int i, Supplier<String> supplier, String str) throws IOException {
        if (i == HttpURLConnection.HTTP_OK) {
            return;
        }
        if (i == HttpURLConnection.HTTP_UNAVAILABLE) {
            throw new ServiceUnavailableException("messaging.service");
        }

        String message = str + ". Reason: " + supplier.get();
        if (i == HttpURLConnection.HTTP_BAD_REQUEST) {
            throw new IllegalArgumentException(message);
        }
        throw new IOException(message);
    }

    /**
     * Drains the remaining payloads of the store request, wrapping each byte array in a
     * {@link ByteBuffer} without copying it.
     */
    private List<ByteBuffer> convertPayloads(StoreRequest storeRequest) {
        List<ByteBuffer> payloads = new ArrayList<ByteBuffer>();
        for (; storeRequest.hasNext(); ) {
            byte[] payload = (byte[]) storeRequest.next();
            payloads.add(ByteBuffer.wrap(payload));
        }
        return payloads;
    }

    /**
     * Encodes a rollback detail as an Avro binary publish response record.
     *
     * @param rollbackDetail the detail to encode
     * @return a buffer containing the encoded record
     * @throws IOException if the Avro encoding fails
     */
    private ByteBuffer encodeRollbackDetail(RollbackDetail rollbackDetail) throws IOException {
        Schema schema = Schemas.V1.PublishResponse.SCHEMA;
        GenericRecord response = new GenericData.Record(schema);
        response.put("transactionWritePointer", Long.valueOf(rollbackDetail.getTransactionWritePointer()));

        GenericRecord rollbackRange = new GenericData.Record(schema.getField("rollbackRange").schema());
        rollbackRange.put("startTimestamp", Long.valueOf(rollbackDetail.getStartTimestamp()));
        rollbackRange.put("startSequenceId", Integer.valueOf(rollbackDetail.getStartSequenceId()));
        rollbackRange.put("endTimestamp", Long.valueOf(rollbackDetail.getEndTimestamp()));
        rollbackRange.put("endSequenceId", Integer.valueOf(rollbackDetail.getEndSequenceId()));
        response.put("rollbackRange", rollbackRange);

        ExposedByteArrayOutputStream encoded = new ExposedByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(encoded, null);
        // The writer must use the same schema the record was built with; it was previously created
        // with the publish request schema, which does not describe this record's fields.
        new GenericDatumWriter<GenericRecord>(schema).write(response, encoder);
        return encoded.toByteBuffer();
    }

    /**
     * Verifies that the first {@code Content-Type} header value equals the expected content type.
     * The header name is matched case-insensitively, since HTTP header field names are not case-sensitive.
     *
     * @param map the response headers, keyed by header name
     * @param str the expected content type
     * @throws IllegalArgumentException if the header is missing, has no value, or its first value differs
     */
    public void verifyContentType(Map<String, ? extends Collection<String>> map, String str) {
        Collection<String> values = map.get("Content-Type");
        if (values == null) {
            // Fall back to a case-insensitive scan; keys may be null (e.g. the status line entry).
            for (Map.Entry<String, ? extends Collection<String>> header : map.entrySet()) {
                if ("Content-Type".equalsIgnoreCase(header.getKey())) {
                    values = header.getValue();
                    break;
                }
            }
        }

        String actual = null;
        if (values != null) {
            Iterator<String> iterator = values.iterator();
            if (iterator.hasNext()) {
                actual = iterator.next();
            }
        }
        if (!str.equals(actual)) {
            throw new IllegalArgumentException("Only " + str + " content type is supported.");
        }
    }
}
