package org.apache.flink.streaming.api.invokable.operator.co;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;

/* loaded from: input_file:org/apache/flink/streaming/api/invokable/operator/co/CoWindowReduceInvokable.class */
/**
 * Variant of {@link CoBatchReduceInvokable} whose mini-batches are closed by time instead of
 * by element count.
 *
 * <p>For each of the two inputs the operator keeps the start time of the currently open
 * mini-batch ({@code startTime1}/{@code startTime2}). Whenever the latest observed time reaches
 * {@code startTime + granularity}, the open mini-batch is pushed into the window buffer and the
 * start time is advanced by one granularity step. The observed time comes either from the
 * timestamp of an incoming record or, when the configured {@link TimeStamp} is a
 * {@link DefaultTimeStamp}, additionally from a background thread that polls
 * {@link System#currentTimeMillis()}.
 *
 * <p>NOTE(review): {@code reduceToBuffer1}/{@code reduceToBuffer2} update
 * {@code currentValue} outside of the lock taken by {@code checkWindowEnd1}/
 * {@code checkWindowEnd2}, while the timer threads may call {@code addToBuffer()} concurrently.
 * Whether that is a race depends on the parent class implementation - confirm.
 *
 * @param <IN1> element type of the first input
 * @param <IN2> element type of the second input
 * @param <OUT> element type produced by the co-reduce function
 */
public class CoWindowReduceInvokable<IN1, IN2, OUT> extends CoBatchReduceInvokable<IN1, IN2, OUT> {
    private static final long serialVersionUID = 1L;

    /** Start time of the currently open mini-batch of the first input. */
    protected long startTime1;
    /** Start time of the currently open mini-batch of the second input. */
    protected long startTime2;
    /** Most recent time observed for the first input (record timestamp or wall clock). */
    protected long nextRecordTime1;
    /** Most recent time observed for the second input (record timestamp or wall clock). */
    protected long nextRecordTime2;
    /** Extracts the time of records arriving on the first input. */
    protected TimeStamp<IN1> timestamp1;
    /** Extracts the time of records arriving on the second input. */
    protected TimeStamp<IN2> timestamp2;
    /** Window of the first input; created in {@link #open(Configuration)}. */
    protected CoWindowReduceInvokable<IN1, IN2, OUT>.StreamWindow<IN1> window1;
    /** Window of the second input; created in {@link #open(Configuration)}. */
    protected CoWindowReduceInvokable<IN1, IN2, OUT>.StreamWindow<IN2> window2;

    /**
     * Stream batch with a window-specific end-of-batch rule.
     *
     * @param <IN> element type held by the window
     */
    public class StreamWindow<IN> extends CoBatchReduceInvokable<IN1, IN2, OUT>.StreamBatch<IN> {
        private static final long serialVersionUID = 1L;

        /**
         * Creates a window; both arguments are forwarded unchanged to the {@code StreamBatch}
         * constructor. {@link CoWindowReduceInvokable#open(Configuration)} passes the batch size
         * and the slide size of the corresponding input.
         */
        public StreamWindow(long j, long j2) {
            super(j, j2);
        }

        /**
         * Reports whether the window is complete.
         *
         * @return {@code true} when {@code minibatchCounter} equals {@code numberOfBatches}; in
         *         that case the counter is reduced by {@code batchPerSlide} before returning
         */
        @Override
        public boolean batchEnd() {
            if (this.minibatchCounter != this.numberOfBatches) {
                return false;
            }
            // Step the counter back by one slide so that the next window can fill up again.
            this.minibatchCounter -= this.batchPerSlide;
            return true;
        }
    }

    /**
     * Background thread that feeds the wall-clock time into the first input's window every
     * {@code slideSize1} milliseconds, for as long as the operator is running.
     */
    private class TimeCheck1 extends Thread {
        private TimeCheck1() {
        }

        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(CoWindowReduceInvokable.this.slideSize1);
                } catch (InterruptedException e) {
                    // An interrupt is a request to stop: keep the flag visible and end the thread
                    // instead of silently swallowing it and continuing to loop.
                    Thread.currentThread().interrupt();
                    return;
                }
                if (!CoWindowReduceInvokable.this.isRunning) {
                    return;
                }
                CoWindowReduceInvokable.this.checkWindowEnd1(
                        System.currentTimeMillis(), CoWindowReduceInvokable.this.window1);
            }
        }
    }

    /**
     * Background thread that feeds the wall-clock time into the second input's window every
     * {@code slideSize2} milliseconds, for as long as the operator is running.
     */
    private class TimeCheck2 extends Thread {
        private TimeCheck2() {
        }

        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(CoWindowReduceInvokable.this.slideSize2);
                } catch (InterruptedException e) {
                    // An interrupt is a request to stop: keep the flag visible and end the thread
                    // instead of silently swallowing it and continuing to loop.
                    Thread.currentThread().interrupt();
                    return;
                }
                if (!CoWindowReduceInvokable.this.isRunning) {
                    return;
                }
                CoWindowReduceInvokable.this.checkWindowEnd2(
                        System.currentTimeMillis(), CoWindowReduceInvokable.this.window2);
            }
        }
    }

    /**
     * Creates the invokable.
     *
     * @param coReduceFunction reduce function, forwarded to the parent constructor
     * @param j forwarded unchanged to the parent constructor (2nd argument)
     * @param j2 forwarded unchanged to the parent constructor (3rd argument)
     * @param j3 forwarded unchanged to the parent constructor (4th argument)
     * @param j4 forwarded unchanged to the parent constructor (5th argument)
     * @param timeStamp time extractor of the first input; also supplies {@code startTime1}
     * @param timeStamp2 time extractor of the second input; also supplies {@code startTime2}
     */
    public CoWindowReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReduceFunction, long j, long j2, long j3, long j4, TimeStamp<IN1> timeStamp, TimeStamp<IN2> timeStamp2) {
        super(coReduceFunction, j, j2, j3, j4);
        this.timestamp1 = timeStamp;
        this.timestamp2 = timeStamp2;
        this.startTime1 = timeStamp.getStartTime();
        this.startTime2 = timeStamp2.getStartTime();
    }

    /**
     * Opens the parent, replaces both batches by {@link StreamWindow} instances and starts a
     * wall-clock polling thread for every input whose time extractor is a
     * {@link DefaultTimeStamp}.
     *
     * @param configuration configuration passed on to the parent's {@code open}
     * @throws Exception if the parent's {@code open} fails
     */
    @Override
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.window1 = new StreamWindow<>(this.batchSize1, this.slideSize1);
        this.window2 = new StreamWindow<>(this.batchSize2, this.slideSize2);
        this.batch1 = this.window1;
        this.batch2 = this.window2;
        if (this.timestamp1 instanceof DefaultTimeStamp) {
            new TimeCheck1().start();
        }
        if (this.timestamp2 instanceof DefaultTimeStamp) {
            new TimeCheck2().start();
        }
    }

    /**
     * Handles a record of the first input: first closes every mini-batch that ended before the
     * record's timestamp, then folds the record into the open mini-batch.
     *
     * @param in1 incoming record
     * @param streamBatch batch to update; must be a {@link StreamWindow}, otherwise a
     *        {@link ClassCastException} is thrown
     * @throws Exception if the reduce function or the serializer fails
     */
    @Override
    public void reduceToBuffer1(IN1 in1, CoBatchReduceInvokable<IN1, IN2, OUT>.StreamBatch<IN1> streamBatch) throws Exception {
        checkWindowEnd1(this.timestamp1.getTimestamp(in1), (StreamWindow<IN1>) streamBatch);
        if (streamBatch.currentValue != null) {
            // Both operands are copied through the serializer before being handed to user code.
            streamBatch.currentValue = this.coReducer.reduce1(this.serializer1.copy(streamBatch.currentValue), this.serializer1.copy(in1));
        } else {
            streamBatch.currentValue = in1;
        }
    }

    /**
     * Handles a record of the second input: first closes every mini-batch that ended before the
     * record's timestamp, then folds the record into the open mini-batch.
     *
     * @param in2 incoming record
     * @param streamBatch batch to update; must be a {@link StreamWindow}, otherwise a
     *        {@link ClassCastException} is thrown
     * @throws Exception if the reduce function or the serializer fails
     */
    @Override
    public void reduceToBuffer2(IN2 in2, CoBatchReduceInvokable<IN1, IN2, OUT>.StreamBatch<IN2> streamBatch) throws Exception {
        checkWindowEnd2(this.timestamp2.getTimestamp(in2), (StreamWindow<IN2>) streamBatch);
        if (streamBatch.currentValue != null) {
            // Both operands are copied through the serializer before being handed to user code.
            streamBatch.currentValue = this.coReducer.reduce2(this.serializer2.copy(streamBatch.currentValue), this.serializer2.copy(in2));
        } else {
            streamBatch.currentValue = in2;
        }
    }

    /**
     * Records {@code j} as the latest observed time of the first input and closes every
     * mini-batch that has ended by then; a completed window is reduced immediately.
     *
     * <p>Synchronized because it is invoked both from {@code reduceToBuffer1} and from the
     * {@code TimeCheck1} thread.
     *
     * @param j latest observed time
     * @param streamWindow window of the first input
     */
    protected synchronized void checkWindowEnd1(long j, CoWindowReduceInvokable<IN1, IN2, OUT>.StreamWindow<IN1> streamWindow) {
        this.nextRecordTime1 = j;
        // miniBatchEnd1() advances startTime1 on every true result, so the loop terminates.
        while (miniBatchEnd1()) {
            streamWindow.addToBuffer();
            if (streamWindow.batchEnd()) {
                reduceBatch1(streamWindow);
            }
        }
    }

    /**
     * Records {@code j} as the latest observed time of the second input and closes every
     * mini-batch that has ended by then; a completed window is reduced immediately.
     *
     * <p>Synchronized because it is invoked both from {@code reduceToBuffer2} and from the
     * {@code TimeCheck2} thread.
     *
     * @param j latest observed time
     * @param streamWindow window of the second input
     */
    protected synchronized void checkWindowEnd2(long j, CoWindowReduceInvokable<IN1, IN2, OUT>.StreamWindow<IN2> streamWindow) {
        this.nextRecordTime2 = j;
        // miniBatchEnd2() advances startTime2 on every true result, so the loop terminates.
        while (miniBatchEnd2()) {
            streamWindow.addToBuffer();
            if (streamWindow.batchEnd()) {
                reduceBatch2(streamWindow);
            }
        }
    }

    /**
     * Tests whether the open mini-batch of the first input has ended, i.e. whether
     * {@code nextRecordTime1} has reached {@code startTime1 + granularity1}.
     *
     * @return {@code true} if it has ended; {@code startTime1} is then advanced by
     *         {@code granularity1} as a side effect
     */
    public boolean miniBatchEnd1() {
        if (this.nextRecordTime1 < this.startTime1 + this.granularity1) {
            return false;
        }
        this.startTime1 += this.granularity1;
        return true;
    }

    /**
     * Tests whether the open mini-batch of the second input has ended, i.e. whether
     * {@code nextRecordTime2} has reached {@code startTime2 + granularity2}.
     *
     * @return {@code true} if it has ended; {@code startTime2} is then advanced by
     *         {@code granularity2} as a side effect
     */
    public boolean miniBatchEnd2() {
        if (this.nextRecordTime2 < this.startTime2 + this.granularity2) {
            return false;
        }
        this.startTime2 += this.granularity2;
        return true;
    }

    /** Reduces a completed window of the first input by delegating to {@code reduce1}. */
    @Override
    public void reduceBatch1(CoBatchReduceInvokable<IN1, IN2, OUT>.StreamBatch<IN1> streamBatch) {
        reduce1(streamBatch);
    }

    /** Reduces a completed window of the second input by delegating to {@code reduce2}. */
    @Override
    public void reduceBatch2(CoBatchReduceInvokable<IN1, IN2, OUT>.StreamBatch<IN2> streamBatch) {
        reduce2(streamBatch);
    }
}
