/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.file;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.file.FileStreamSourceTask;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class FileStreamSourceTaskTest
extends EasyMockSupport {
    private static final String TOPIC = "test";
    private File tempFile;
    private Map<String, String> config;
    private OffsetStorageReader offsetStorageReader;
    private SourceTaskContext context;
    private FileStreamSourceTask task;
    private boolean verifyMocks = false;

    @Before
    public void setup() throws IOException {
        this.tempFile = File.createTempFile("file-stream-source-task-test", null);
        this.config = new HashMap<String, String>();
        this.config.put("file", this.tempFile.getAbsolutePath());
        this.config.put("topic", TOPIC);
        this.config.put("batch.size", String.valueOf(2000));
        this.task = new FileStreamSourceTask();
        this.offsetStorageReader = (OffsetStorageReader)this.createMock(OffsetStorageReader.class);
        this.context = (SourceTaskContext)this.createMock(SourceTaskContext.class);
        this.task.initialize(this.context);
    }

    @After
    public void teardown() {
        this.tempFile.delete();
        if (this.verifyMocks) {
            this.verifyAll();
        }
    }

    private void replay() {
        this.replayAll();
        this.verifyMocks = true;
    }

    @Test
    public void testNormalLifecycle() throws InterruptedException, IOException {
        this.expectOffsetLookupReturnNone();
        this.replay();
        this.task.start(this.config);
        OutputStream os = Files.newOutputStream(this.tempFile.toPath(), new OpenOption[0]);
        Assert.assertEquals(null, (Object)this.task.poll());
        os.write("partial line".getBytes());
        os.flush();
        Assert.assertEquals(null, (Object)this.task.poll());
        os.write(" finished\n".getBytes());
        os.flush();
        List records = this.task.poll();
        Assert.assertEquals((long)1L, (long)records.size());
        Assert.assertEquals((Object)TOPIC, (Object)((SourceRecord)records.get(0)).topic());
        Assert.assertEquals((Object)"partial line finished", (Object)((SourceRecord)records.get(0)).value());
        Assert.assertEquals(Collections.singletonMap("filename", this.tempFile.getAbsolutePath()), (Object)((SourceRecord)records.get(0)).sourcePartition());
        Assert.assertEquals(Collections.singletonMap("position", 22L), (Object)((SourceRecord)records.get(0)).sourceOffset());
        Assert.assertEquals(null, (Object)this.task.poll());
        os.write("line1\rline2\r\nline3\nline4\n\r".getBytes());
        os.flush();
        records = this.task.poll();
        Assert.assertEquals((long)4L, (long)records.size());
        Assert.assertEquals((Object)"line1", (Object)((SourceRecord)records.get(0)).value());
        Assert.assertEquals(Collections.singletonMap("filename", this.tempFile.getAbsolutePath()), (Object)((SourceRecord)records.get(0)).sourcePartition());
        Assert.assertEquals(Collections.singletonMap("position", 28L), (Object)((SourceRecord)records.get(0)).sourceOffset());
        Assert.assertEquals((Object)"line2", (Object)((SourceRecord)records.get(1)).value());
        Assert.assertEquals(Collections.singletonMap("filename", this.tempFile.getAbsolutePath()), (Object)((SourceRecord)records.get(1)).sourcePartition());
        Assert.assertEquals(Collections.singletonMap("position", 35L), (Object)((SourceRecord)records.get(1)).sourceOffset());
        Assert.assertEquals((Object)"line3", (Object)((SourceRecord)records.get(2)).value());
        Assert.assertEquals(Collections.singletonMap("filename", this.tempFile.getAbsolutePath()), (Object)((SourceRecord)records.get(2)).sourcePartition());
        Assert.assertEquals(Collections.singletonMap("position", 41L), (Object)((SourceRecord)records.get(2)).sourceOffset());
        Assert.assertEquals((Object)"line4", (Object)((SourceRecord)records.get(3)).value());
        Assert.assertEquals(Collections.singletonMap("filename", this.tempFile.getAbsolutePath()), (Object)((SourceRecord)records.get(3)).sourcePartition());
        Assert.assertEquals(Collections.singletonMap("position", 47L), (Object)((SourceRecord)records.get(3)).sourceOffset());
        os.write("subsequent text".getBytes());
        os.flush();
        records = this.task.poll();
        Assert.assertEquals((long)1L, (long)records.size());
        Assert.assertEquals((Object)"", (Object)((SourceRecord)records.get(0)).value());
        Assert.assertEquals(Collections.singletonMap("filename", this.tempFile.getAbsolutePath()), (Object)((SourceRecord)records.get(0)).sourcePartition());
        Assert.assertEquals(Collections.singletonMap("position", 48L), (Object)((SourceRecord)records.get(0)).sourceOffset());
        os.close();
        this.task.stop();
    }

    @Test
    public void testBatchSize() throws IOException, InterruptedException {
        this.expectOffsetLookupReturnNone();
        this.replay();
        this.config.put("batch.size", "5000");
        this.task.start(this.config);
        OutputStream os = Files.newOutputStream(this.tempFile.toPath(), new OpenOption[0]);
        for (int i = 0; i < 10000; ++i) {
            os.write("Neque porro quisquam est qui dolorem ipsum quia dolor sit amet, consectetur, adipisci velit...\n".getBytes());
        }
        os.flush();
        List records = this.task.poll();
        Assert.assertEquals((long)5000L, (long)records.size());
        records = this.task.poll();
        Assert.assertEquals((long)5000L, (long)records.size());
        os.close();
        this.task.stop();
    }

    @Test
    public void testMissingFile() throws InterruptedException {
        this.replay();
        String data = "line\n";
        System.setIn(new ByteArrayInputStream(data.getBytes()));
        this.config.remove("file");
        this.task.start(this.config);
        List records = this.task.poll();
        Assert.assertEquals((long)1L, (long)records.size());
        Assert.assertEquals((Object)TOPIC, (Object)((SourceRecord)records.get(0)).topic());
        Assert.assertEquals((Object)"line", (Object)((SourceRecord)records.get(0)).value());
        this.task.stop();
    }

    public void testInvalidFile() throws InterruptedException {
        this.config.put("file", "bogusfilename");
        this.task.start(this.config);
        for (int i = 0; i < 100; ++i) {
            Assert.assertEquals(null, (Object)this.task.poll());
        }
    }

    private void expectOffsetLookupReturnNone() {
        EasyMock.expect((Object)this.context.offsetStorageReader()).andReturn((Object)this.offsetStorageReader);
        EasyMock.expect((Object)this.offsetStorageReader.offset((Map)EasyMock.anyObject())).andReturn(null);
    }
}

