Commit 17b370d0 by olly Committed by Oliver Woodman

Allow upstream discards from the SampleQueue by time.

Add a SampleQueue method to discard from the write side of the queue by timestamp

PiperOrigin-RevId: 329303714
parent 440fd1cf
......@@ -2,7 +2,10 @@
### dev-v2 (not yet released)
* New release notes go here!
* Core library:
* Add `SampleQueue.discardUpstreamFrom` so upstream samples can be
discarded by timestamp.
* Add `SampleQueue.getLargestReadTimestampUs`.
### 2.12.0 (not yet released - targeted for 2020-09-03) ###
......
......@@ -215,6 +215,22 @@ public class SampleQueue implements TrackOutput {
sampleDataQueue.discardUpstreamSampleBytes(discardUpstreamSampleMetadata(discardFromIndex));
}
/**
* Discards samples from the write side of the queue.
*
* @param timeUs Samples will be discarded from the write end of the queue until a sample with a
* timestamp smaller than timeUs is encountered (this sample is not discarded). Must be larger
* than {@link #getLargestReadTimestampUs()}.
*/
public final void discardUpstreamFrom(long timeUs) {
if (length == 0) {
return;
}
checkArgument(timeUs > getLargestReadTimestampUs());
int retainCount = countUnreadSamplesBefore(timeUs);
discardUpstreamSamples(absoluteFirstIndex + retainCount);
}
// Called by the consuming thread.
/** Calls {@link #discardToEnd()} and releases any resources owned by the queue. */
......@@ -279,6 +295,16 @@ public class SampleQueue implements TrackOutput {
}
/**
* Returns the largest sample timestamp that has been read since the last {@link #reset}.
*
* @return The largest sample timestamp that has been read, or {@link Long#MIN_VALUE} if no
* samples have been read.
*/
public final synchronized long getLargestReadTimestampUs() {
return max(largestDiscardedTimestampUs, getLargestTimestamp(readPosition));
}
/**
* Returns whether the last sample of the stream has knowingly been queued. A return value of
* {@code false} means that the last sample had not been queued or that it's unknown whether the
* last sample has been queued.
......@@ -777,20 +803,10 @@ public class SampleQueue implements TrackOutput {
if (length == 0) {
return timeUs > largestDiscardedTimestampUs;
}
long largestReadTimestampUs =
max(largestDiscardedTimestampUs, getLargestTimestamp(readPosition));
if (largestReadTimestampUs >= timeUs) {
if (getLargestReadTimestampUs() >= timeUs) {
return false;
}
int retainCount = length;
int relativeSampleIndex = getRelativeIndex(length - 1);
while (retainCount > readPosition && timesUs[relativeSampleIndex] >= timeUs) {
retainCount--;
relativeSampleIndex--;
if (relativeSampleIndex == -1) {
relativeSampleIndex = capacity - 1;
}
}
int retainCount = countUnreadSamplesBefore(timeUs);
discardUpstreamSampleMetadata(absoluteFirstIndex + retainCount);
return true;
}
......@@ -888,6 +904,26 @@ public class SampleQueue implements TrackOutput {
}
/**
* Counts the number of samples that haven't been read that have a timestamp smaller than {@code
* timeUs}.
*
* @param timeUs The specified time.
* @return The number of unread samples with a timestamp smaller than {@code timeUs}.
*/
private int countUnreadSamplesBefore(long timeUs) {
int count = length;
int relativeSampleIndex = getRelativeIndex(length - 1);
while (count > readPosition && timesUs[relativeSampleIndex] >= timeUs) {
count--;
relativeSampleIndex--;
if (relativeSampleIndex == -1) {
relativeSampleIndex = capacity - 1;
}
}
return count;
}
/**
* Discards the specified number of samples.
*
* @param discardCount The number of samples to discard.
......
......@@ -879,6 +879,118 @@ public final class SampleQueueTest {
}
@Test
public void discardUpstreamFrom() {
writeTestData();
sampleQueue.discardUpstreamFrom(8000);
assertAllocationCount(10);
sampleQueue.discardUpstreamFrom(7000);
assertAllocationCount(9);
sampleQueue.discardUpstreamFrom(6000);
assertAllocationCount(7);
sampleQueue.discardUpstreamFrom(5000);
assertAllocationCount(5);
sampleQueue.discardUpstreamFrom(4000);
assertAllocationCount(4);
sampleQueue.discardUpstreamFrom(3000);
assertAllocationCount(3);
sampleQueue.discardUpstreamFrom(2000);
assertAllocationCount(2);
sampleQueue.discardUpstreamFrom(1000);
assertAllocationCount(1);
sampleQueue.discardUpstreamFrom(0);
assertAllocationCount(0);
assertReadFormat(false, FORMAT_2);
assertNoSamplesToRead(FORMAT_2);
}
@Test
public void discardUpstreamFromMulti() {
writeTestData();
sampleQueue.discardUpstreamFrom(4000);
assertAllocationCount(4);
sampleQueue.discardUpstreamFrom(0);
assertAllocationCount(0);
assertReadFormat(false, FORMAT_2);
assertNoSamplesToRead(FORMAT_2);
}
@Test
public void discardUpstreamFromNonSampleTimestamps() {
writeTestData();
sampleQueue.discardUpstreamFrom(3500);
assertAllocationCount(4);
sampleQueue.discardUpstreamFrom(500);
assertAllocationCount(1);
sampleQueue.discardUpstreamFrom(0);
assertAllocationCount(0);
assertReadFormat(false, FORMAT_2);
assertNoSamplesToRead(FORMAT_2);
}
@Test
public void discardUpstreamFromBeforeRead() {
writeTestData();
sampleQueue.discardUpstreamFrom(4000);
assertAllocationCount(4);
assertReadTestData(null, 0, 4);
assertReadFormat(false, FORMAT_2);
assertNoSamplesToRead(FORMAT_2);
}
@Test
public void discardUpstreamFromAfterRead() {
writeTestData();
assertReadTestData(null, 0, 3);
sampleQueue.discardUpstreamFrom(8000);
assertAllocationCount(10);
sampleQueue.discardToRead();
assertAllocationCount(7);
sampleQueue.discardUpstreamFrom(7000);
assertAllocationCount(6);
sampleQueue.discardUpstreamFrom(6000);
assertAllocationCount(4);
sampleQueue.discardUpstreamFrom(5000);
assertAllocationCount(2);
sampleQueue.discardUpstreamFrom(4000);
assertAllocationCount(1);
sampleQueue.discardUpstreamFrom(3000);
assertAllocationCount(0);
assertReadFormat(false, FORMAT_2);
assertNoSamplesToRead(FORMAT_2);
}
@Test
public void largestQueuedTimestampWithDiscardUpstreamFrom() {
writeTestData();
assertThat(sampleQueue.getLargestQueuedTimestampUs()).isEqualTo(LAST_SAMPLE_TIMESTAMP);
sampleQueue.discardUpstreamFrom(SAMPLE_TIMESTAMPS[SAMPLE_TIMESTAMPS.length - 1]);
// Discarding from upstream should reduce the largest timestamp.
assertThat(sampleQueue.getLargestQueuedTimestampUs())
.isEqualTo(SAMPLE_TIMESTAMPS[SAMPLE_TIMESTAMPS.length - 2]);
sampleQueue.discardUpstreamFrom(0);
// Discarding everything from upstream without reading should unset the largest timestamp.
assertThat(sampleQueue.getLargestQueuedTimestampUs()).isEqualTo(MIN_VALUE);
}
@Test
public void largestQueuedTimestampWithDiscardUpstreamFromDecodeOrder() {
long[] decodeOrderTimestamps = new long[] {0, 3000, 2000, 1000, 4000, 7000, 6000, 5000};
writeTestData(
DATA, SAMPLE_SIZES, SAMPLE_OFFSETS, decodeOrderTimestamps, SAMPLE_FORMATS, SAMPLE_FLAGS);
assertThat(sampleQueue.getLargestQueuedTimestampUs()).isEqualTo(7000);
sampleQueue.discardUpstreamFrom(SAMPLE_TIMESTAMPS[SAMPLE_TIMESTAMPS.length - 2]);
// Discarding the last two samples should not change the largest timestamp, due to the decode
// ordering of the timestamps.
assertThat(sampleQueue.getLargestQueuedTimestampUs()).isEqualTo(7000);
sampleQueue.discardUpstreamFrom(SAMPLE_TIMESTAMPS[SAMPLE_TIMESTAMPS.length - 3]);
// Once a third sample is discarded, the largest timestamp should have changed.
assertThat(sampleQueue.getLargestQueuedTimestampUs()).isEqualTo(4000);
sampleQueue.discardUpstreamFrom(0);
// Discarding everything from upstream without reading should unset the largest timestamp.
assertThat(sampleQueue.getLargestQueuedTimestampUs()).isEqualTo(MIN_VALUE);
}
@Test
public void discardUpstream() {
writeTestData();
sampleQueue.discardUpstreamSamples(8);
......@@ -987,6 +1099,43 @@ public final class SampleQueueTest {
}
@Test
public void largestReadTimestampWithReadAll() {
writeTestData();
assertThat(sampleQueue.getLargestReadTimestampUs()).isEqualTo(MIN_VALUE);
assertReadTestData();
assertThat(sampleQueue.getLargestReadTimestampUs()).isEqualTo(LAST_SAMPLE_TIMESTAMP);
}
@Test
public void largestReadTimestampWithReads() {
writeTestData();
assertThat(sampleQueue.getLargestReadTimestampUs()).isEqualTo(MIN_VALUE);
assertReadTestData(/* startFormat= */ null, 0, 2);
assertThat(sampleQueue.getLargestReadTimestampUs()).isEqualTo(SAMPLE_TIMESTAMPS[1]);
assertReadTestData(SAMPLE_FORMATS[1], 2, 3);
assertThat(sampleQueue.getLargestReadTimestampUs()).isEqualTo(SAMPLE_TIMESTAMPS[4]);
}
@Test
public void largestReadTimestampWithDiscard() {
// Discarding shouldn't change the read timestamp.
writeTestData();
assertThat(sampleQueue.getLargestReadTimestampUs()).isEqualTo(MIN_VALUE);
sampleQueue.discardUpstreamSamples(5);
assertThat(sampleQueue.getLargestReadTimestampUs()).isEqualTo(MIN_VALUE);
assertReadTestData(/* startFormat= */ null, 0, 3);
assertThat(sampleQueue.getLargestReadTimestampUs()).isEqualTo(SAMPLE_TIMESTAMPS[2]);
sampleQueue.discardUpstreamSamples(3);
assertThat(sampleQueue.getLargestReadTimestampUs()).isEqualTo(SAMPLE_TIMESTAMPS[2]);
sampleQueue.discardToRead();
assertThat(sampleQueue.getLargestReadTimestampUs()).isEqualTo(SAMPLE_TIMESTAMPS[2]);
}
@Test
public void setSampleOffsetBeforeData() {
long sampleOffsetUs = 1000;
sampleQueue.setSampleOffsetUs(sampleOffsetUs);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment