package org.apache.flink.streaming.connectors.flume;

import java.util.Iterator;
import java.util.List;
import org.apache.flink.streaming.connectors.ConnectorSource;
import org.apache.flink.streaming.connectors.util.DeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.flume.ChannelSelector;
import org.apache.flume.Context;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.source.AvroSource;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.flume.source.avro.Status;

/* loaded from: input_file:org/apache/flink/streaming/connectors/flume/FlumeSource.class */
public class FlumeSource<OUT> extends ConnectorSource<OUT> {
    private static final long serialVersionUID = 1;
    String host;
    String port;
    volatile boolean finished;
    FlumeSource<OUT>.MyAvroSource avroSource;

    /* loaded from: input_file:org/apache/flink/streaming/connectors/flume/FlumeSource$MyAvroSource.class */
    public class MyAvroSource extends AvroSource {
        Collector<OUT> collector;

        public MyAvroSource() {
        }

        public Status append(AvroFlumeEvent avroFlumeEvent) {
            collect(avroFlumeEvent);
            return Status.OK;
        }

        public Status appendBatch(List<AvroFlumeEvent> list) {
            Iterator<AvroFlumeEvent> it = list.iterator();
            while (it.hasNext()) {
                collect(it.next());
            }
            return Status.OK;
        }

        private void collect(AvroFlumeEvent avroFlumeEvent) {
            Object deserialize = FlumeSource.this.schema.deserialize(avroFlumeEvent.getBody().array());
            if (!FlumeSource.this.schema.isEndOfStream(deserialize)) {
                this.collector.collect(deserialize);
                return;
            }
            FlumeSource.this.finished = true;
            stop();
            FlumeSource.this.notifyAll();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public FlumeSource(String str, int i, DeserializationSchema<OUT> deserializationSchema) {
        super(deserializationSchema);
        this.finished = false;
        this.host = str;
        this.port = Integer.toString(i);
    }

    public void configureAvroSource(Collector<OUT> collector) {
        this.avroSource = new MyAvroSource();
        this.avroSource.collector = collector;
        Context context = new Context();
        context.put("port", this.port);
        context.put("bind", this.host);
        this.avroSource.configure(context);
        this.avroSource.setChannelProcessor(new ChannelProcessor((ChannelSelector) null));
    }

    public void invoke(Collector<OUT> collector) throws Exception {
        configureAvroSource(collector);
        this.avroSource.start();
        while (!this.finished) {
            wait();
        }
    }
}
