/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka.table;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.table.SinkBufferFlushMode;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;

/**
 * A {@link SinkWriter} decorator that buffers incoming rows per key and forwards only the most
 * recent row of each key to the wrapped writer.
 *
 * <p>Rows are keyed by the fields selected through the key projection handed to the constructor.
 * A later row with the same key replaces the earlier buffered one. The buffer is flushed to the
 * wrapped writer when
 *
 * <ul>
 *   <li>the number of buffered keys reaches the configured batch size,
 *   <li>the processing-time timer fires and at least the configured batch interval has elapsed
 *       since the last flush, or
 *   <li>{@link #prepareCommit(boolean)} is called.
 * </ul>
 *
 * <p>Before a row is buffered its {@link RowKind} is normalized, see {@link #changeFlag(RowData)}.
 *
 * @param <WriterState> state type of the wrapped writer
 */
class ReducingUpsertWriter<WriterState>
implements SinkWriter<RowData, Void, WriterState> {
    /** Writer that receives the reduced rows on flush. */
    private final SinkWriter<RowData, ?, WriterState> wrappedWriter;
    /** Reusable context handed to the wrapped writer; its timestamp is replaced per flushed row. */
    private final WrappedContext wrappedContext = new WrappedContext();
    /** Number of buffered keys at which a flush is triggered. */
    private final int batchMaxRowNums;
    /** Produces the row instance that is stored in the buffer. */
    private final Function<RowData, RowData> valueCopyFunction;
    /** Latest row and its (possibly {@code null}) timestamp per key. */
    private final Map<RowData, Tuple2<RowData, Long>> reduceBuffer = new HashMap<>();
    /** Projects a row onto its key fields. */
    private final Function<RowData, RowData> keyExtractor;
    private final Sink.ProcessingTimeService timeService;
    /** Interval in milliseconds used when scheduling the periodic flush timer. */
    private final long batchIntervalMs;
    private boolean closed = false;
    /** Wall-clock time (ms) of the last completed flush; initialized to the creation time. */
    private long lastFlush = System.currentTimeMillis();

    /**
     * Creates the writer and registers the first flush timer.
     *
     * @param wrappedWriter writer that receives the reduced rows; must not be {@code null}
     * @param physicalDataType data type whose logical child types describe the row fields
     * @param keyProjection indices of the key fields within the physical row
     * @param bufferFlushMode flush configuration; must be non-null and enabled
     * @param timeService service used to schedule the periodic flush; must not be {@code null}
     * @param valueCopyFunction function applied to every incoming row before it is buffered
     * @throws IllegalArgumentException if {@code bufferFlushMode} is {@code null} or disabled
     */
    ReducingUpsertWriter(SinkWriter<RowData, ?, WriterState> wrappedWriter, DataType physicalDataType, int[] keyProjection, SinkBufferFlushMode bufferFlushMode, Sink.ProcessingTimeService timeService, Function<RowData, RowData> valueCopyFunction) {
        Preconditions.checkArgument(bufferFlushMode != null && bufferFlushMode.isEnabled());
        this.wrappedWriter = Preconditions.checkNotNull(wrappedWriter);
        this.timeService = Preconditions.checkNotNull(timeService);
        this.batchMaxRowNums = bufferFlushMode.getBatchSize();
        this.batchIntervalMs = bufferFlushMode.getBatchIntervalMs();

        List<LogicalType> fields = physicalDataType.getLogicalType().getChildren();
        RowData.FieldGetter[] keyFieldGetters = Arrays.stream(keyProjection)
                .mapToObj(targetField -> RowData.createFieldGetter(fields.get(targetField), targetField))
                .toArray(RowData.FieldGetter[]::new);
        this.keyExtractor = rowData -> DynamicKafkaRecordSerializationSchema.createProjectedRow(rowData, RowKind.INSERT, keyFieldGetters);
        this.valueCopyFunction = valueCopyFunction;

        // Register the timer only once every field is assigned, so the callback can never
        // observe a partially constructed writer and no timer is left behind if the
        // setup above throws.
        this.registerFlush();
    }

    /**
     * Buffers {@code element} under its key, replacing any row previously buffered for that key.
     * The element's timestamp, which may be {@code null}, is stored alongside it.
     */
    public void write(RowData element, SinkWriter.Context context) throws IOException, InterruptedException {
        this.wrappedContext.setContext(context);
        this.addToBuffer(element, context.timestamp());
    }

    /**
     * Flushes all buffered rows to the wrapped writer, regardless of the {@code flush} argument.
     *
     * @return always an empty list
     */
    public List<Void> prepareCommit(boolean flush) throws IOException, InterruptedException {
        this.flush();
        return Collections.emptyList();
    }

    /** Delegates to the wrapped writer's {@code snapshotState}. */
    public List<WriterState> snapshotState(long checkpointId) throws IOException {
        return this.wrappedWriter.snapshotState(checkpointId);
    }

    /**
     * Closes the wrapped writer on the first call; later calls are no-ops. Buffered rows are not
     * flushed here.
     */
    public void close() throws Exception {
        if (this.closed) {
            return;
        }
        this.closed = true;
        this.wrappedWriter.close();
    }

    /**
     * Stores the normalized copy of {@code row} under its key and flushes once the buffer holds
     * at least {@code batchMaxRowNums} keys.
     *
     * @param row incoming row
     * @param timestamp timestamp of the row, or {@code null} if it has none
     */
    private void addToBuffer(RowData row, Long timestamp) throws IOException, InterruptedException {
        RowData key = this.keyExtractor.apply(row);
        RowData value = this.valueCopyFunction.apply(row);
        this.reduceBuffer.put(key, new Tuple2<>(this.changeFlag(value), timestamp));
        if (this.reduceBuffer.size() >= this.batchMaxRowNums) {
            this.flush();
        }
    }

    /**
     * Schedules the next flush timer at {@code lastFlush + batchIntervalMs}. When the timer fires
     * it flushes only if the interval has really elapsed since the last flush (a size-triggered
     * flush may have moved {@code lastFlush} forward in the meantime) and then re-registers
     * itself. Nothing is scheduled once the writer is closed.
     */
    private void registerFlush() {
        if (this.closed) {
            return;
        }
        this.timeService.registerProcessingTimer(this.lastFlush + this.batchIntervalMs, t -> {
            if (t >= this.lastFlush + this.batchIntervalMs) {
                this.flush();
            }
            this.registerFlush();
        });
    }

    /**
     * Normalizes the row kind in place: {@code INSERT} and {@code UPDATE_AFTER} become
     * {@code UPDATE_AFTER}; {@code UPDATE_BEFORE} and {@code DELETE} become {@code DELETE}.
     *
     * @return the same {@code value} instance
     */
    private RowData changeFlag(RowData value) {
        switch (value.getRowKind()) {
            case INSERT:
            case UPDATE_AFTER: {
                value.setRowKind(RowKind.UPDATE_AFTER);
                break;
            }
            case UPDATE_BEFORE:
            case DELETE: {
                value.setRowKind(RowKind.DELETE);
                break;
            }
        }
        return value;
    }

    /**
     * Writes every buffered row to the wrapped writer using the row's own timestamp, then records
     * the flush time and empties the buffer. Rows are emitted in the buffer map's iteration
     * order. If a write throws, the buffer is left untouched.
     */
    private void flush() throws IOException, InterruptedException {
        for (Tuple2<RowData, Long> value : this.reduceBuffer.values()) {
            // f1 may be null for rows without a timestamp; it is passed through as-is.
            this.wrappedContext.setTimestamp(value.f1);
            this.wrappedWriter.write(value.f0, this.wrappedContext);
        }
        this.lastFlush = System.currentTimeMillis();
        this.reduceBuffer.clear();
    }

    /**
     * Context passed to the wrapped writer. The watermark is taken from the most recent context
     * seen by {@link ReducingUpsertWriter#write}, while the timestamp is the one that was stored
     * with the row currently being flushed.
     */
    private static class WrappedContext
    implements SinkWriter.Context {
        /**
         * Timestamp of the row being flushed. Kept boxed because a row may have no timestamp;
         * a primitive field here would force an unboxing of {@code null} in {@code flush()}.
         */
        private Long timestamp;
        private SinkWriter.Context context;

        /**
         * @return the current watermark of the delegate context
         * @throws NullPointerException if no delegate context has been set yet
         */
        public long currentWatermark() {
            Preconditions.checkNotNull(this.context, "context must be set before retrieving it.");
            return this.context.currentWatermark();
        }

        /**
         * @return the timestamp of the row being flushed, or {@code null} if that row has none
         */
        public Long timestamp() {
            return this.timestamp;
        }

        /** Sets the timestamp reported for the next write; {@code null} means "no timestamp". */
        public void setTimestamp(Long timestamp) {
            this.timestamp = timestamp;
        }

        /** Sets the delegate context used for watermark lookups. */
        public void setContext(SinkWriter.Context context) {
            this.context = context;
        }
    }
}

