/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.sink;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.generated.RecordEqualiser;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.StateConfigUtil;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;
import org.apache.flink.types.RowKind;
import org.junit.Assert;
import org.junit.Test;

public class SinkUpsertMaterializerTest {
    private final StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig((long)1000L);
    private final LogicalType[] types = new LogicalType[]{new IntType(), new VarCharType()};
    private final RowDataSerializer serializer = new RowDataSerializer(this.types);
    private final RowDataKeySelector keySelector = HandwrittenSelectorUtil.getRowDataSelector(new int[0], this.types);
    private final GeneratedRecordEqualiser equaliser = new GeneratedRecordEqualiser("", "", new Object[0]){

        public RecordEqualiser newInstance(ClassLoader classLoader) {
            return new TestRecordEqualiser();
        }
    };

    @Test
    public void test() throws Exception {
        SinkUpsertMaterializer materializer = new SinkUpsertMaterializer(this.ttlConfig, (TypeSerializer)this.serializer, this.equaliser);
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness((OneInputStreamOperator)materializer, (KeySelector)this.keySelector, (TypeInformation)this.keySelector.getProducedType());
        testHarness.open();
        testHarness.setStateTtlProcessingTime(1L);
        testHarness.processElement(StreamRecordUtils.insertRecord(1, "a1"));
        this.shouldEmit((OneInputStreamOperatorTestHarness<RowData, RowData>)testHarness, StreamRecordUtils.rowOfKind(RowKind.INSERT, 1, "a1"));
        testHarness.processElement(StreamRecordUtils.insertRecord(1, "a2"));
        this.shouldEmit((OneInputStreamOperatorTestHarness<RowData, RowData>)testHarness, StreamRecordUtils.rowOfKind(RowKind.UPDATE_AFTER, 1, "a2"));
        testHarness.processElement(StreamRecordUtils.insertRecord(1, "a3"));
        this.shouldEmit((OneInputStreamOperatorTestHarness<RowData, RowData>)testHarness, StreamRecordUtils.rowOfKind(RowKind.UPDATE_AFTER, 1, "a3"));
        testHarness.processElement(StreamRecordUtils.deleteRecord(1, "a2"));
        this.shouldEmitNothing((OneInputStreamOperatorTestHarness<RowData, RowData>)testHarness);
        testHarness.processElement(StreamRecordUtils.deleteRecord(1, "a3"));
        this.shouldEmit((OneInputStreamOperatorTestHarness<RowData, RowData>)testHarness, StreamRecordUtils.rowOfKind(RowKind.UPDATE_AFTER, 1, "a1"));
        testHarness.processElement(StreamRecordUtils.deleteRecord(1, "a1"));
        this.shouldEmit((OneInputStreamOperatorTestHarness<RowData, RowData>)testHarness, StreamRecordUtils.rowOfKind(RowKind.DELETE, 1, "a1"));
        testHarness.processElement(StreamRecordUtils.insertRecord(1, "a4"));
        this.shouldEmit((OneInputStreamOperatorTestHarness<RowData, RowData>)testHarness, StreamRecordUtils.rowOfKind(RowKind.INSERT, 1, "a4"));
        testHarness.setStateTtlProcessingTime(1002L);
        testHarness.processElement(StreamRecordUtils.deleteRecord(1, "a4"));
        this.shouldEmitNothing((OneInputStreamOperatorTestHarness<RowData, RowData>)testHarness);
        testHarness.close();
    }

    private void shouldEmitNothing(OneInputStreamOperatorTestHarness<RowData, RowData> harness) {
        Assert.assertEquals(Collections.emptyList(), SinkUpsertMaterializerTest.getEmittedRows(harness));
    }

    private void shouldEmit(OneInputStreamOperatorTestHarness<RowData, RowData> harness, RowData expected) {
        Assert.assertEquals(Collections.singletonList(expected), SinkUpsertMaterializerTest.getEmittedRows(harness));
    }

    private static List<RowData> getEmittedRows(OneInputStreamOperatorTestHarness<RowData, RowData> harness) {
        Object o;
        ArrayList<RowData> rows = new ArrayList<RowData>();
        while ((o = harness.getOutput().poll()) != null) {
            RowData value = (RowData)((StreamRecord)o).getValue();
            GenericRowData newRow = GenericRowData.of((Object[])new Object[]{value.getInt(0), value.getString(1)});
            newRow.setRowKind(value.getRowKind());
            rows.add((RowData)newRow);
        }
        return rows;
    }

    private static class TestRecordEqualiser
    implements RecordEqualiser {
        private TestRecordEqualiser() {
        }

        public boolean equals(RowData row1, RowData row2) {
            return row1.getRowKind() == row2.getRowKind() && row1.getInt(0) == row2.getInt(0) && row1.getString(1).equals(row2.getString(1));
        }
    }
}

