Commit d88f5f47 by olly Committed by Oliver Woodman

Turn CacheUtil into stateful CacheWriter

- The new CacheWriter is simplified somewhat
- Blocking on PriorityTaskManager.proceed is moved out of
  CacheWriter and into the Downloader tasks. This is because
  we want to shift only the caching parts of the Downloaders
  onto their Executors, whilst keeping the blocking parts on
  the main Downloader threads. Else we can end up "using"
  the Executor threads indefinitely whilst they're blocked.

Issue: #5978
PiperOrigin-RevId: 313222923
parent a1c72c0d
...@@ -21,9 +21,11 @@ import com.google.android.exoplayer2.C; ...@@ -21,9 +21,11 @@ import com.google.android.exoplayer2.C;
import com.google.android.exoplayer2.MediaItem; import com.google.android.exoplayer2.MediaItem;
import com.google.android.exoplayer2.upstream.DataSpec; import com.google.android.exoplayer2.upstream.DataSpec;
import com.google.android.exoplayer2.upstream.cache.CacheDataSource; import com.google.android.exoplayer2.upstream.cache.CacheDataSource;
import com.google.android.exoplayer2.upstream.cache.CacheUtil; import com.google.android.exoplayer2.upstream.cache.CacheWriter;
import com.google.android.exoplayer2.upstream.cache.CacheWriter.ProgressListener;
import com.google.android.exoplayer2.util.Assertions; import com.google.android.exoplayer2.util.Assertions;
import com.google.android.exoplayer2.util.PriorityTaskManager; import com.google.android.exoplayer2.util.PriorityTaskManager;
import com.google.android.exoplayer2.util.PriorityTaskManager.PriorityTooLowException;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
...@@ -31,8 +33,6 @@ import java.util.concurrent.atomic.AtomicBoolean; ...@@ -31,8 +33,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
/** A downloader for progressive media streams. */ /** A downloader for progressive media streams. */
public final class ProgressiveDownloader implements Downloader { public final class ProgressiveDownloader implements Downloader {
private static final int BUFFER_SIZE_BYTES = 128 * 1024;
private final DataSpec dataSpec; private final DataSpec dataSpec;
private final CacheDataSource dataSource; private final CacheDataSource dataSource;
private final AtomicBoolean isCanceled; private final AtomicBoolean isCanceled;
...@@ -104,18 +104,35 @@ public final class ProgressiveDownloader implements Downloader { ...@@ -104,18 +104,35 @@ public final class ProgressiveDownloader implements Downloader {
if (isCanceled.get()) { if (isCanceled.get()) {
return; return;
} }
CacheWriter cacheWriter =
new CacheWriter(
dataSource,
dataSpec,
/* allowShortContent= */ false,
isCanceled,
/* temporaryBuffer= */ null,
progressListener == null ? null : new ProgressForwarder(progressListener));
@Nullable PriorityTaskManager priorityTaskManager = dataSource.getUpstreamPriorityTaskManager(); @Nullable PriorityTaskManager priorityTaskManager = dataSource.getUpstreamPriorityTaskManager();
if (priorityTaskManager != null) { if (priorityTaskManager != null) {
priorityTaskManager.add(C.PRIORITY_DOWNLOAD); priorityTaskManager.add(C.PRIORITY_DOWNLOAD);
} }
try { try {
CacheUtil.cache( boolean finished = false;
dataSource, while (!finished && !isCanceled.get()) {
dataSpec, if (priorityTaskManager != null) {
progressListener == null ? null : new ProgressForwarder(progressListener), priorityTaskManager.proceed(dataSource.getUpstreamPriority());
isCanceled, }
/* enableEOFException= */ true, try {
/* temporaryBuffer= */ new byte[BUFFER_SIZE_BYTES]); cacheWriter.cache();
finished = true;
} catch (PriorityTooLowException e) {
// The next loop iteration will block until the task is able to proceed.
}
}
} catch (InterruptedException e) {
// The download was canceled.
} finally { } finally {
if (priorityTaskManager != null) { if (priorityTaskManager != null) {
priorityTaskManager.remove(C.PRIORITY_DOWNLOAD); priorityTaskManager.remove(C.PRIORITY_DOWNLOAD);
...@@ -137,7 +154,7 @@ public final class ProgressiveDownloader implements Downloader { ...@@ -137,7 +154,7 @@ public final class ProgressiveDownloader implements Downloader {
dataSource.getCache().removeResource(dataSource.getCacheKeyFactory().buildCacheKey(dataSpec)); dataSource.getCache().removeResource(dataSource.getCacheKeyFactory().buildCacheKey(dataSpec));
} }
private static final class ProgressForwarder implements CacheUtil.ProgressListener { private static final class ProgressForwarder implements CacheWriter.ProgressListener {
private final ProgressListener progressListener; private final ProgressListener progressListener;
......
...@@ -28,10 +28,11 @@ import com.google.android.exoplayer2.upstream.ParsingLoadable.Parser; ...@@ -28,10 +28,11 @@ import com.google.android.exoplayer2.upstream.ParsingLoadable.Parser;
import com.google.android.exoplayer2.upstream.cache.Cache; import com.google.android.exoplayer2.upstream.cache.Cache;
import com.google.android.exoplayer2.upstream.cache.CacheDataSource; import com.google.android.exoplayer2.upstream.cache.CacheDataSource;
import com.google.android.exoplayer2.upstream.cache.CacheKeyFactory; import com.google.android.exoplayer2.upstream.cache.CacheKeyFactory;
import com.google.android.exoplayer2.upstream.cache.CacheUtil; import com.google.android.exoplayer2.upstream.cache.CacheWriter;
import com.google.android.exoplayer2.upstream.cache.ContentMetadata; import com.google.android.exoplayer2.upstream.cache.ContentMetadata;
import com.google.android.exoplayer2.util.Assertions; import com.google.android.exoplayer2.util.Assertions;
import com.google.android.exoplayer2.util.PriorityTaskManager; import com.google.android.exoplayer2.util.PriorityTaskManager;
import com.google.android.exoplayer2.util.PriorityTaskManager.PriorityTooLowException;
import com.google.android.exoplayer2.util.Util; import com.google.android.exoplayer2.util.Util;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -175,18 +176,32 @@ public abstract class SegmentDownloader<M extends FilterableManifest<M>> impleme ...@@ -175,18 +176,32 @@ public abstract class SegmentDownloader<M extends FilterableManifest<M>> impleme
segmentsDownloaded) segmentsDownloaded)
: null; : null;
byte[] temporaryBuffer = new byte[BUFFER_SIZE_BYTES]; byte[] temporaryBuffer = new byte[BUFFER_SIZE_BYTES];
for (int i = 0; i < segments.size(); i++) { int segmentIndex = 0;
CacheUtil.cache( while (!isCanceled.get() && segmentIndex < segments.size()) {
if (priorityTaskManager != null) {
priorityTaskManager.proceed(dataSource.getUpstreamPriority());
}
CacheWriter cacheWriter =
new CacheWriter(
dataSource, dataSource,
segments.get(i).dataSpec, segments.get(segmentIndex).dataSpec,
progressNotifier, /* allowShortContent= */ false,
isCanceled, isCanceled,
/* enableEOFException= */ true, temporaryBuffer,
temporaryBuffer); progressNotifier);
try {
cacheWriter.cache();
segmentIndex++;
if (progressNotifier != null) { if (progressNotifier != null) {
progressNotifier.onSegmentDownloaded(); progressNotifier.onSegmentDownloaded();
} }
} catch (PriorityTooLowException e) {
// The next loop iteration will block until the task is able to proceed, then try and
// download the same segment again.
}
} }
} catch (InterruptedException e) {
// The download was canceled.
} finally { } finally {
if (priorityTaskManager != null) { if (priorityTaskManager != null) {
priorityTaskManager.remove(C.PRIORITY_DOWNLOAD); priorityTaskManager.remove(C.PRIORITY_DOWNLOAD);
...@@ -293,7 +308,7 @@ public abstract class SegmentDownloader<M extends FilterableManifest<M>> impleme ...@@ -293,7 +308,7 @@ public abstract class SegmentDownloader<M extends FilterableManifest<M>> impleme
&& dataSpec1.httpRequestHeaders.equals(dataSpec2.httpRequestHeaders); && dataSpec1.httpRequestHeaders.equals(dataSpec2.httpRequestHeaders);
} }
private static final class ProgressNotifier implements CacheUtil.ProgressListener { private static final class ProgressNotifier implements CacheWriter.ProgressListener {
private final ProgressListener progressListener; private final ProgressListener progressListener;
......
...@@ -357,8 +357,15 @@ public final class CacheDataSourceTest { ...@@ -357,8 +357,15 @@ public final class CacheDataSourceTest {
.newDefaultData() .newDefaultData()
.appendReadData(1024 * 1024) .appendReadData(1024 * 1024)
.endData()); .endData());
CacheUtil.cache( CacheWriter cacheWriter =
cache, unboundedDataSpec, upstream2, /* progressListener= */ null, /* isCanceled= */ null); new CacheWriter(
new CacheDataSource(cache, upstream2),
unboundedDataSpec,
/* allowShortContent= */ false,
/* isCanceled= */ null,
/* temporaryBuffer= */ null,
/* progressListener= */ null);
cacheWriter.cache();
// Read the rest of the data. // Read the rest of the data.
TestUtil.readToEnd(cacheDataSource); TestUtil.readToEnd(cacheDataSource);
...@@ -401,8 +408,15 @@ public final class CacheDataSourceTest { ...@@ -401,8 +408,15 @@ public final class CacheDataSourceTest {
.newDefaultData() .newDefaultData()
.appendReadData(1024 * 1024) .appendReadData(1024 * 1024)
.endData()); .endData());
CacheUtil.cache( CacheWriter cacheWriter =
cache, unboundedDataSpec, upstream2, /* progressListener= */ null, /* isCanceled= */ null); new CacheWriter(
new CacheDataSource(cache, upstream2),
unboundedDataSpec,
/* allowShortContent= */ false,
/* isCanceled= */ null,
/* temporaryBuffer= */ null,
/* progressListener= */ null);
cacheWriter.cache();
// Read the rest of the data. // Read the rest of the data.
TestUtil.readToEnd(cacheDataSource); TestUtil.readToEnd(cacheDataSource);
...@@ -420,8 +434,15 @@ public final class CacheDataSourceTest { ...@@ -420,8 +434,15 @@ public final class CacheDataSourceTest {
// Cache the latter half of the data. // Cache the latter half of the data.
int halfDataLength = 512; int halfDataLength = 512;
DataSpec dataSpec = buildDataSpec(halfDataLength, C.LENGTH_UNSET); DataSpec dataSpec = buildDataSpec(halfDataLength, C.LENGTH_UNSET);
CacheUtil.cache( CacheWriter cacheWriter =
cache, dataSpec, upstream, /* progressListener= */ null, /* isCanceled= */ null); new CacheWriter(
new CacheDataSource(cache, upstream),
dataSpec,
/* allowShortContent= */ false,
/* isCanceled= */ null,
/* temporaryBuffer= */ null,
/* progressListener= */ null);
cacheWriter.cache();
// Create cache read-only CacheDataSource. // Create cache read-only CacheDataSource.
CacheDataSource cacheDataSource = CacheDataSource cacheDataSource =
...@@ -451,8 +472,15 @@ public final class CacheDataSourceTest { ...@@ -451,8 +472,15 @@ public final class CacheDataSourceTest {
// Cache the latter half of the data. // Cache the latter half of the data.
int halfDataLength = 512; int halfDataLength = 512;
DataSpec dataSpec = buildDataSpec(/* position= */ 0, halfDataLength); DataSpec dataSpec = buildDataSpec(/* position= */ 0, halfDataLength);
CacheUtil.cache( CacheWriter cacheWriter =
cache, dataSpec, upstream, /* progressListener= */ null, /* isCanceled= */ null); new CacheWriter(
new CacheDataSource(cache, upstream),
dataSpec,
/* allowShortContent= */ false,
/* isCanceled= */ null,
/* temporaryBuffer= */ null,
/* progressListener= */ null);
cacheWriter.cache();
// Create blocking CacheDataSource. // Create blocking CacheDataSource.
CacheDataSource cacheDataSource = CacheDataSource cacheDataSource =
......
...@@ -26,10 +26,11 @@ import com.google.android.exoplayer2.C; ...@@ -26,10 +26,11 @@ import com.google.android.exoplayer2.C;
import com.google.android.exoplayer2.testutil.FakeDataSet; import com.google.android.exoplayer2.testutil.FakeDataSet;
import com.google.android.exoplayer2.testutil.FakeDataSource; import com.google.android.exoplayer2.testutil.FakeDataSource;
import com.google.android.exoplayer2.testutil.TestUtil; import com.google.android.exoplayer2.testutil.TestUtil;
import com.google.android.exoplayer2.upstream.DataSourceException;
import com.google.android.exoplayer2.upstream.DataSpec; import com.google.android.exoplayer2.upstream.DataSpec;
import com.google.android.exoplayer2.util.Util; import com.google.android.exoplayer2.util.Util;
import java.io.EOFException;
import java.io.File; import java.io.File;
import java.io.IOException;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
...@@ -38,9 +39,9 @@ import org.mockito.Answers; ...@@ -38,9 +39,9 @@ import org.mockito.Answers;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.MockitoAnnotations; import org.mockito.MockitoAnnotations;
/** Tests {@link CacheUtil}. */ /** Unit tests for {@link CacheWriter}. */
@RunWith(AndroidJUnit4.class) @RunWith(AndroidJUnit4.class)
public final class CacheUtilTest { public final class CacheWriterTest {
/** /**
* Abstract fake Cache implementation used by the test. This class must be public so Mockito can * Abstract fake Cache implementation used by the test. This class must be public so Mockito can
...@@ -109,8 +110,16 @@ public final class CacheUtilTest { ...@@ -109,8 +110,16 @@ public final class CacheUtilTest {
FakeDataSource dataSource = new FakeDataSource(fakeDataSet); FakeDataSource dataSource = new FakeDataSource(fakeDataSet);
CachingCounters counters = new CachingCounters(); CachingCounters counters = new CachingCounters();
CacheUtil.cache(
cache, new DataSpec(Uri.parse("test_data")), dataSource, counters, /* isCanceled= */ null); CacheWriter cacheWriter =
new CacheWriter(
new CacheDataSource(cache, dataSource),
new DataSpec(Uri.parse("test_data")),
/* allowShortContent= */ false,
/* isCanceled= */ null,
/* temporaryBuffer= */ null,
counters);
cacheWriter.cache();
counters.assertValues(0, 100, 100); counters.assertValues(0, 100, 100);
assertCachedData(cache, fakeDataSet); assertCachedData(cache, fakeDataSet);
...@@ -124,12 +133,29 @@ public final class CacheUtilTest { ...@@ -124,12 +133,29 @@ public final class CacheUtilTest {
Uri testUri = Uri.parse("test_data"); Uri testUri = Uri.parse("test_data");
DataSpec dataSpec = new DataSpec(testUri, /* position= */ 10, /* length= */ 20); DataSpec dataSpec = new DataSpec(testUri, /* position= */ 10, /* length= */ 20);
CachingCounters counters = new CachingCounters(); CachingCounters counters = new CachingCounters();
CacheUtil.cache(cache, dataSpec, dataSource, counters, /* isCanceled= */ null);
CacheWriter cacheWriter =
new CacheWriter(
new CacheDataSource(cache, dataSource),
dataSpec,
/* allowShortContent= */ false,
/* isCanceled= */ null,
/* temporaryBuffer= */ null,
counters);
cacheWriter.cache();
counters.assertValues(0, 20, 20); counters.assertValues(0, 20, 20);
counters.reset(); counters.reset();
CacheUtil.cache(cache, new DataSpec(testUri), dataSource, counters, /* isCanceled= */ null); cacheWriter =
new CacheWriter(
new CacheDataSource(cache, dataSource),
new DataSpec(testUri),
/* allowShortContent= */ false,
/* isCanceled= */ null,
/* temporaryBuffer= */ null,
counters);
cacheWriter.cache();
counters.assertValues(20, 80, 100); counters.assertValues(20, 80, 100);
assertCachedData(cache, fakeDataSet); assertCachedData(cache, fakeDataSet);
...@@ -144,7 +170,16 @@ public final class CacheUtilTest { ...@@ -144,7 +170,16 @@ public final class CacheUtilTest {
DataSpec dataSpec = new DataSpec(Uri.parse("test_data")); DataSpec dataSpec = new DataSpec(Uri.parse("test_data"));
CachingCounters counters = new CachingCounters(); CachingCounters counters = new CachingCounters();
CacheUtil.cache(cache, dataSpec, dataSource, counters, /* isCanceled= */ null);
CacheWriter cacheWriter =
new CacheWriter(
new CacheDataSource(cache, dataSource),
dataSpec,
/* allowShortContent= */ false,
/* isCanceled= */ null,
/* temporaryBuffer= */ null,
counters);
cacheWriter.cache();
counters.assertValues(0, 100, 100); counters.assertValues(0, 100, 100);
assertCachedData(cache, fakeDataSet); assertCachedData(cache, fakeDataSet);
...@@ -160,12 +195,29 @@ public final class CacheUtilTest { ...@@ -160,12 +195,29 @@ public final class CacheUtilTest {
Uri testUri = Uri.parse("test_data"); Uri testUri = Uri.parse("test_data");
DataSpec dataSpec = new DataSpec(testUri, /* position= */ 10, /* length= */ 20); DataSpec dataSpec = new DataSpec(testUri, /* position= */ 10, /* length= */ 20);
CachingCounters counters = new CachingCounters(); CachingCounters counters = new CachingCounters();
CacheUtil.cache(cache, dataSpec, dataSource, counters, /* isCanceled= */ null);
CacheWriter cacheWriter =
new CacheWriter(
new CacheDataSource(cache, dataSource),
dataSpec,
/* allowShortContent= */ false,
/* isCanceled= */ null,
/* temporaryBuffer= */ null,
counters);
cacheWriter.cache();
counters.assertValues(0, 20, 20); counters.assertValues(0, 20, 20);
counters.reset(); counters.reset();
CacheUtil.cache(cache, new DataSpec(testUri), dataSource, counters, /* isCanceled= */ null); cacheWriter =
new CacheWriter(
new CacheDataSource(cache, dataSource),
new DataSpec(testUri),
/* allowShortContent= */ false,
/* isCanceled= */ null,
/* temporaryBuffer= */ null,
counters);
cacheWriter.cache();
counters.assertValues(20, 80, 100); counters.assertValues(20, 80, 100);
assertCachedData(cache, fakeDataSet); assertCachedData(cache, fakeDataSet);
...@@ -179,9 +231,18 @@ public final class CacheUtilTest { ...@@ -179,9 +231,18 @@ public final class CacheUtilTest {
Uri testUri = Uri.parse("test_data"); Uri testUri = Uri.parse("test_data");
DataSpec dataSpec = new DataSpec(testUri, /* position= */ 0, /* length= */ 1000); DataSpec dataSpec = new DataSpec(testUri, /* position= */ 0, /* length= */ 1000);
CachingCounters counters = new CachingCounters(); CachingCounters counters = new CachingCounters();
CacheUtil.cache(cache, dataSpec, dataSource, counters, /* isCanceled= */ null);
counters.assertValues(0, 100, 1000); CacheWriter cacheWriter =
new CacheWriter(
new CacheDataSource(cache, dataSource),
dataSpec,
/* allowShortContent= */ true,
/* isCanceled= */ null,
/* temporaryBuffer= */ null,
counters);
cacheWriter.cache();
counters.assertValues(0, 100, 100);
assertCachedData(cache, fakeDataSet); assertCachedData(cache, fakeDataSet);
} }
...@@ -194,16 +255,18 @@ public final class CacheUtilTest { ...@@ -194,16 +255,18 @@ public final class CacheUtilTest {
DataSpec dataSpec = new DataSpec(testUri, /* position= */ 0, /* length= */ 1000); DataSpec dataSpec = new DataSpec(testUri, /* position= */ 0, /* length= */ 1000);
try { try {
CacheUtil.cache( CacheWriter cacheWriter =
new CacheWriter(
new CacheDataSource(cache, dataSource), new CacheDataSource(cache, dataSource),
dataSpec, dataSpec,
/* progressListener= */ null, /* allowShortContent= */ false,
/* isCanceled= */ null, /* isCanceled= */ null,
/* enableEOFException= */ true, /* temporaryBuffer= */ null,
/* temporaryBuffer= */ new byte[CacheUtil.DEFAULT_BUFFER_SIZE_BYTES]); /* progressListener= */ null);
cacheWriter.cache();
fail(); fail();
} catch (EOFException e) { } catch (IOException e) {
// Do nothing. assertThat(DataSourceException.isCausedByPositionOutOfRange(e)).isTrue();
} }
} }
...@@ -221,14 +284,21 @@ public final class CacheUtilTest { ...@@ -221,14 +284,21 @@ public final class CacheUtilTest {
.endData(); .endData();
FakeDataSource dataSource = new FakeDataSource(fakeDataSet); FakeDataSource dataSource = new FakeDataSource(fakeDataSet);
CacheUtil.cache( CacheWriter cacheWriter =
cache, new DataSpec(Uri.parse("test_data")), dataSource, counters, /* isCanceled= */ null); new CacheWriter(
new CacheDataSource(cache, dataSource),
new DataSpec(Uri.parse("test_data")),
/* allowShortContent= */ false,
/* isCanceled= */ null,
/* temporaryBuffer= */ null,
counters);
cacheWriter.cache();
counters.assertValues(0, 300, 300); counters.assertValues(0, 300, 300);
assertCachedData(cache, fakeDataSet); assertCachedData(cache, fakeDataSet);
} }
private static final class CachingCounters implements CacheUtil.ProgressListener { private static final class CachingCounters implements CacheWriter.ProgressListener {
private long contentLength = C.LENGTH_UNSET; private long contentLength = C.LENGTH_UNSET;
private long bytesAlreadyCached; private long bytesAlreadyCached;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment