package org.apache.pinot.tools.streams;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.stream.StreamDataProducer;
import org.apache.pinot.spi.stream.StreamDataProvider;
import org.apache.pinot.tools.Quickstart;
import org.apache.pinot.tools.utils.KafkaStarterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/tools/streams/AirlineDataStream.class */
public class AirlineDataStream {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) AirlineDataStream.class);
    Schema pinotSchema;
    File avroFile;
    DataFileStream<GenericRecord> avroDataStream;
    ExecutorService service;
    private StreamDataProducer producer;
    Integer currentTimeValue = 16102;
    boolean keepIndexing = true;
    int counter = 0;

    public AirlineDataStream(Schema schema, File file) throws Exception {
        this.pinotSchema = schema;
        this.avroFile = file;
        createStream();
        Properties properties = new Properties();
        properties.put("metadata.broker.list", KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
        properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
        properties.put("request.required.acks", CommonConstants.Helix.DEFAULT_FLAPPING_TIME_WINDOW_MS);
        this.producer = StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
        this.service = Executors.newFixedThreadPool(1);
        Quickstart.printStatus(Quickstart.Color.YELLOW, "***** Offine data has max time as 16101, realtime will start consuming from time 16102 and increment time every 60 events (which is approximately 60 seconds) *****");
    }

    public void shutdown() {
        this.keepIndexing = false;
        this.avroDataStream = null;
        this.producer.close();
        this.producer = null;
        this.service.shutdown();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void createStream() throws IOException {
        if (this.keepIndexing) {
            this.avroDataStream = new DataFileStream<>(new FileInputStream(this.avroFile), new GenericDatumReader());
        } else {
            this.avroDataStream = null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void publish(GenericRecord genericRecord) throws IOException {
        if (this.keepIndexing) {
            this.producer.produce("flights-realtime", genericRecord.toString().getBytes("UTF-8"));
        } else {
            this.avroDataStream.close();
            this.avroDataStream = null;
        }
    }

    public void run() {
        this.service.submit(new Runnable() { // from class: org.apache.pinot.tools.streams.AirlineDataStream.1
            @Override // java.lang.Runnable
            public void run() {
                while (true) {
                    if (!AirlineDataStream.this.avroDataStream.hasNext()) {
                        try {
                            AirlineDataStream.this.avroDataStream.close();
                        } catch (IOException e) {
                            AirlineDataStream.logger.error(e.getMessage());
                        }
                        try {
                            AirlineDataStream.this.createStream();
                        } catch (IOException e2) {
                            AirlineDataStream.logger.error(e2.getMessage());
                        }
                    } else {
                        if (!AirlineDataStream.this.keepIndexing) {
                            return;
                        }
                        GenericRecord genericRecord = (GenericRecord) AirlineDataStream.this.avroDataStream.next();
                        GenericData.Record record = new GenericData.Record(AvroUtils.getAvroSchemaFromPinotSchema(AirlineDataStream.this.pinotSchema));
                        for (DimensionFieldSpec dimensionFieldSpec : AirlineDataStream.this.pinotSchema.getDimensionFieldSpecs()) {
                            record.put(dimensionFieldSpec.getName(), genericRecord.get(dimensionFieldSpec.getName()));
                        }
                        for (DimensionFieldSpec dimensionFieldSpec2 : AirlineDataStream.this.pinotSchema.getDimensionFieldSpecs()) {
                            record.put(dimensionFieldSpec2.getName(), genericRecord.get(dimensionFieldSpec2.getName()));
                        }
                        record.put(AirlineDataStream.this.pinotSchema.getTimeFieldSpec().getIncomingTimeColumnName(), AirlineDataStream.this.currentTimeValue);
                        try {
                            AirlineDataStream.this.publish(record);
                            AirlineDataStream.this.counter++;
                            if (AirlineDataStream.this.counter % 60 == 0) {
                                AirlineDataStream.this.currentTimeValue = Integer.valueOf(AirlineDataStream.this.currentTimeValue.intValue() + 1);
                            }
                            Thread.sleep(1000L);
                        } catch (Exception e3) {
                            AirlineDataStream.logger.error(e3.getMessage());
                        }
                    }
                }
            }
        });
    }
}
