package com.azure.data.schemaregistry.avro;

import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.serializer.ObjectSerializer;
import com.azure.core.util.serializer.TypeReference;
import com.azure.data.schemaregistry.SchemaRegistryAsyncClient;
import com.azure.data.schemaregistry.models.SerializationType;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/azure/data/schemaregistry/avro/SchemaRegistryAvroSerializer.class */
/**
 * {@link ObjectSerializer} that writes and reads Avro payloads framed with a schema registry header.
 *
 * <p>Wire layout produced by {@link #serializeAsync(OutputStream, Object)} and expected by
 * {@link #deserializeAsync(InputStream, TypeReference)}:</p>
 * <ol>
 *   <li>{@value #RECORD_FORMAT_INDICATOR_SIZE} bytes: record format indicator (all zeros);</li>
 *   <li>{@value #SCHEMA_ID_SIZE} bytes: schema ID encoded as UTF-8;</li>
 *   <li>remaining bytes: the encoded record produced by {@code AvroSchemaRegistryUtils.encode}.</li>
 * </ol>
 */
public final class SchemaRegistryAvroSerializer implements ObjectSerializer {
    /** Number of bytes reserved in the payload header for the UTF-8 encoded schema ID. */
    static final int SCHEMA_ID_SIZE = 32;
    /** Number of bytes reserved in the payload header for the record format indicator. */
    static final int RECORD_FORMAT_INDICATOR_SIZE = 4;

    /** The only record format indicator this serializer writes or accepts (all zero bytes). Never mutated. */
    private static final byte[] RECORD_FORMAT_INDICATOR = new byte[RECORD_FORMAT_INDICATOR_SIZE];
    /** Chunk size used when draining an input stream into memory. */
    private static final int READ_BUFFER_SIZE = 8192;

    private final ClientLogger logger = new ClientLogger(SchemaRegistryAvroSerializer.class);
    private final SchemaRegistryAsyncClient schemaRegistryClient;
    private final AvroSchemaRegistryUtils avroSchemaRegistryUtils;
    private final String schemaGroup;
    private final Boolean autoRegisterSchemas;

    /**
     * Creates a serializer.
     *
     * @param schemaRegistryAsyncClient client used to look up and register schemas; must not be null.
     * @param avroSchemaRegistryUtils helper used to encode/decode objects and derive schema name and text;
     *     must not be null.
     * @param str schema group passed to the registry when resolving or registering a schema.
     * @param bool whether schemas are registered on serialize; {@code null} is treated as {@code false}.
     * @throws NullPointerException if the client or the utils argument is null.
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public SchemaRegistryAvroSerializer(SchemaRegistryAsyncClient schemaRegistryAsyncClient, AvroSchemaRegistryUtils avroSchemaRegistryUtils, String str, Boolean bool) {
        this.schemaRegistryClient = Objects.requireNonNull(schemaRegistryAsyncClient, "'schemaRegistryClient' cannot be null.");
        this.avroSchemaRegistryUtils = Objects.requireNonNull(avroSchemaRegistryUtils, "'avroSchemaRegistryUtils' cannot be null.");
        this.schemaGroup = str;
        this.autoRegisterSchemas = bool;
    }

    /**
     * Blocking variant of {@link #deserializeAsync(InputStream, TypeReference)}.
     *
     * @return the decoded object, or {@code null} when the stream is null or empty.
     */
    @Override // com.azure.core.util.serializer.ObjectSerializer
    public <T> T deserialize(InputStream inputStream, TypeReference<T> typeReference) {
        return deserializeAsync(inputStream, typeReference).block();
    }

    /**
     * Reads the whole stream, validates the header, fetches the schema by the ID embedded in the header and
     * decodes the remaining bytes with it.
     *
     * @param inputStream stream holding one framed payload; a null or empty stream yields an empty Mono.
     * @param typeReference not consulted by this implementation.
     * @return a Mono emitting the decoded object, or an error: {@link IllegalStateException} for a malformed
     *     header, {@link NullPointerException} when the registry returns no schema content.
     */
    @Override // com.azure.core.util.serializer.ObjectSerializer
    public <T> Mono<T> deserializeAsync(InputStream inputStream, TypeReference<T> typeReference) {
        if (inputStream == null) {
            return Mono.empty();
        }
        return Mono.fromCallable(() -> readFully(inputStream)).flatMap(payload -> {
            if (payload.length == 0) {
                return Mono.empty();
            }
            int headerSize = RECORD_FORMAT_INDICATOR_SIZE + SCHEMA_ID_SIZE;
            if (payload.length < headerSize) {
                // Report truncated input explicitly instead of surfacing a BufferUnderflowException.
                return FluxUtil.monoError(this.logger, new IllegalStateException(String.format("Illegal format: payload of %d bytes is shorter than the %d byte header", payload.length, headerSize)));
            }
            ByteBuffer header = ByteBuffer.wrap(payload);
            if (!Arrays.equals(getRecordFormatIndicator(header), RECORD_FORMAT_INDICATOR)) {
                return FluxUtil.monoError(this.logger, new IllegalStateException("Illegal format: unsupported record format indicator in payload"));
            }
            String schemaId = getSchemaIdFromPayload(header);
            // Everything after the indicator and the schema ID is the encoded record; slicing to
            // payload.length avoids the zero padding copyOfRange adds when the end index overshoots.
            byte[] encodedRecord = Arrays.copyOfRange(payload, headerSize, payload.length);
            return this.schemaRegistryClient.getSchema(schemaId).handle((schemaProperties, synchronousSink) -> {
                byte[] schema = schemaProperties.getSchema();
                if (schema == null) {
                    synchronousSink.error(this.logger.logExceptionAsError(new NullPointerException(String.format("Payload schema returned as null. Schema type: %s, Schema ID: %s", schemaProperties.getSerializationType(), schemaProperties.getSchemaId()))));
                } else {
                    synchronousSink.next(this.avroSchemaRegistryUtils.decode(encodedRecord, schema));
                }
            });
        });
    }

    /** Blocking variant of {@link #serializeAsync(OutputStream, Object)}. */
    @Override // com.azure.core.util.serializer.ObjectSerializer
    public void serialize(OutputStream outputStream, Object obj) {
        serializeAsync(outputStream, obj).block();
    }

    /**
     * Resolves (or registers) the schema ID for {@code obj} and writes the framed payload to the stream.
     *
     * @param outputStream destination of the record format indicator, schema ID and encoded record.
     * @param obj object to encode.
     * @return a Mono that completes once all three parts are written, or errors with
     *     {@link NullPointerException} for a null argument or {@link UncheckedIOException} when writing fails.
     */
    @Override // com.azure.core.util.serializer.ObjectSerializer
    public Mono<Void> serializeAsync(OutputStream outputStream, Object obj) {
        if (obj == null) {
            return FluxUtil.monoError(this.logger, new NullPointerException("Null object, behavior should be defined in concrete serializer implementation."));
        }
        if (outputStream == null) {
            // Fail before the registry round trip rather than with an NPE after it.
            return FluxUtil.monoError(this.logger, new NullPointerException("'outputStream' cannot be null."));
        }
        String schemaString = this.avroSchemaRegistryUtils.getSchemaString(obj);
        String schemaName = this.avroSchemaRegistryUtils.getSchemaName(obj);
        return maybeRegisterSchema(this.schemaGroup, schemaName, schemaString).handle((schemaId, synchronousSink) -> {
            // allocate() zero-fills, so an ID shorter than SCHEMA_ID_SIZE bytes is right-padded with zeros;
            // a longer one makes put() throw BufferOverflowException.
            ByteBuffer idField = ByteBuffer.allocate(SCHEMA_ID_SIZE).put(schemaId.getBytes(StandardCharsets.UTF_8));
            try {
                outputStream.write(RECORD_FORMAT_INDICATOR);
                outputStream.write(idField.array());
                outputStream.write(this.avroSchemaRegistryUtils.encode(obj));
                synchronousSink.complete();
            } catch (IOException e) {
                synchronousSink.error(new UncheckedIOException(e.getMessage(), e));
            }
        });
    }

    /** Drains {@code stream} to end-of-stream and returns every byte read, in order. */
    private static byte[] readFully(InputStream stream) throws IOException {
        ByteArrayOutputStream collected = new ByteArrayOutputStream();
        byte[] chunk = new byte[READ_BUFFER_SIZE];
        int count;
        while ((count = stream.read(chunk)) != -1) {
            collected.write(chunk, 0, count);
        }
        return collected.toByteArray();
    }

    /** Consumes the next {@link #SCHEMA_ID_SIZE} bytes of the buffer and decodes them as a UTF-8 string. */
    private String getSchemaIdFromPayload(ByteBuffer byteBuffer) {
        byte[] idBytes = new byte[SCHEMA_ID_SIZE];
        byteBuffer.get(idBytes);
        return new String(idBytes, StandardCharsets.UTF_8);
    }

    /** Consumes and returns the next {@link #RECORD_FORMAT_INDICATOR_SIZE} bytes of the buffer. */
    private byte[] getRecordFormatIndicator(ByteBuffer byteBuffer) {
        byte[] indicator = new byte[RECORD_FORMAT_INDICATOR_SIZE];
        byteBuffer.get(indicator);
        return indicator;
    }

    /**
     * Returns the schema ID for the given schema, registering the schema first when auto-registration is
     * enabled and otherwise only looking the ID up.
     */
    private Mono<String> maybeRegisterSchema(String group, String name, String content) {
        // Boolean.TRUE.equals tolerates a null flag, which unboxing would turn into an NPE.
        if (Boolean.TRUE.equals(this.autoRegisterSchemas)) {
            return this.schemaRegistryClient.registerSchema(group, name, content, SerializationType.AVRO)
                .map(properties -> properties.getSchemaId());
        }
        return this.schemaRegistryClient.getSchemaId(group, name, content, SerializationType.AVRO);
    }
}
