/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.utils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

public class TestingSinks {

    public static class RowCollector {
        private static final List<Tuple2<Boolean, Row>> SINK = new ArrayList<Tuple2<Boolean, Row>>();

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public static void addValue(Tuple2<Boolean, Row> value) {
            Tuple2 copy = new Tuple2(value.f0, value.f1);
            List<Tuple2<Boolean, Row>> list = SINK;
            synchronized (list) {
                SINK.add((Tuple2<Boolean, Row>)copy);
            }
        }

        public static List<Tuple2<Boolean, Row>> getAndClearValues() {
            ArrayList<Tuple2<Boolean, Row>> out = new ArrayList<Tuple2<Boolean, Row>>(SINK);
            SINK.clear();
            return out;
        }

        public static List<String> retractResults(List<Tuple2<Boolean, Row>> results) {
            HashMap<String, Integer> retractedResult = new HashMap<String, Integer>();
            results.forEach(v -> {
                int cnt = retractedResult.getOrDefault(((Row)v.f1).toString(), 0);
                if (((Boolean)v.f0).booleanValue()) {
                    retractedResult.put(((Row)v.f1).toString(), cnt + 1);
                } else {
                    retractedResult.put(((Row)v.f1).toString(), cnt - 1);
                }
            });
            if (retractedResult.entrySet().stream().allMatch(entry -> (Integer)entry.getValue() < 0)) {
                throw new AssertionError((Object)"Received retracted rows which have not been accumulated.");
            }
            ArrayList<String> retractedString = new ArrayList<String>();
            retractedResult.forEach((k, v) -> {
                for (int i = 0; i < v; ++i) {
                    retractedString.add((String)k);
                }
            });
            return retractedString;
        }

        public static List<String> upsertResults(List<Tuple2<Boolean, Row>> results, int[] keys) {
            HashMap upsertResult = new HashMap();
            results.forEach(v -> {
                Row key = Row.project((Row)((Row)v.f1), (int[])keys);
                if (((Boolean)v.f0).booleanValue()) {
                    upsertResult.put(key, ((Row)v.f1).toString());
                } else {
                    upsertResult.remove(key);
                }
            });
            return new ArrayList<String>(upsertResult.values());
        }
    }

    static class RowSink
    implements SinkFunction<RowData> {
        private final DynamicTableSink.DataStructureConverter converter;

        public RowSink(DynamicTableSink.DataStructureConverter converter) {
            this.converter = converter;
        }

        public void invoke(RowData value, SinkFunction.Context context) {
            RowKind rowKind = value.getRowKind();
            Row data = (Row)this.converter.toExternal((Object)value);
            if (rowKind == RowKind.INSERT || rowKind == RowKind.UPDATE_AFTER) {
                RowCollector.addValue((Tuple2<Boolean, Row>)Tuple2.of((Object)true, (Object)data));
            } else {
                RowCollector.addValue((Tuple2<Boolean, Row>)Tuple2.of((Object)false, (Object)data));
            }
        }
    }

    public static class TestAppendingSink
    implements DynamicTableSink {
        private final DataType rowDataType;

        public TestAppendingSink(DataType rowDataType) {
            this.rowDataType = rowDataType;
        }

        public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
            return requestedMode;
        }

        public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
            final DynamicTableSink.DataStructureConverter converter = context.createDataStructureConverter(this.rowDataType);
            return new DataStreamSinkProvider(){

                public DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream) {
                    return dataStream.addSink((SinkFunction)new RowSink(converter));
                }
            };
        }

        public DynamicTableSink copy() {
            return new TestAppendingSink(this.rowDataType);
        }

        public String asSummaryString() {
            return String.format("TestingAppendSink(%s)", DataType.getFields((DataType)this.rowDataType));
        }
    }
}

