package net.neoremind.fountain.producer.dispatch.fountainmq;

import java.util.Iterator;
import net.neoremind.fountain.changedata.BinlogTraceable;
import net.neoremind.fountain.common.mq.FountainMQ;
import net.neoremind.fountain.producer.dispatch.DispatchException;
import net.neoremind.fountain.producer.dispatch.Transport;
import net.neoremind.fountain.producer.dispatch.misc.MessageSeparationPolicy;
import net.neoremind.fountain.producer.dispatch.misc.NoSeparationPolicy;

/* loaded from: input_file:net/neoremind/fountain/producer/dispatch/fountainmq/FoutainMQTransport.class */
public class FoutainMQTransport implements Transport {
    private FountainMQ fmq;
    private MessageSeparationPolicy messageSeparationPolicy = new NoSeparationPolicy();

    public FountainMQ getFmq() {
        return this.fmq;
    }

    public void setFmq(FountainMQ fountainMQ) {
        this.fmq = fountainMQ;
    }

    @Override // net.neoremind.fountain.producer.dispatch.Transport
    public void transport(Object obj) throws DispatchException {
        sendWithSeparation(obj, true);
    }

    @Override // net.neoremind.fountain.producer.dispatch.Transport
    public void transport(Object obj, boolean z) throws DispatchException {
        sendWithSeparation(obj, z);
    }

    @Override // net.neoremind.fountain.producer.dispatch.Transport
    public void registerProducer(String str) {
    }

    private void sendWithSeparation(Object obj, boolean z) {
        Iterator<Object> separate = this.messageSeparationPolicy.separate(obj);
        while (separate.hasNext()) {
            pushEvent(separate.next(), z);
        }
    }

    private BinlogTraceable checkIfBinlogTraceable(Object obj) {
        if (obj instanceof BinlogTraceable) {
            return (BinlogTraceable) obj;
        }
        throw new DispatchException("event must be BinlogTraceable");
    }

    private void pushEvent(Object obj, boolean z) {
        BinlogTraceable checkIfBinlogTraceable = checkIfBinlogTraceable(obj);
        if (this.fmq.isExceedMaxCapacity(checkIfBinlogTraceable.getDataSize())) {
            throw new DispatchException("transaction is exceed max size:" + checkIfBinlogTraceable.getDataSize());
        }
        if (z) {
            this.fmq.push(checkIfBinlogTraceable);
        } else {
            this.fmq.push(checkIfBinlogTraceable, 0L);
        }
    }

    public MessageSeparationPolicy getMessageSeparationPolicy() {
        return this.messageSeparationPolicy;
    }

    public void setMessageSeparationPolicy(MessageSeparationPolicy messageSeparationPolicy) {
        this.messageSeparationPolicy = messageSeparationPolicy;
    }
}
