/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka.table;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.table.SinkBufferFlushMode;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A sink function wrapper that buffers upsert rows by key and forwards only the most recent row
 * of every key to the wrapped {@code producer}.
 *
 * <p>Incoming rows are stored in an in-memory map keyed by the projected key fields, so a later
 * row with the same key replaces an earlier one. The buffer is flushed to the wrapped producer
 * when
 *
 * <ul>
 *   <li>the number of rows added since the last flush reaches {@code batchMaxRowNums},
 *   <li>the periodic timer (every {@code batchIntervalMs} milliseconds) fires,
 *   <li>a checkpoint snapshot is taken, or
 *   <li>the function is closed.
 * </ul>
 *
 * <p>Before buffering, the row kind is normalized: {@code INSERT}/{@code UPDATE_AFTER} become
 * {@code UPDATE_AFTER}, and {@code UPDATE_BEFORE}/{@code DELETE} become {@code DELETE}.
 *
 * <p>Buffer access is guarded by the monitor of this instance because the timer thread and the
 * thread calling {@link #invoke} both touch the buffer.
 */
public class BufferedUpsertSinkFunction
        extends RichSinkFunction<RowData>
        implements CheckpointedFunction, CheckpointListener {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(BufferedUpsertSinkFunction.class);

    /** The wrapped sink that actually writes the records. */
    private final RichSinkFunction<RowData> producer;
    /** Number of buffered {@code addToBuffer} calls that triggers a flush. */
    private final int batchMaxRowNums;
    /** Delay in milliseconds between two timer-driven flushes. */
    private final long batchIntervalMs;
    private final DataType physicalDataType;
    /** Indices of the key fields within the physical row. */
    private final int[] keyProjection;
    private final TypeInformation<RowData> consumedRowDataTypeInfo;

    /** Set by {@link #close()}; checked by the timer task so it stops flushing. */
    private boolean closed;
    /** Rows added since the last flush (counts every row, not distinct keys). */
    private int batchCount = 0;

    /** Latest row and its timestamp per key. */
    private transient Map<RowData, Tuple2<RowData, Long>> reduceBuffer;
    private transient WrappedContext wrappedContext;
    private transient Function<RowData, RowData> keyExtractor;
    private transient Function<RowData, RowData> valueCopier;
    private transient ScheduledExecutorService scheduler;
    private transient ScheduledFuture<?> scheduledFuture;
    /** Failure captured on the timer thread; rethrown on the next buffer operation. */
    private transient volatile Exception flushException;

    /**
     * Creates the buffering wrapper.
     *
     * @param producer the sink to forward flushed rows to; must not be null
     * @param physicalDataType physical row type used to build the key field getters; must not be
     *     null
     * @param keyProjection indices of the key fields; must not be null
     * @param consumedRowDataTypeInfo type information used to create the serializer that copies
     *     rows when object reuse is enabled
     * @param bufferFlushMode flush settings; must be non-null and enabled
     * @throws IllegalArgumentException if {@code bufferFlushMode} is null or not enabled
     * @throws NullPointerException if a required argument is null
     */
    public BufferedUpsertSinkFunction(
            RichSinkFunction<RowData> producer,
            DataType physicalDataType,
            int[] keyProjection,
            TypeInformation<RowData> consumedRowDataTypeInfo,
            SinkBufferFlushMode bufferFlushMode) {
        Preconditions.checkArgument(bufferFlushMode != null && bufferFlushMode.isEnabled());
        this.producer = Preconditions.checkNotNull(producer, "Producer must not be null.");
        this.physicalDataType =
                Preconditions.checkNotNull(
                        physicalDataType, "Physical data type must not be null.");
        this.keyProjection =
                Preconditions.checkNotNull(keyProjection, "key projection must not be null.");
        this.consumedRowDataTypeInfo = consumedRowDataTypeInfo;
        this.batchMaxRowNums = bufferFlushMode.getBatchSize();
        this.batchIntervalMs = bufferFlushMode.getBatchIntervalMs();
    }

    /**
     * Initializes the buffer, the key extractor and value copier, starts the periodic flush
     * timer and finally opens the wrapped producer.
     *
     * @param parameters configuration passed on to the wrapped producer
     * @throws Exception if the wrapped producer fails to open
     */
    public void open(Configuration parameters) throws Exception {
        this.reduceBuffer = new HashMap<>();
        this.wrappedContext = new WrappedContext();
        this.closed = false;

        // Build one field getter per key column; the key row is always tagged INSERT so that
        // rows of different kinds but equal key values map to the same buffer entry.
        final List<LogicalType> fields = this.physicalDataType.getLogicalType().getChildren();
        final RowData.FieldGetter[] keyFieldGetters =
                Arrays.stream(this.keyProjection)
                        .mapToObj(
                                targetField ->
                                        RowData.createFieldGetter(
                                                fields.get(targetField), targetField))
                        .toArray(RowData.FieldGetter[]::new);
        this.keyExtractor =
                rowData ->
                        DynamicKafkaSerializationSchema.createProjectedRow(
                                rowData, RowKind.INSERT, keyFieldGetters);

        // With object reuse enabled the incoming row must be copied before it is buffered.
        final TypeSerializer<RowData> typeSerializer =
                this.consumedRowDataTypeInfo.createSerializer(
                        this.getRuntimeContext().getExecutionConfig());
        this.valueCopier =
                this.getRuntimeContext().getExecutionConfig().isObjectReuseEnabled()
                        ? typeSerializer::copy
                        : Function.identity();

        this.scheduler =
                Executors.newScheduledThreadPool(
                        1, new ExecutorThreadFactory("upsert-kafka-sink-function"));
        this.scheduledFuture =
                this.scheduler.scheduleWithFixedDelay(
                        () -> {
                            synchronized (BufferedUpsertSinkFunction.this) {
                                if (!this.closed) {
                                    try {
                                        this.flush();
                                    } catch (Exception e) {
                                        // Remember the failure; it is surfaced by
                                        // checkFlushException() on the caller's thread.
                                        this.flushException = e;
                                    }
                                }
                            }
                        },
                        this.batchIntervalMs,
                        this.batchIntervalMs,
                        TimeUnit.MILLISECONDS);

        this.producer.open(parameters);
    }

    /** Hands the runtime context to the wrapped producer, which is the one that stores it. */
    public void setRuntimeContext(RuntimeContext t) {
        this.producer.setRuntimeContext(t);
    }

    /** Returns the runtime context held by the wrapped producer. */
    public RuntimeContext getRuntimeContext() {
        return this.producer.getRuntimeContext();
    }

    /**
     * Buffers the given row; may trigger a flush if the batch size is reached.
     *
     * @param value the row to buffer
     * @param context the sink context of the row; its timestamp is stored with the row
     * @throws Exception if a previous timer-driven flush failed or the triggered flush fails
     */
    public void invoke(RowData value, SinkFunction.Context context) throws Exception {
        // The timer thread reads wrappedContext while flushing under this monitor, so the
        // context must be published under the same monitor.
        synchronized (this) {
            this.wrappedContext.setContext(context);
            this.addToBuffer(value, context.timestamp());
        }
    }

    /** Forwards the notification to the producer if it is a {@link CheckpointListener}. */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        if (this.producer instanceof CheckpointListener) {
            ((CheckpointListener) this.producer).notifyCheckpointComplete(checkpointId);
        }
    }

    /** Forwards the notification to the producer if it is a {@link CheckpointListener}. */
    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        if (this.producer instanceof CheckpointListener) {
            ((CheckpointListener) this.producer).notifyCheckpointAborted(checkpointId);
        }
    }

    /**
     * Stops the timer, flushes any remaining rows and closes the wrapped producer.
     *
     * <p>The producer is closed even if the final flush fails, so that it is not leaked. If both
     * the flush and the producer's close fail, the flush failure is thrown and the close failure
     * is attached as a suppressed exception.
     *
     * @throws Exception if the final flush, the producer's close, or an earlier timer-driven
     *     flush failed
     */
    public synchronized void close() throws Exception {
        if (!this.closed) {
            this.closed = true;
            if (this.scheduledFuture != null) {
                this.scheduledFuture.cancel(false);
            }
            // Guarded separately: the scheduler may exist even if scheduling the task failed.
            if (this.scheduler != null) {
                this.scheduler.shutdown();
            }

            Exception failure = null;
            if (this.batchCount > 0) {
                try {
                    this.flush();
                } catch (Exception e) {
                    LOG.warn("Writing records to kafka failed.", e);
                    failure = new RuntimeException("Writing records to kafka failed.", e);
                }
            }
            try {
                this.producer.close();
            } catch (Exception e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
            if (failure != null) {
                throw failure;
            }
        }
        super.close();
        this.checkFlushException();
    }

    /**
     * Flushes the buffer and then lets the producer snapshot its state, if it is a {@link
     * CheckpointedFunction}.
     */
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        this.flush();
        if (this.producer instanceof CheckpointedFunction) {
            ((CheckpointedFunction) this.producer).snapshotState(context);
        }
    }

    /** Lets the producer initialize its state, if it is a {@link CheckpointedFunction}. */
    public void initializeState(FunctionInitializationContext context) throws Exception {
        if (this.producer instanceof CheckpointedFunction) {
            ((CheckpointedFunction) this.producer).initializeState(context);
        }
    }

    /**
     * Stores the row under its key (replacing any earlier row with that key) and flushes once
     * {@code batchMaxRowNums} rows have been added since the last flush.
     */
    private synchronized void addToBuffer(RowData row, Long timestamp) throws Exception {
        this.checkFlushException();
        final RowData key = this.keyExtractor.apply(row);
        final RowData value = this.valueCopier.apply(row);
        this.reduceBuffer.put(key, new Tuple2<>(this.changeFlag(value), timestamp));
        ++this.batchCount;
        if (this.batchCount >= this.batchMaxRowNums) {
            this.flush();
        }
    }

    /** Sends every buffered row to the producer with its own timestamp, then clears the buffer. */
    private synchronized void flush() throws Exception {
        this.checkFlushException();
        for (Tuple2<RowData, Long> value : this.reduceBuffer.values()) {
            this.wrappedContext.setTimestamp(value.f1);
            this.producer.invoke(value.f0, this.wrappedContext);
        }
        this.reduceBuffer.clear();
        this.batchCount = 0;
    }

    /**
     * Normalizes the row kind in place: {@code INSERT}/{@code UPDATE_AFTER} become {@code
     * UPDATE_AFTER}; {@code UPDATE_BEFORE}/{@code DELETE} become {@code DELETE}.
     *
     * @return the same row instance that was passed in
     */
    private RowData changeFlag(RowData value) {
        switch (value.getRowKind()) {
            case INSERT:
            case UPDATE_AFTER:
                value.setRowKind(RowKind.UPDATE_AFTER);
                break;
            case UPDATE_BEFORE:
            case DELETE:
                value.setRowKind(RowKind.DELETE);
                break;
            default:
                // Any other kind is left untouched.
                break;
        }
        return value;
    }

    /** Rethrows, wrapped in a {@link RuntimeException}, a failure recorded by the timer flush. */
    private void checkFlushException() {
        final Exception recorded = this.flushException;
        if (recorded != null) {
            throw new RuntimeException("Writing records to kafka failed.", recorded);
        }
    }

    /**
     * Sink context handed to the wrapped producer during a flush. Processing time and watermark
     * are delegated to the most recently seen context, while the timestamp is the one stored
     * with the buffered row.
     */
    private static class WrappedContext implements SinkFunction.Context {
        private Long timestamp;
        private SinkFunction.Context context;

        private WrappedContext() {
        }

        void setTimestamp(Long timestamp) {
            this.timestamp = timestamp;
        }

        void setContext(SinkFunction.Context context) {
            this.context = context;
        }

        public long currentProcessingTime() {
            return this.context.currentProcessingTime();
        }

        public long currentWatermark() {
            return this.context.currentWatermark();
        }

        public Long timestamp() {
            return this.timestamp;
        }
    }
}

