package org.apache.flink.streaming.connectors.flume;

import java.util.Iterator;
import java.util.List;
import org.apache.flink.streaming.api.function.source.RichSourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flume.ChannelSelector;
import org.apache.flume.Context;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.source.AvroSource;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.flume.source.avro.Status;

/* loaded from: input_file:org/apache/flink/streaming/connectors/flume/FlumeSource.class */
public abstract class FlumeSource<OUT> extends RichSourceFunction<OUT> {
    private static final long serialVersionUID = 1;
    String host;
    String port;
    FlumeSource<OUT>.MyAvroSource avroSource;
    private volatile boolean closeWithoutSend = false;
    private boolean sendAndClose = false;
    private volatile boolean sendDone = false;

    /* loaded from: input_file:org/apache/flink/streaming/connectors/flume/FlumeSource$MyAvroSource.class */
    public class MyAvroSource extends AvroSource {
        Collector<OUT> collector;

        public MyAvroSource() {
        }

        public Status append(AvroFlumeEvent avroFlumeEvent) {
            collect(avroFlumeEvent);
            return Status.OK;
        }

        public Status appendBatch(List<AvroFlumeEvent> list) {
            Iterator<AvroFlumeEvent> it = list.iterator();
            while (it.hasNext()) {
                collect(it.next());
            }
            return Status.OK;
        }

        private void collect(AvroFlumeEvent avroFlumeEvent) {
            Object deserialize = FlumeSource.this.deserialize(avroFlumeEvent.getBody().array());
            if (!FlumeSource.this.closeWithoutSend) {
                this.collector.collect(deserialize);
            }
            if (FlumeSource.this.sendAndClose) {
                FlumeSource.this.sendDone = true;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public FlumeSource(String str, int i) {
        this.host = str;
        this.port = Integer.toString(i);
    }

    public abstract OUT deserialize(byte[] bArr);

    public void configureAvroSource(Collector<OUT> collector) {
        this.avroSource = new MyAvroSource();
        this.avroSource.collector = collector;
        Context context = new Context();
        context.put("port", this.port);
        context.put("bind", this.host);
        this.avroSource.configure(context);
        this.avroSource.setChannelProcessor(new ChannelProcessor((ChannelSelector) null));
    }

    public void invoke(Collector<OUT> collector) throws Exception {
        configureAvroSource(collector);
        this.avroSource.start();
        while (!this.closeWithoutSend && !this.sendDone) {
        }
        this.avroSource.stop();
    }

    public void sendAndClose() {
        this.sendAndClose = true;
    }

    public void closeWithoutSend() {
        this.closeWithoutSend = true;
    }
}
