/*
 * Decompiled with CFR 0.152.
 */
package net.wicp.tams.common.flink.connector.kafka.connector;

import java.util.ArrayList;
import java.util.List;
import net.wicp.tams.common.Result;
import net.wicp.tams.common.apiext.LoggerUtil;
import net.wicp.tams.common.apiext.TimeAssist;
import net.wicp.tams.common.binlog.alone.binlog.listener.IConsumerListener;
import net.wicp.tams.common.constant.JvmStatus;
import net.wicp.tams.common.exception.ExceptAll;
import net.wicp.tams.common.exception.IExcept;
import net.wicp.tams.common.exception.ProjectExceptionRuntime;
import net.wicp.tams.common.kafka.IConsumer;
import net.wicp.tams.common.kafka.KafkaConsumerGroupB;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaConsumerListerWrap {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerListerWrap.class);
    private final KafkaConsumerGroupB kafkaConsumerGroupB;

    public KafkaConsumerListerWrap(String groupid, String topicName, final DeserializationSchema<RowData> deserializationSchema, final SourceFunction.SourceContext<RowData> ctx) {
        this.kafkaConsumerGroupB = new KafkaConsumerGroupB(false, groupid, topicName, (IConsumer)new IConsumer<byte[]>(){

            public Result doWithRecords(List<ConsumerRecord<String, byte[]>> consumerRecords) {
                RecordCollector out = new RecordCollector();
                for (ConsumerRecord<String, byte[]> consumerRecord : consumerRecords) {
                    try {
                        deserializationSchema.deserialize((byte[])consumerRecord.value(), (Collector)out);
                    }
                    catch (Exception e) {
                        throw new ProjectExceptionRuntime((IExcept)ExceptAll.flink_source_deserialize, "kafka\u6570\u636edeserialize\u4e3arowData\u5931\u8d25");
                    }
                    log.info("offset:" + consumerRecord.offset());
                }
                for (RowData rowData : out.getRecords()) {
                    ctx.collect((Object)rowData);
                }
                return Result.getSuc();
            }

            public void doInit(IConsumerListener plugin) {
            }
        }, 1);
    }

    public void start() {
        this.kafkaConsumerGroupB.start();
        while (true) {
            try {
                while (true) {
                    System.in.read();
                    TimeAssist.reDoWaitInit((String)"need-init");
                }
            }
            catch (Throwable e) {
                boolean reDoWait = TimeAssist.reDoWait((String)"need-init", (int)8);
                log.error("readdo error", e);
                if (!reDoWait) continue;
                log.error("\u5df2\u91cd\u8bd58\u6b21\uff0c\u9000\u51fa\u7cfb\u7edf");
                LoggerUtil.exit((JvmStatus)JvmStatus.s15);
                continue;
            }
            break;
        }
    }

    private class RecordCollector
    implements Collector<RowData> {
        private final List<RowData> records = new ArrayList<RowData>();

        private RecordCollector() {
        }

        public List<RowData> getRecords() {
            return this.records;
        }

        public void collect(RowData record) {
            this.records.add(record);
        }

        public void close() {
        }
    }
}

