package org.apache.flink.connector.kinesis.table;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.temporal.ChronoField;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.factories.TableOptionsBuilder;
import org.apache.flink.table.factories.TestFormatFactory;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.utils.EncodingUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

/* loaded from: input_file:org/apache/flink/connector/kinesis/table/RowDataFieldsKinesisPartitionKeyGeneratorTest.class */
/**
 * Tests for {@link RowDataFieldsKinesisPartitionKeyGenerator}.
 *
 * <p>The tests build rows of a fixed "click stream" schema and check that the generator
 * concatenates the configured partition key fields (optionally overridden by static values) using
 * the configured delimiter, and that invalid partition key configurations are rejected.
 */
public class RowDataFieldsKinesisPartitionKeyGeneratorTest extends TestLogger {

    /** Stream name used by {@link #defaultTableOptions()}. */
    private static final String TABLE_NAME = "click_stream";

    /**
     * Schema of the rows produced by {@link #createElement(LocalDateTime, String, String)}; the
     * field order here must match the positions written there.
     */
    private static final RowType ROW_TYPE =
            (RowType)
                    DataTypes.ROW(
                                    DataTypes.FIELD("time", DataTypes.TIMESTAMP(3)),
                                    DataTypes.FIELD("ip", DataTypes.VARCHAR(16)),
                                    DataTypes.FIELD("route", DataTypes.STRING()),
                                    DataTypes.FIELD("date", DataTypes.STRING()),
                                    DataTypes.FIELD("year", DataTypes.STRING()),
                                    DataTypes.FIELD("month", DataTypes.STRING()),
                                    DataTypes.FIELD("day", DataTypes.STRING()))
                            .getLogicalType();

    /** Delimiters exercised by the positive tests, including the empty delimiter. */
    private static final List<String> FIELD_DELIMITERS = Arrays.asList("", "|", ",", "--");

    private static final List<String> PARTITION_BY_DATE_AND_IP = Arrays.asList("date", "ip");
    private static final List<String> PARTITION_BY_DATE = Arrays.asList("year", "month", "day");
    private static final List<String> PARTITION_BY_ROUTE = Collections.singletonList("route");

    /** Timestamps used to derive the date-related field values of the test rows. */
    private static final List<LocalDateTime> DATE_TIMES =
            Arrays.asList(
                    LocalDateTime.of(2014, 10, 22, 12, 0),
                    LocalDateTime.of(2015, 11, 13, 10, 0),
                    LocalDateTime.of(2015, 12, 14, 14, 0),
                    LocalDateTime.of(2018, 10, 31, 15, 0));

    private static final String IP = "255.255.255.255";

    /** Length to which {@link #testGoodPartitionerExceedingMaxLength()} expects keys to be cut. */
    private static final int MAX_EXPECTED_KEY_LENGTH = 256;

    @Rule
    public final ExpectedException thrown = ExpectedException.none();

    /** The key is the "date" and "ip" field values joined by the configured delimiter. */
    @Test
    public void testGoodPartitioner() {
        for (String delimiter : FIELD_DELIMITERS) {
            RowDataFieldsKinesisPartitionKeyGenerator generator =
                    new RowDataFieldsKinesisPartitionKeyGenerator(
                            ROW_TYPE, PARTITION_BY_DATE_AND_IP, delimiter);

            for (LocalDateTime dateTime : DATE_TIMES) {
                String expectedKey = String.join(delimiter, String.valueOf(days(dateTime)), IP);
                Assert.assertEquals(expectedKey, generator.apply(createElement(dateTime, IP)));
            }
        }
    }

    /** A "route" value longer than the expected maximum is truncated to that maximum. */
    @Test
    public void testGoodPartitionerExceedingMaxLength() {
        RowDataFieldsKinesisPartitionKeyGenerator generator =
                new RowDataFieldsKinesisPartitionKeyGenerator(ROW_TYPE, PARTITION_BY_ROUTE);

        // 16 + 50 * 5 + 16 = 282 characters, i.e. longer than MAX_EXPECTED_KEY_LENGTH.
        String longRoute =
                "http://www.very-" + EncodingUtils.repeat("long-", 50) + "address.com/home";
        String expectedKey = longRoute.substring(0, MAX_EXPECTED_KEY_LENGTH);

        for (LocalDateTime dateTime : DATE_TIMES) {
            Assert.assertEquals(
                    expectedKey, generator.apply(createElement(dateTime, IP, longRoute)));
        }
    }

    /** Static values for the leading keys ("year", "month") replace the row's own values. */
    @Test
    public void testGoodPartitionerWithStaticPrefix() {
        String staticYear = String.valueOf(year(DATE_TIMES.get(0)));
        String staticMonth = String.valueOf(monthOfYear(DATE_TIMES.get(0)));

        for (String delimiter : FIELD_DELIMITERS) {
            RowDataFieldsKinesisPartitionKeyGenerator generator =
                    new RowDataFieldsKinesisPartitionKeyGenerator(
                            ROW_TYPE, PARTITION_BY_DATE, delimiter);

            // A plain map per generator; avoids anonymous HashMap subclasses.
            HashMap<String, String> staticFields = new HashMap<>();
            staticFields.put("year", staticYear);
            staticFields.put("month", staticMonth);
            generator.setStaticFields(staticFields);

            for (LocalDateTime dateTime : DATE_TIMES) {
                String expectedKey =
                        String.join(
                                delimiter,
                                staticYear,
                                staticMonth,
                                String.valueOf(dayOfMonth(dateTime)));
                Assert.assertEquals(expectedKey, generator.apply(createElement(dateTime, IP)));
            }
        }
    }

    /** Static values for the trailing keys ("month", "day") replace the row's own values. */
    @Test
    public void testGoodPartitionerWithStaticSuffix() {
        String staticMonth = String.valueOf(monthOfYear(DATE_TIMES.get(0)));
        String staticDay = String.valueOf(dayOfMonth(DATE_TIMES.get(0)));

        for (String delimiter : FIELD_DELIMITERS) {
            RowDataFieldsKinesisPartitionKeyGenerator generator =
                    new RowDataFieldsKinesisPartitionKeyGenerator(
                            ROW_TYPE, PARTITION_BY_DATE, delimiter);

            HashMap<String, String> staticFields = new HashMap<>();
            staticFields.put("month", staticMonth);
            staticFields.put("day", staticDay);
            generator.setStaticFields(staticFields);

            for (LocalDateTime dateTime : DATE_TIMES) {
                String expectedKey =
                        String.join(
                                delimiter,
                                String.valueOf(year(dateTime)),
                                staticMonth,
                                staticDay);
                Assert.assertEquals(expectedKey, generator.apply(createElement(dateTime, IP)));
            }
        }
    }

    /** A static value for the middle key ("month") replaces the row's own value. */
    @Test
    public void testGoodPartitionerWithStaticInfix() {
        String staticMonth = String.valueOf(monthOfYear(DATE_TIMES.get(0)));

        for (String delimiter : FIELD_DELIMITERS) {
            RowDataFieldsKinesisPartitionKeyGenerator generator =
                    new RowDataFieldsKinesisPartitionKeyGenerator(
                            ROW_TYPE, PARTITION_BY_DATE, delimiter);

            HashMap<String, String> staticFields = new HashMap<>();
            staticFields.put("month", staticMonth);
            generator.setStaticFields(staticFields);

            for (LocalDateTime dateTime : DATE_TIMES) {
                String expectedKey =
                        String.join(
                                delimiter,
                                String.valueOf(year(dateTime)),
                                staticMonth,
                                String.valueOf(dayOfMonth(dateTime)));
                Assert.assertEquals(expectedKey, generator.apply(createElement(dateTime, IP)));
            }
        }
    }

    /** An empty list of partition keys is rejected. */
    @Test
    public void testBadPartitionerWithEmptyPrefix() {
        thrown.expect(IllegalArgumentException.class);
        thrown.expect(
                FlinkMatchers.containsCause(
                        new IllegalArgumentException(
                                "Cannot create a RowDataFieldsKinesisPartitioner for a non-partitioned table")));

        new RowDataFieldsKinesisPartitionKeyGenerator(ROW_TYPE, Collections.emptyList());
    }

    /** A partition key listed twice is rejected. */
    @Test
    public void testBadPartitionerWithDuplicatePartitionKeys() {
        thrown.expect(IllegalArgumentException.class);
        thrown.expect(
                FlinkMatchers.containsCause(
                        new IllegalArgumentException(
                                "The sequence of partition keys cannot contain duplicates")));

        new RowDataFieldsKinesisPartitionKeyGenerator(ROW_TYPE, Arrays.asList("ip", "ip"));
    }

    /** A partition key that is not a field of the row type is rejected. */
    @Test
    public void testBadPartitionerWithBadFieldFieldNames() {
        thrown.expect(IllegalArgumentException.class);
        thrown.expect(
                FlinkMatchers.containsCause(
                        new IllegalArgumentException(
                                "The following partition keys are not present in the table: abc")));

        new RowDataFieldsKinesisPartitionKeyGenerator(ROW_TYPE, Arrays.asList("ip", "abc"));
    }

    /** A partition key backed by the TIMESTAMP field "time" is rejected. */
    @Test
    public void testBadPartitionerWithBadFieldFieldTypes() {
        thrown.expect(IllegalArgumentException.class);
        thrown.expect(
                FlinkMatchers.containsCause(
                        new IllegalArgumentException(
                                "The following partition keys have types that are not supported by Kinesis: time")));

        new RowDataFieldsKinesisPartitionKeyGenerator(ROW_TYPE, Arrays.asList("time", "ip"));
    }

    /** Creates a test row with the given timestamp and IP and a fixed default route. */
    private RowData createElement(LocalDateTime localDateTime, String str) {
        return createElement(localDateTime, str, "https://flink.apache.org/home");
    }

    /**
     * Creates a test row matching {@link #ROW_TYPE}.
     *
     * @param localDateTime timestamp stored in "time" and used to derive "date", "year", "month"
     *     and "day"
     * @param str value of the "ip" field
     * @param str2 value of the "route" field
     * @return the populated row
     */
    private RowData createElement(LocalDateTime localDateTime, String str, String str2) {
        GenericRowData row = new GenericRowData(ROW_TYPE.getFieldCount());
        row.setField(0, TimestampData.fromLocalDateTime(localDateTime));
        row.setField(1, StringData.fromString(str));
        row.setField(2, StringData.fromString(str2));
        row.setField(3, asStringData(days(localDateTime)));
        row.setField(4, asStringData(year(localDateTime)));
        row.setField(5, asStringData(monthOfYear(localDateTime)));
        row.setField(6, asStringData(dayOfMonth(localDateTime)));
        return row;
    }

    /** Converts an int to the {@link StringData} of its decimal representation. */
    private static StringData asStringData(int value) {
        return StringData.fromString(String.valueOf(value));
    }

    /** Returns the number of whole days between 1970-01-01 and the date of the given timestamp. */
    private static int days(LocalDateTime localDateTime) {
        return (int) ChronoUnit.DAYS.between(LocalDate.ofEpochDay(0L), localDateTime);
    }

    /** Returns the year of the given timestamp. */
    private static int year(LocalDateTime localDateTime) {
        return localDateTime.get(ChronoField.YEAR);
    }

    /** Returns the month-of-year (1-12) of the given timestamp. */
    private static int monthOfYear(LocalDateTime localDateTime) {
        return localDateTime.get(ChronoField.MONTH_OF_YEAR);
    }

    /** Returns the day-of-month of the given timestamp. */
    private static int dayOfMonth(LocalDateTime localDateTime) {
        return localDateTime.get(ChronoField.DAY_OF_MONTH);
    }

    /**
     * Builds table options for a "kinesis" table using the "test-format" format.
     *
     * <p>NOTE(review): not referenced by any test in this class; kept as-is.
     */
    private TableOptionsBuilder defaultTableOptions() {
        return new TableOptionsBuilder("kinesis", "test-format")
                .withTableOption(KinesisConnectorOptions.STREAM, TABLE_NAME)
                .withTableOption("properties.aws.region", "us-west-2")
                .withFormatOption(TestFormatFactory.DELIMITER, ",");
    }
}
