/*
 * Decompiled with CFR 0.152.
 */
package cn.tenmg.flink.jobs.service;

import cn.tenmg.flink.jobs.StreamService;
import cn.tenmg.flink.jobs.model.Params;
import cn.tenmg.flink.jobs.model.TableOperate;
import cn.tenmg.flink.jobs.serialization.TableOperateDeserializationSchema;
import cn.tenmg.flink.jobs.service.AfterTableChangeProcess;
import cn.tenmg.flink.jobs.utils.TableUtils;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public abstract class TableChangeService
implements StreamService {
    private static final long serialVersionUID = 967105095937872674L;
    private static final Set<String> OPERATIONS = new HashSet<String>(){
        private static final long serialVersionUID = -786215989755170736L;
        {
            this.add("INSERT");
            this.add("UPDATE");
            this.add("DELETE");
            this.add("insert");
            this.add("update");
            this.add("delete");
        }
    };

    protected abstract String getSubscribe();

    protected abstract Properties getKafkaProperties();

    protected abstract Map<KafkaTopicPartition, Long> getStartFromSpecificOffsets();

    protected abstract Set<String> getAssociatedTables();

    protected abstract DataStream<TableOperate> getBatchDataStream(StreamExecutionEnvironment var1, Params var2);

    protected abstract void process(String var1, List<TableOperate> var2);

    protected abstract int getMaxTry();

    protected abstract AfterTableChangeProcess getAfterTableChangeProcess();

    @Override
    public void run(StreamExecutionEnvironment env, Params params) throws Exception {
        ProcessFunction processFunction;
        SingleOutputStreamOperator stream;
        if (RuntimeExecutionMode.BATCH.equals((Object)params.getRuntimeMode())) {
            stream = this.getBatchDataStream(env, params);
            processFunction = new BatchProcessFunction(this, this.getMaxTry());
        } else {
            Properties kafkaProperties = this.getKafkaProperties();
            kafkaProperties.setProperty("group.id", kafkaProperties.getProperty("group.id.prefix", "flink-jobs").concat("_").concat(params.getServiceName()));
            kafkaProperties.remove("group.id.prefix");
            FlinkKafkaConsumer011 flinkKafkaConsumer = new FlinkKafkaConsumer011(Arrays.asList(this.getSubscribe().split(",")), (KafkaDeserializationSchema)new TableOperateDeserializationSchema(), kafkaProperties);
            flinkKafkaConsumer.setStartFromSpecificOffsets(this.getStartFromSpecificOffsets());
            stream = env.addSource((SourceFunction)flinkKafkaConsumer).filter((FilterFunction)new TableOperateFilter(this.getAssociatedTables()));
            processFunction = new StreamProcessFunction(this, this.getMaxTry(), this.getAfterTableChangeProcess());
        }
        SingleOutputStreamOperator operator = stream.keyBy((KeySelector & Serializable)value -> TableUtils.fullTableName(value.getDatabase(), value.getTable()).toUpperCase()).window((WindowAssigner)TumblingProcessingTimeWindows.of((Time)Time.milliseconds((long)500L))).aggregate((AggregateFunction)new AggregateTableOperate());
        operator.process(processFunction);
    }

    protected static Consumer<Long, String> patitionsGetterConsumer(Properties properties) {
        Properties props = new Properties();
        props.put("bootstrap.servers", properties.get("bootstrap.servers"));
        props.put("group.id", properties.getProperty("group.id.prefix", "flink-jobs").concat("_partition_getter"));
        props.put("key.deserializer", LongDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        return new KafkaConsumer(props);
    }

    public static class AggregateTableOperate
    implements AggregateFunction<TableOperate, Tuple2<String, List<TableOperate>>, Tuple2<String, List<TableOperate>>> {
        private static final long serialVersionUID = 2762779066918191685L;

        public Tuple2<String, List<TableOperate>> createAccumulator() {
            return new Tuple2();
        }

        public Tuple2<String, List<TableOperate>> add(TableOperate value, Tuple2<String, List<TableOperate>> accumulator) {
            List list;
            if (accumulator.f0 == null) {
                accumulator.f0 = TableUtils.fullTableName(value.getDatabase(), value.getTable()).toUpperCase();
            }
            if ((list = (List)accumulator.f1) == null) {
                accumulator.f1 = new ArrayList();
                list = accumulator.f1;
            }
            list.add(value);
            return accumulator;
        }

        public Tuple2<String, List<TableOperate>> getResult(Tuple2<String, List<TableOperate>> accumulator) {
            return accumulator;
        }

        public Tuple2<String, List<TableOperate>> merge(Tuple2<String, List<TableOperate>> a, Tuple2<String, List<TableOperate>> b) {
            ((List)a.f1).addAll((Collection)b.f1);
            return a;
        }
    }

    public static class TableOperateFilter
    implements FilterFunction<TableOperate> {
        private static final long serialVersionUID = -6146916944202432219L;
        private final Set<String> retainedTables;

        public TableOperateFilter(Set<String> retainedTables) {
            this.retainedTables = retainedTables;
        }

        public boolean filter(TableOperate value) throws Exception {
            return this.retainedTables.contains(TableUtils.fullTableName(value.getDatabase(), value.getTable()).toUpperCase()) && OPERATIONS.contains(value.getType());
        }
    }

    public static class BatchProcessFunction
    extends ProcessFunction<Tuple2<String, List<TableOperate>>, List<TableOperate>> {
        private static final long serialVersionUID = -8995755672685262056L;
        private final TableChangeService tableChangeHandler;
        private final int maxTry;

        public BatchProcessFunction(TableChangeService tableChangeHandler, int maxTry) {
            this.tableChangeHandler = tableChangeHandler;
            this.maxTry = maxTry;
        }

        public void processElement(Tuple2<String, List<TableOperate>> value, ProcessFunction.Context ctx, Collector<List<TableOperate>> arg2) throws Exception {
            String fullTableName = (String)value.f0;
            List tableOperates = (List)value.f1;
            for (int i = 0; i < this.maxTry; ++i) {
                try {
                    this.tableChangeHandler.process(fullTableName, tableOperates);
                    break;
                }
                catch (Exception e) {
                    e.printStackTrace();
                    continue;
                }
            }
        }
    }

    public static class StreamProcessFunction
    extends ProcessFunction<Tuple2<String, List<TableOperate>>, List<TableOperate>> {
        private static final long serialVersionUID = -8218678158529587523L;
        private final TableChangeService tableChangeHandler;
        private final int maxTry;
        private final AfterTableChangeProcess afterTableChangeProcess;

        public StreamProcessFunction(TableChangeService tableChangeHandler, int maxTry, AfterTableChangeProcess afterTableChangeProcess) {
            this.tableChangeHandler = tableChangeHandler;
            this.maxTry = maxTry;
            this.afterTableChangeProcess = afterTableChangeProcess;
        }

        public void processElement(Tuple2<String, List<TableOperate>> value, ProcessFunction.Context ctx, Collector<List<TableOperate>> arg2) throws Exception {
            String fullTableName = (String)value.f0;
            String serviceName = this.tableChangeHandler.getClass().getSimpleName();
            List tableOperates = (List)value.f1;
            boolean success = false;
            for (int i = 0; i < this.maxTry; ++i) {
                String msg = null;
                Date startTime = Calendar.getInstance().getTime();
                try {
                    this.tableChangeHandler.process(fullTableName, tableOperates);
                    success = true;
                }
                catch (Exception e) {
                    e.printStackTrace();
                    msg = e.getMessage();
                }
                this.afterTableChangeProcess.log(tableOperates, serviceName, startTime, Calendar.getInstance().getTime(), success, msg);
                if (!success) continue;
                this.afterTableChangeProcess.saveStartingOffsets((TableOperate)tableOperates.get(tableOperates.size() - 1), serviceName);
                break;
            }
        }
    }
}

