Commit a0b15595 by andrewlewis Committed by Oliver Woodman

Cancel and interrupt downloads

Upstream components may incorrectly clear the interrupted flag and not
propagate an exception when the caching thread is interrupted due to
cancellation. Pass a settable flag down to CacheUtil.cache and check it
frequently so it's not necessary to rely on the interrupted flag.

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=195371244
parent d5034ca8
......@@ -704,7 +704,7 @@ public final class DownloadManager {
}
});
} else if (changeStateAndNotify(STATE_STARTED, STATE_STARTED_CANCELING)) {
thread.interrupt();
cancelDownload();
}
}
......@@ -733,7 +733,14 @@ public final class DownloadManager {
return true;
}
/* Methods running on download thread. */
private void cancelDownload() {
if (downloader != null) {
downloader.cancel();
}
thread.interrupt();
}
// Methods running on download thread.
@Override
public void run() {
......@@ -746,7 +753,7 @@ public final class DownloadManager {
} else {
int errorCount = 0;
long errorPosition = C.LENGTH_UNSET;
while (true) {
while (!Thread.interrupted()) {
try {
downloader.download();
break;
......
......@@ -32,6 +32,9 @@ public interface Downloader {
*/
void download() throws InterruptedException, IOException;
/** Interrupts any current download operation and prevents future operations from running. */
void cancel();
/** Returns the total number of downloaded bytes. */
long getDownloadedBytes();
......
......@@ -24,6 +24,7 @@ import com.google.android.exoplayer2.upstream.cache.CacheUtil;
import com.google.android.exoplayer2.upstream.cache.CacheUtil.CachingCounters;
import com.google.android.exoplayer2.util.PriorityTaskManager;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* A downloader for progressive media streams.
......@@ -37,6 +38,7 @@ public final class ProgressiveDownloader implements Downloader {
private final CacheDataSource dataSource;
private final PriorityTaskManager priorityTaskManager;
private final CacheUtil.CachingCounters cachingCounters;
private final AtomicBoolean isCanceled;
/**
* @param uri Uri of the data to be downloaded.
......@@ -51,6 +53,7 @@ public final class ProgressiveDownloader implements Downloader {
this.dataSource = constructorHelper.buildCacheDataSource(false);
this.priorityTaskManager = constructorHelper.getPriorityTaskManager();
cachingCounters = new CachingCounters();
isCanceled = new AtomicBoolean();
}
@Override
......@@ -65,6 +68,7 @@ public final class ProgressiveDownloader implements Downloader {
priorityTaskManager,
C.PRIORITY_DOWNLOAD,
cachingCounters,
isCanceled,
/* enableEOFException= */ true);
} finally {
priorityTaskManager.remove(C.PRIORITY_DOWNLOAD);
......@@ -72,6 +76,11 @@ public final class ProgressiveDownloader implements Downloader {
}
@Override
public void cancel() {
isCanceled.set(true);
}
@Override
public long getDownloadedBytes() {
return cachingCounters.totalCachedBytes();
}
......
......@@ -29,6 +29,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Base class for multi segment stream downloaders.
......@@ -68,6 +69,7 @@ public abstract class SegmentDownloader<M extends FilterableManifest<M, K>, K>
private final CacheDataSource dataSource;
private final CacheDataSource offlineDataSource;
private final ArrayList<K> keys;
private final AtomicBoolean isCanceled;
private volatile int totalSegments;
private volatile int downloadedSegments;
......@@ -87,6 +89,7 @@ public abstract class SegmentDownloader<M extends FilterableManifest<M, K>, K>
this.priorityTaskManager = constructorHelper.getPriorityTaskManager();
keys = new ArrayList<>(trackKeys);
totalSegments = C.LENGTH_UNSET;
isCanceled = new AtomicBoolean();
}
/**
......@@ -118,6 +121,7 @@ public abstract class SegmentDownloader<M extends FilterableManifest<M, K>, K>
priorityTaskManager,
C.PRIORITY_DOWNLOAD,
cachingCounters,
isCanceled,
true);
downloadedSegments++;
} finally {
......@@ -130,6 +134,11 @@ public abstract class SegmentDownloader<M extends FilterableManifest<M, K>, K>
}
@Override
public void cancel() {
isCanceled.set(true);
}
@Override
public final long getDownloadedBytes() {
return downloadedBytes;
}
......
......@@ -26,6 +26,7 @@ import com.google.android.exoplayer2.util.Util;
import java.io.EOFException;
import java.io.IOException;
import java.util.NavigableSet;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Caching related utility methods.
......@@ -112,14 +113,27 @@ public final class CacheUtil {
* @param cache A {@link Cache} to store the data.
* @param upstream A {@link DataSource} for reading data not in the cache.
* @param counters If not null, updated during caching.
* @param isCanceled An optional flag that will interrupt caching if set to true.
* @throws IOException If an error occurs reading from the source.
* @throws InterruptedException If the thread was interrupted.
* @throws InterruptedException If the thread was interrupted directly or via {@code isCanceled}.
*/
public static void cache(
DataSpec dataSpec, Cache cache, DataSource upstream, @Nullable CachingCounters counters)
DataSpec dataSpec,
Cache cache,
DataSource upstream,
@Nullable CachingCounters counters,
@Nullable AtomicBoolean isCanceled)
throws IOException, InterruptedException {
cache(dataSpec, cache, new CacheDataSource(cache, upstream),
new byte[DEFAULT_BUFFER_SIZE_BYTES], null, 0, counters, false);
cache(
dataSpec,
cache,
new CacheDataSource(cache, upstream),
new byte[DEFAULT_BUFFER_SIZE_BYTES],
null,
0,
counters,
null,
false);
}
/**
......@@ -140,10 +154,11 @@ public final class CacheUtil {
* caching.
* @param priority The priority of this task. Used with {@code priorityTaskManager}.
* @param counters If not null, updated during caching.
* @param isCanceled An optional flag that will interrupt caching if set to true.
* @param enableEOFException Whether to throw an {@link EOFException} if end of input has been
* reached unexpectedly.
* @throws IOException If an error occurs reading from the source.
* @throws InterruptedException If the thread was interrupted.
* @throws InterruptedException If the thread was interrupted directly or via {@code isCanceled}.
*/
public static void cache(
DataSpec dataSpec,
......@@ -153,6 +168,7 @@ public final class CacheUtil {
PriorityTaskManager priorityTaskManager,
int priority,
@Nullable CachingCounters counters,
@Nullable AtomicBoolean isCanceled,
boolean enableEOFException)
throws IOException, InterruptedException {
Assertions.checkNotNull(dataSource);
......@@ -170,6 +186,9 @@ public final class CacheUtil {
long start = dataSpec.absoluteStreamPosition;
long left = dataSpec.length != C.LENGTH_UNSET ? dataSpec.length : cache.getContentLength(key);
while (left != 0) {
if (isCanceled != null && isCanceled.get()) {
throw new InterruptedException();
}
long blockLength =
cache.getCachedLength(key, start, left != C.LENGTH_UNSET ? left : Long.MAX_VALUE);
if (blockLength > 0) {
......
......@@ -608,6 +608,11 @@ public class DownloadManagerTest {
}
@Override
public void cancel() {
// Do nothing.
}
@Override
public void remove() throws InterruptedException {
assertThat(isRemoveAction).isTrue();
started.countDown();
......
......@@ -220,7 +220,7 @@ public final class CacheDataSourceTest {
.newDefaultData()
.appendReadData(1024 * 1024)
.endData());
CacheUtil.cache(dataSpec, cache, upstream2, null);
CacheUtil.cache(dataSpec, cache, upstream2, /* counters= */ null, /* isCanceled= */ null);
// Read the rest of the data.
TestUtil.readToEnd(cacheDataSource);
......@@ -271,7 +271,7 @@ public final class CacheDataSourceTest {
.newDefaultData()
.appendReadData(1024 * 1024)
.endData());
CacheUtil.cache(dataSpec, cache, upstream2, null);
CacheUtil.cache(dataSpec, cache, upstream2, /* counters= */ null, /* isCanceled= */ null);
// Read the rest of the data.
TestUtil.readToEnd(cacheDataSource);
......@@ -287,7 +287,7 @@ public final class CacheDataSourceTest {
// Cache the latter half of the data.
DataSpec dataSpec = new DataSpec(testDataUri, 512, C.LENGTH_UNSET, testDataKey);
CacheUtil.cache(dataSpec, cache, upstream, null);
CacheUtil.cache(dataSpec, cache, upstream, /* counters= */ null, /* isCanceled= */ null);
// Create cache read-only CacheDataSource.
CacheDataSource cacheDataSource =
......@@ -318,7 +318,7 @@ public final class CacheDataSourceTest {
// Cache the latter half of the data.
int halfDataLength = 512;
DataSpec dataSpec = new DataSpec(testDataUri, halfDataLength, C.LENGTH_UNSET, testDataKey);
CacheUtil.cache(dataSpec, cache, upstream, null);
CacheUtil.cache(dataSpec, cache, upstream, /* counters= */ null, /* isCanceled= */ null);
// Create blocking CacheDataSource.
CacheDataSource cacheDataSource =
......
......@@ -174,7 +174,8 @@ public final class CacheUtilTest {
FakeDataSource dataSource = new FakeDataSource(fakeDataSet);
CachingCounters counters = new CachingCounters();
CacheUtil.cache(new DataSpec(Uri.parse("test_data")), cache, dataSource, counters);
CacheUtil.cache(
new DataSpec(Uri.parse("test_data")), cache, dataSource, counters, /* isCanceled= */ null);
assertCounters(counters, 0, 100, 100);
assertCachedData(cache, fakeDataSet);
......@@ -188,11 +189,11 @@ public final class CacheUtilTest {
Uri testUri = Uri.parse("test_data");
DataSpec dataSpec = new DataSpec(testUri, 10, 20, null);
CachingCounters counters = new CachingCounters();
CacheUtil.cache(dataSpec, cache, dataSource, counters);
CacheUtil.cache(dataSpec, cache, dataSource, counters, /* isCanceled= */ null);
assertCounters(counters, 0, 20, 20);
CacheUtil.cache(new DataSpec(testUri), cache, dataSource, counters);
CacheUtil.cache(new DataSpec(testUri), cache, dataSource, counters, /* isCanceled= */ null);
assertCounters(counters, 20, 80, 100);
assertCachedData(cache, fakeDataSet);
......@@ -207,7 +208,7 @@ public final class CacheUtilTest {
DataSpec dataSpec = new DataSpec(Uri.parse("test_data"));
CachingCounters counters = new CachingCounters();
CacheUtil.cache(dataSpec, cache, dataSource, counters);
CacheUtil.cache(dataSpec, cache, dataSource, counters, /* isCanceled= */ null);
assertCounters(counters, 0, 100, 100);
assertCachedData(cache, fakeDataSet);
......@@ -223,11 +224,11 @@ public final class CacheUtilTest {
Uri testUri = Uri.parse("test_data");
DataSpec dataSpec = new DataSpec(testUri, 10, 20, null);
CachingCounters counters = new CachingCounters();
CacheUtil.cache(dataSpec, cache, dataSource, counters);
CacheUtil.cache(dataSpec, cache, dataSource, counters, /* isCanceled= */ null);
assertCounters(counters, 0, 20, 20);
CacheUtil.cache(new DataSpec(testUri), cache, dataSource, counters);
CacheUtil.cache(new DataSpec(testUri), cache, dataSource, counters, /* isCanceled= */ null);
assertCounters(counters, 20, 80, 100);
assertCachedData(cache, fakeDataSet);
......@@ -241,7 +242,7 @@ public final class CacheUtilTest {
Uri testUri = Uri.parse("test_data");
DataSpec dataSpec = new DataSpec(testUri, 0, 1000, null);
CachingCounters counters = new CachingCounters();
CacheUtil.cache(dataSpec, cache, dataSource, counters);
CacheUtil.cache(dataSpec, cache, dataSource, counters, /* isCanceled= */ null);
assertCounters(counters, 0, 100, 1000);
assertCachedData(cache, fakeDataSet);
......@@ -256,9 +257,16 @@ public final class CacheUtilTest {
DataSpec dataSpec = new DataSpec(testUri, 0, 1000, null);
try {
CacheUtil.cache(dataSpec, cache, new CacheDataSource(cache, dataSource),
new byte[CacheUtil.DEFAULT_BUFFER_SIZE_BYTES], null, 0, null,
/*enableEOFException*/ true);
CacheUtil.cache(
dataSpec,
cache,
new CacheDataSource(cache, dataSource),
new byte[CacheUtil.DEFAULT_BUFFER_SIZE_BYTES],
/* priorityTaskManager= */ null,
/* priority= */ 0,
/* counters= */ null,
/* isCanceled= */ null,
/* enableEOFException= */ true);
fail();
} catch (EOFException e) {
// Do nothing.
......@@ -286,7 +294,8 @@ public final class CacheUtilTest {
.appendReadData(TestUtil.buildTestData(100)).endData();
FakeDataSource dataSource = new FakeDataSource(fakeDataSet);
CacheUtil.cache(new DataSpec(Uri.parse("test_data")), cache, dataSource, counters);
CacheUtil.cache(
new DataSpec(Uri.parse("test_data")), cache, dataSource, counters, /* isCanceled= */ null);
assertCounters(counters, 0, 300, 300);
assertCachedData(cache, fakeDataSet);
......@@ -298,10 +307,17 @@ public final class CacheUtilTest {
FakeDataSource dataSource = new FakeDataSource(fakeDataSet);
Uri uri = Uri.parse("test_data");
CacheUtil.cache(new DataSpec(uri), cache,
CacheUtil.cache(
new DataSpec(uri),
cache,
// set maxCacheFileSize to 10 to make sure there are multiple spans
new CacheDataSource(cache, dataSource, 0, 10),
new byte[CacheUtil.DEFAULT_BUFFER_SIZE_BYTES], null, 0, null, true);
new byte[CacheUtil.DEFAULT_BUFFER_SIZE_BYTES],
/* priorityTaskManager= */ null,
/* priority= */ 0,
/* counters= */ null,
/* isCanceled= */ null,
true);
CacheUtil.remove(cache, CacheUtil.generateKey(uri));
assertCacheEmpty(cache);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment