package org.apache.flink.streaming.connectors.akka.utils;

import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/akka/utils/FeederActor.class */
/**
 * Actor that answers a {@code SubscribeReceiver} request by sending one payload to the
 * receiver actor carried in that request.
 *
 * <p>The shape of the payload is fixed at construction time through {@link MessageTypes}.
 * Besides subscription requests the actor reacts to two more message kinds: a
 * {@link String} is stored into {@code Message.ACK_MESSAGE}, and an
 * {@code UnsubscribeReceiver} is only logged. Every other message is silently ignored.
 */
public class FeederActor extends UntypedActor {
    private static final Logger LOG = LoggerFactory.getLogger(FeederActor.class);

    /** Payload shape this actor sends to subscribing receivers. */
    private final MessageTypes messageType;

    /** Payload shapes the feeder can produce. */
    public enum MessageTypes {
        /** The welcome message on its own. */
        SINGLE_DATA,
        /** A list holding the welcome message followed by the feeder message. */
        ITERABLE_DATA,
        /** The welcome message converted to a byte array. */
        BYTES_DATA,
        /** The welcome message paired with the current wall-clock time in milliseconds. */
        SINGLE_DATA_WITH_TIMESTAMP
    }

    /**
     * Creates a feeder that produces payloads of the given shape.
     *
     * @param messageTypes payload shape to send on every subscription request
     */
    public FeederActor(MessageTypes messageTypes) {
        this.messageType = messageTypes;
    }

    /**
     * Handles one incoming actor message.
     *
     * <ul>
     *   <li>{@code SubscribeReceiver}: builds a payload for the configured
     *       {@link MessageTypes} and sends it to the receiver actor from the request, with
     *       this actor as the sender.</li>
     *   <li>{@link String}: stores the text in {@code Message.ACK_MESSAGE}.</li>
     *   <li>{@code UnsubscribeReceiver}: writes an info log entry; nothing else happens
     *       here.</li>
     *   <li>anything else: ignored.</li>
     * </ul>
     *
     * @param obj the received message
     * @throws RuntimeException if the configured message type has no payload factory
     */
    public void onReceive(Object obj) {
        if (obj instanceof SubscribeReceiver) {
            // Resolve the target before building the payload, then reply as ourselves.
            ActorRef receiverActor = ((SubscribeReceiver) obj).getReceiverActor();
            receiverActor.tell(createPayload(), getSelf());
        } else if (obj instanceof String) {
            Message.ACK_MESSAGE = obj.toString();
        } else if (obj instanceof UnsubscribeReceiver) {
            LOG.info("Stop actor!");
        }
    }

    /** Builds the payload that corresponds to the configured {@link MessageTypes}. */
    private Object createPayload() {
        switch (this.messageType) {
            case SINGLE_DATA:
                return createSingleDataMessage();
            case ITERABLE_DATA:
                return createIterableOfMessages();
            case BYTES_DATA:
                return createByteMessages();
            case SINGLE_DATA_WITH_TIMESTAMP:
                return createTimestampMessage();
            default:
                throw new RuntimeException("Message format specified is incorrect");
        }
    }

    /** Returns the welcome message as a single payload. */
    private Object createSingleDataMessage() {
        return Message.WELCOME_MESSAGE;
    }

    /** Returns a new mutable list containing the welcome message, then the feeder message. */
    private List<Object> createIterableOfMessages() {
        List<Object> messages = new ArrayList<>();
        messages.add(Message.WELCOME_MESSAGE);
        messages.add(Message.FEEDER_MESSAGE);
        return messages;
    }

    /** Returns the welcome message encoded as bytes. */
    private byte[] createByteMessages() {
        // NOTE(review): encodes with the platform default charset; confirm how the receiving
        // side decodes before pinning an explicit charset here.
        return Message.WELCOME_MESSAGE.getBytes();
    }

    /** Returns the welcome message paired with the current time in epoch milliseconds. */
    private Tuple2<Object, Long> createTimestampMessage() {
        Tuple2<Object, Long> timestamped = new Tuple2<>();
        timestamped.f0 = Message.WELCOME_MESSAGE;
        timestamped.f1 = Long.valueOf(System.currentTimeMillis());
        return timestamped;
    }
}
