package org.apache.flink.statefun.flink.core.message;

import java.io.IOException;
import java.util.Objects;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.statefun.flink.common.protobuf.ProtobufSerializer;
import org.apache.flink.statefun.flink.core.generated.Checkpoint;
import org.apache.flink.statefun.flink.core.generated.Envelope;
import org.apache.flink.statefun.flink.core.generated.Payload;
import org.apache.flink.statefun.sdk.Address;

/* loaded from: input_file:org/apache/flink/statefun/flink/core/message/MessageFactory.class */
public final class MessageFactory {
    private final ProtobufSerializer<Envelope> envelopeSerializer = ProtobufSerializer.forMessageGeneratedClass(Envelope.class);
    private final MessagePayloadSerializer userMessagePayloadSerializer;

    public static MessageFactory forKey(MessageFactoryKey messageFactoryKey) {
        return new MessageFactory(forPayloadKey(messageFactoryKey));
    }

    private MessageFactory(MessagePayloadSerializer messagePayloadSerializer) {
        this.userMessagePayloadSerializer = (MessagePayloadSerializer) Objects.requireNonNull(messagePayloadSerializer);
    }

    public Message from(long j) {
        return from(envelopeWithCheckpointId(j));
    }

    public Message from(DataInputView dataInputView) throws IOException {
        return from(deserializeEnvelope(dataInputView));
    }

    public Message from(Address address, Address address2, Object obj) {
        return new SdkMessage(address, address2, obj);
    }

    public Message from(Address address, Address address2, Object obj, String str) {
        return new SdkMessage(address, address2, obj, str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException {
        copyEnvelope(dataInputView, dataOutputView);
    }

    private Message from(Envelope envelope) {
        return new ProtobufMessage(envelope);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Payload serializeUserMessagePayload(Object obj) {
        return this.userMessagePayloadSerializer.serialize(obj);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Object deserializeUserMessagePayload(ClassLoader classLoader, Payload payload) {
        return this.userMessagePayloadSerializer.deserialize(classLoader, payload);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Object copyUserMessagePayload(ClassLoader classLoader, Object obj) {
        return this.userMessagePayloadSerializer.copy(classLoader, obj);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void serializeEnvelope(Envelope envelope, DataOutputView dataOutputView) throws IOException {
        this.envelopeSerializer.serialize(envelope, dataOutputView);
    }

    private Envelope deserializeEnvelope(DataInputView dataInputView) throws IOException {
        return this.envelopeSerializer.deserialize(dataInputView);
    }

    private void copyEnvelope(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException {
        this.envelopeSerializer.copy(dataInputView, dataOutputView);
    }

    private static Envelope envelopeWithCheckpointId(long j) {
        return Envelope.newBuilder().setCheckpoint(Checkpoint.newBuilder().setCheckpointId(j).m45build()).m94build();
    }

    private static MessagePayloadSerializer forPayloadKey(MessageFactoryKey messageFactoryKey) {
        switch (messageFactoryKey.getType()) {
            case WITH_KRYO_PAYLOADS:
                return new MessagePayloadSerializerKryo();
            case WITH_PROTOBUF_PAYLOADS:
                return new MessagePayloadSerializerPb();
            case WITH_RAW_PAYLOADS:
                return new MessagePayloadSerializerRaw();
            case WITH_CUSTOM_PAYLOADS:
                return forCustomPayloadSerializer(messageFactoryKey.getCustomPayloadSerializerClassName().orElseThrow(() -> {
                    return new UnsupportedOperationException("WITH_CUSTOM_PAYLOADS requires custom payload serializer class name to be specified in MessageFactoryKey");
                }));
            default:
                throw new IllegalArgumentException("unknown serialization method " + messageFactoryKey.getType());
        }
    }

    private static MessagePayloadSerializer forCustomPayloadSerializer(String str) {
        try {
            return (MessagePayloadSerializer) Class.forName(str, true, Thread.currentThread().getContextClassLoader()).getConstructor(new Class[0]).newInstance(new Object[0]);
        } catch (Throwable th) {
            throw new UnsupportedOperationException(String.format("Failed to create custom payload serializer: %s", str), th);
        }
    }
}
