package io.confluent.connect.jdbc.sink.integration;

import io.confluent.common.utils.IntegrationTest;
import io.confluent.connect.jdbc.integration.BaseConnectorIT;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Calendar;
import java.util.Collections;
import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.testcontainers.containers.PostgreSQLContainer;

/**
 * Integration test that runs the JDBC sink connector against a PostgreSQL
 * declaratively partitioned table.
 *
 * <p>Each test starts a {@code postgres:14-bullseye} container, creates a
 * {@code measurements} table partitioned by range on {@code logdate} with one
 * partition covering the year 1970, and configures the sink with
 * {@code table.types=PARTITIONED TABLE} and {@code auto.create=false} so the
 * connector has to write into the pre-created table.
 */
@Category({IntegrationTest.class})
public class PostgresPartitionedIT extends BaseConnectorIT {

    private static final String KEY = "key";
    private static final String MEASUREMENTS = "measurements";
    private static final String CONNECTOR_NAME = "jdbc-sink-connector";

    @Rule
    public PostgreSQLContainer<?> postgresql = new PostgreSQLContainer<>("postgres:14-bullseye");

    /** JDBC connection used to create the tables and to verify the sink's output. */
    Connection connection;
    private JsonConverter jsonConverter;
    private Map<String, String> props;

    /**
     * Starts the Connect cluster, builds the sink configuration pointing at the
     * container, creates the topic, and creates the partitioned table.
     *
     * @throws SQLException if the database connection or the DDL fails
     */
    @Before
    public void setup() throws SQLException {
        startConnect();
        this.jsonConverter = jsonConverter();
        this.props = baseSinkProps();
        this.props.put("connection.url", this.postgresql.getJdbcUrl());
        this.props.put("connection.user", this.postgresql.getUsername());
        this.props.put("connection.password", this.postgresql.getPassword());
        this.props.put("pk.mode", "record_value");
        this.props.put("pk.fields", KEY);
        // The table is created by this test; the connector must not create it.
        this.props.put("auto.create", "false");
        this.props.put("insert.mode", "insert");
        this.props.put("table.types", "PARTITIONED TABLE");
        this.props.put("topics", MEASUREMENTS);
        this.connect.kafka().createTopic(MEASUREMENTS, 1);
        createPartitionedTableAndPartition();
    }

    /**
     * Opens {@link #connection} and creates the partitioned parent table plus a
     * single partition for dates in {@code [1970-01-01, 1971-01-01)}.
     *
     * <p>The connection is left open for later assertions; it is closed in
     * {@link #tearDown()}.
     *
     * @throws SQLException if connecting or executing the DDL fails
     */
    private void createPartitionedTableAndPartition() throws SQLException {
        this.connection = DriverManager.getConnection(
            this.postgresql.getJdbcUrl(),
            this.postgresql.getUsername(),
            this.postgresql.getPassword());
        try (Statement ddl = this.connection.createStatement()) {
            ddl.execute(
                "CREATE TABLE measurements ("
                    + "    key             int not null,"
                    + "    city_id         int not null,"
                    + "    logdate         date not null,"
                    + "    peaktemp        int,"
                    + "    unitsales       int"
                    + ") PARTITION BY RANGE (logdate);");
            ddl.execute(
                "CREATE TABLE measurements_y1970"
                    + "    PARTITION OF measurements"
                    + "    FOR VALUES FROM ('1970-01-01') TO ('1971-01-01');");
        }
    }

    /**
     * Closes the verification connection (if it was opened) and stops Connect.
     *
     * <p>{@code setup()} can fail before the connection is assigned, so the
     * field is null-checked, and {@code stopConnect()} runs in a
     * {@code finally} block so a failing {@code close()} cannot skip it.
     *
     * @throws SQLException if closing the connection fails
     */
    @After
    public void tearDown() throws SQLException {
        try {
            if (this.connection != null) {
                this.connection.close();
            }
        } finally {
            stopConnect();
        }
    }

    /**
     * Produces one record whose {@code logdate} is the epoch (1970-01-01 UTC),
     * waits for the sink to commit it, and checks the row stored in the
     * partitioned table.
     */
    @Test
    public void shouldInsertMeasurement() throws Exception {
        Schema schema = SchemaBuilder.struct()
            .field(KEY, Schema.INT32_SCHEMA)
            .field("city_id", Schema.INT32_SCHEMA)
            .field("logdate", Date.SCHEMA)
            .field("peaktemp", Schema.INT32_SCHEMA)
            .field("unitsales", Schema.INT32_SCHEMA)
            .build();
        Struct measurement = new Struct(schema)
            .put(KEY, 1)
            .put("city_id", 2)
            // Epoch millis 0 falls inside the only partition created in setup.
            .put("logdate", new java.util.Date(0L))
            .put("peaktemp", 4)
            .put("unitsales", 5);

        this.connect.configureConnector(CONNECTOR_NAME, this.props);
        waitForConnectorToStart(CONNECTOR_NAME, 1);
        produceRecord(schema, measurement);
        waitForCommittedRecords(
            CONNECTOR_NAME,
            Collections.singleton(MEASUREMENTS),
            1L,
            1,
            TimeUnit.MINUTES.toMillis(3L));

        assertSinkRecordValues(measurement);
    }

    /**
     * Reads the row with the highest {@code key} from {@code measurements} and
     * asserts that each column equals the corresponding field of {@code struct}.
     *
     * @param struct the record value that was produced to the topic
     * @throws SQLException if the query or reading the result set fails
     */
    private void assertSinkRecordValues(Struct struct) throws SQLException {
        try (Statement query = this.connection.createStatement();
             ResultSet row = query.executeQuery(
                 "SELECT * FROM measurements ORDER BY KEY DESC FETCH FIRST 1 ROWS ONLY")) {
            Assert.assertTrue(row.next());
            Assert.assertEquals(struct.getInt32(KEY).intValue(), row.getInt(1));
            Assert.assertEquals(struct.getInt32("city_id").intValue(), row.getInt(2));
            // Read the DATE column with a UTC calendar so the comparison does not
            // depend on the JVM's default time zone.
            Assert.assertEquals(
                struct.get("logdate"),
                row.getDate(3, Calendar.getInstance(TimeZone.getTimeZone("UTC"))));
            Assert.assertEquals(struct.getInt32("peaktemp").intValue(), row.getInt(4));
            Assert.assertEquals(struct.getInt32("unitsales").intValue(), row.getInt(5));
        }
    }

    /**
     * Serializes {@code struct} with the JSON converter and produces it, with a
     * null key, to the {@code measurements} topic.
     *
     * @param schema the Connect schema describing {@code struct}
     * @param struct the record value to send
     */
    private void produceRecord(Schema schema, Struct struct) {
        byte[] serialized = this.jsonConverter.fromConnectData(MEASUREMENTS, schema, struct);
        // NOTE(review): decoding uses the platform default charset, as before;
        // confirm which encoding the produce() helper applies before changing it.
        this.connect.kafka().produce(MEASUREMENTS, (String) null, new String(serialized));
    }
}
