/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.io.debezium;

import io.debezium.connector.mysql.MySqlConnector;
import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import org.apache.beam.io.debezium.KafkaSourceConsumerFn;
import org.apache.beam.io.debezium.SourceRecordJson;
import org.apache.beam.io.debezium.SourceRecordMapper;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Exercises {@code KafkaSourceConsumerFn.OffsetTracker#tryClaim} against restrictions obtained
 * from {@code KafkaSourceConsumerFn#getInitialRestriction}.
 *
 * <p>Two scenarios are covered: a consumer function constructed with an {@code Integer} third
 * argument (named {@code maxNumRecords} here) and one constructed with a {@code long} third
 * argument (named {@code minutesToRun} here).
 */
@RunWith(JUnit4.class)
public class OffsetTrackerTest implements Serializable {

    /**
     * Verifies that {@code tryClaim} succeeds exactly {@code maxNumRecords} times and is rejected
     * on the following attempt.
     *
     * @throws IOException declared by the calls made while setting up the tracker
     */
    @Test
    public void testRestrictByNumberOfRecords() throws IOException {
        // NOTE(review): intentionally kept boxed. The sibling test passes a long in the same
        // constructor position, so the static type of this argument presumably selects the
        // constructor overload - confirm before changing it to a primitive int.
        Integer maxNumRecords = 10;

        SourceRecordMapper mapper = new SourceRecordJson.SourceRecordJsonMapper();
        KafkaSourceConsumerFn consumerFn =
                new KafkaSourceConsumerFn(MySqlConnector.class, mapper, maxNumRecords);
        KafkaSourceConsumerFn.OffsetHolder initialHolder =
                consumerFn.getInitialRestriction(new HashMap());
        KafkaSourceConsumerFn.OffsetTracker offsetTracker =
                new KafkaSourceConsumerFn.OffsetTracker(initialHolder);

        // The same (empty) position map is offered on every claim attempt.
        HashMap offsets = new HashMap();

        int claimed = 0;
        while (claimed < maxNumRecords) {
            Assert.assertTrue("OffsetTracker should continue", offsetTracker.tryClaim(offsets));
            claimed++;
        }

        // One claim past the configured count must be refused.
        Assert.assertFalse("OffsetTracker should stop", offsetTracker.tryClaim(offsets));
    }

    /**
     * Verifies that {@code tryClaim} succeeds immediately after the tracker is created and is
     * rejected once slightly more than {@code minutesToRun} minutes have elapsed.
     *
     * <p>This test really sleeps for a little over one minute.
     *
     * @throws IOException declared by the calls made while setting up the tracker
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    @Test
    public void testRestrictByAmountOfTime() throws IOException, InterruptedException {
        long minutesToRun = 1L;
        long millisPerMinute = 60000L;
        // Sleep 100 ms beyond the configured duration before the second claim.
        long sleepMillis = minutesToRun * millisPerMinute + 100L;

        SourceRecordMapper mapper = new SourceRecordJson.SourceRecordJsonMapper();
        KafkaSourceConsumerFn consumerFn =
                new KafkaSourceConsumerFn(MySqlConnector.class, mapper, minutesToRun);
        KafkaSourceConsumerFn.OffsetHolder initialHolder =
                consumerFn.getInitialRestriction(new HashMap());
        KafkaSourceConsumerFn.OffsetTracker offsetTracker =
                new KafkaSourceConsumerFn.OffsetTracker(initialHolder);

        HashMap offsets = new HashMap();

        Assert.assertTrue(offsetTracker.tryClaim(offsets));
        Thread.sleep(sleepMillis);
        Assert.assertFalse(offsetTracker.tryClaim(offsets));
    }
}

