Commit 32516d85 by olly Committed by Oliver Woodman

Simplify thread interruption for downloads.

- Stop throwing InterruptedException from CacheUtil. When a CacheUtil operation
  throws or returns, the caller should always check its own state to determine
  whether they canceled the operation. If a caller is trying to catch
  InterruptedException separately to IOException to do something different in
  that case, they're probably doing the wrong thing. So it's simpler, and probably
  less error prone, just to throw an IOException in the case of interruption.
- Throwing InterruptedIOException is also consistent with what our Extractor and
  DataSource implementations do.

Issue: #5978
PiperOrigin-RevId: 309411556
parent f9466066
...@@ -1019,7 +1019,7 @@ public final class DownloadManager { ...@@ -1019,7 +1019,7 @@ public final class DownloadManager {
// Cancel the downloading task. // Cancel the downloading task.
activeTask.cancel(/* released= */ false); activeTask.cancel(/* released= */ false);
} }
// The activeTask is either a remove task, or a downloading task that we just cancelled. In // The activeTask is either a remove task, or a downloading task that we just canceled. In
// the latter case we need to wait for the task to stop before we start a remove task. // the latter case we need to wait for the task to stop before we start a remove task.
return; return;
} }
...@@ -1284,6 +1284,7 @@ public final class DownloadManager { ...@@ -1284,6 +1284,7 @@ public final class DownloadManager {
if (!isCanceled) { if (!isCanceled) {
isCanceled = true; isCanceled = true;
downloader.cancel(); downloader.cancel();
// TODO - This will need propagating deeper as soon as we start using additional threads.
interrupt(); interrupt();
} }
} }
......
...@@ -46,19 +46,13 @@ public interface Downloader { ...@@ -46,19 +46,13 @@ public interface Downloader {
* *
* @param progressListener A listener to receive progress updates, or {@code null}. * @param progressListener A listener to receive progress updates, or {@code null}.
* @throws DownloadException Thrown if the content cannot be downloaded. * @throws DownloadException Thrown if the content cannot be downloaded.
* @throws InterruptedException If the thread has been interrupted. * @throws IOException If the download did not complete successfully.
* @throws IOException Thrown when there is an io error while downloading.
*/ */
void download(@Nullable ProgressListener progressListener) void download(@Nullable ProgressListener progressListener) throws IOException;
throws InterruptedException, IOException;
/** Cancels the download operation and prevents future download operations from running. */ /** Cancels the download operation and prevents future download operations from running. */
void cancel(); void cancel();
/** /** Removes the content. */
* Removes the content. void remove();
*
* @throws InterruptedException Thrown if the thread was interrupted.
*/
void remove() throws InterruptedException;
} }
...@@ -73,8 +73,7 @@ public final class ProgressiveDownloader implements Downloader { ...@@ -73,8 +73,7 @@ public final class ProgressiveDownloader implements Downloader {
} }
@Override @Override
public void download(@Nullable ProgressListener progressListener) public void download(@Nullable ProgressListener progressListener) throws IOException {
throws InterruptedException, IOException {
@Nullable PriorityTaskManager priorityTaskManager = dataSource.getUpstreamPriorityTaskManager(); @Nullable PriorityTaskManager priorityTaskManager = dataSource.getUpstreamPriorityTaskManager();
if (priorityTaskManager != null) { if (priorityTaskManager != null) {
priorityTaskManager.add(C.PRIORITY_DOWNLOAD); priorityTaskManager.add(C.PRIORITY_DOWNLOAD);
......
...@@ -101,16 +101,8 @@ public abstract class SegmentDownloader<M extends FilterableManifest<M>> impleme ...@@ -101,16 +101,8 @@ public abstract class SegmentDownloader<M extends FilterableManifest<M>> impleme
isCanceled = new AtomicBoolean(); isCanceled = new AtomicBoolean();
} }
/**
* Downloads the selected streams in the media. If multiple streams are selected, they are
* downloaded in sync with one another.
*
* @throws IOException Thrown when there is an error downloading.
* @throws InterruptedException If the thread has been interrupted.
*/
@Override @Override
public final void download(@Nullable ProgressListener progressListener) public final void download(@Nullable ProgressListener progressListener) throws IOException {
throws IOException, InterruptedException {
@Nullable @Nullable
PriorityTaskManager priorityTaskManager = PriorityTaskManager priorityTaskManager =
cacheDataSourceFactory.getUpstreamPriorityTaskManager(); cacheDataSourceFactory.getUpstreamPriorityTaskManager();
...@@ -194,7 +186,7 @@ public abstract class SegmentDownloader<M extends FilterableManifest<M>> impleme ...@@ -194,7 +186,7 @@ public abstract class SegmentDownloader<M extends FilterableManifest<M>> impleme
} }
@Override @Override
public final void remove() throws InterruptedException { public final void remove() {
Cache cache = Assertions.checkNotNull(cacheDataSourceFactory.getCache()); Cache cache = Assertions.checkNotNull(cacheDataSourceFactory.getCache());
CacheKeyFactory cacheKeyFactory = cacheDataSourceFactory.getCacheKeyFactory(); CacheKeyFactory cacheKeyFactory = cacheDataSourceFactory.getCacheKeyFactory();
CacheDataSource dataSource = cacheDataSourceFactory.createDataSourceForRemovingDownload(); CacheDataSource dataSource = cacheDataSourceFactory.createDataSourceForRemovingDownload();
...@@ -235,13 +227,11 @@ public abstract class SegmentDownloader<M extends FilterableManifest<M>> impleme ...@@ -235,13 +227,11 @@ public abstract class SegmentDownloader<M extends FilterableManifest<M>> impleme
* segments from being listed. If true then a partial segment list will be returned. If false * segments from being listed. If true then a partial segment list will be returned. If false
* an {@link IOException} will be thrown. * an {@link IOException} will be thrown.
* @return The list of downloadable {@link Segment}s. * @return The list of downloadable {@link Segment}s.
* @throws InterruptedException Thrown if the thread was interrupted.
* @throws IOException Thrown if {@code allowPartialIndex} is false and a load error occurs, or if * @throws IOException Thrown if {@code allowPartialIndex} is false and a load error occurs, or if
* the media is not in a form that allows for its segments to be listed. * the media is not in a form that allows for its segments to be listed.
*/ */
protected abstract List<Segment> getSegments( protected abstract List<Segment> getSegments(
DataSource dataSource, M manifest, boolean allowIncompleteList) DataSource dataSource, M manifest, boolean allowIncompleteList) throws IOException;
throws InterruptedException, IOException;
protected static DataSpec getCompressibleDataSpec(Uri uri) { protected static DataSpec getCompressibleDataSpec(Uri uri) {
return new DataSpec.Builder().setUri(uri).setFlags(DataSpec.FLAG_ALLOW_GZIP).build(); return new DataSpec.Builder().setUri(uri).setFlags(DataSpec.FLAG_ALLOW_GZIP).build();
......
...@@ -28,6 +28,7 @@ import com.google.android.exoplayer2.util.PriorityTaskManager; ...@@ -28,6 +28,7 @@ import com.google.android.exoplayer2.util.PriorityTaskManager;
import com.google.android.exoplayer2.util.Util; import com.google.android.exoplayer2.util.Util;
import java.io.EOFException; import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.NavigableSet; import java.util.NavigableSet;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
...@@ -105,15 +106,17 @@ public final class CacheUtil { ...@@ -105,15 +106,17 @@ public final class CacheUtil {
* Caches the data defined by {@code dataSpec}, skipping already cached data. Caching stops early * Caches the data defined by {@code dataSpec}, skipping already cached data. Caching stops early
* if the end of the input is reached. * if the end of the input is reached.
* *
* <p>To cancel the operation, the caller should both set {@code isCanceled} to true and interrupt
* the calling thread.
*
* <p>This method may be slow and shouldn't normally be called on the main thread. * <p>This method may be slow and shouldn't normally be called on the main thread.
* *
* @param cache A {@link Cache} to store the data. * @param cache A {@link Cache} to store the data.
* @param dataSpec Defines the data to be cached. * @param dataSpec Defines the data to be cached.
* @param upstreamDataSource A {@link DataSource} for reading data not in the cache. * @param upstreamDataSource A {@link DataSource} for reading data not in the cache.
* @param progressListener A listener to receive progress updates, or {@code null}. * @param progressListener A listener to receive progress updates, or {@code null}.
* @param isCanceled An optional flag that will interrupt caching if set to true. * @param isCanceled An optional flag that will cancel the operation if set to true.
* @throws IOException If an error occurs reading from the source. * @throws IOException If an error occurs caching the data, or if the operation was canceled.
* @throws InterruptedException If the thread was interrupted directly or via {@code isCanceled}.
*/ */
@WorkerThread @WorkerThread
public static void cache( public static void cache(
...@@ -122,7 +125,7 @@ public final class CacheUtil { ...@@ -122,7 +125,7 @@ public final class CacheUtil {
DataSource upstreamDataSource, DataSource upstreamDataSource,
@Nullable ProgressListener progressListener, @Nullable ProgressListener progressListener,
@Nullable AtomicBoolean isCanceled) @Nullable AtomicBoolean isCanceled)
throws IOException, InterruptedException { throws IOException {
cache( cache(
new CacheDataSource(cache, upstreamDataSource), new CacheDataSource(cache, upstreamDataSource),
dataSpec, dataSpec,
...@@ -140,17 +143,19 @@ public final class CacheUtil { ...@@ -140,17 +143,19 @@ public final class CacheUtil {
* calling code to call {@link PriorityTaskManager#add} to register with the manager before * calling code to call {@link PriorityTaskManager#add} to register with the manager before
* calling this method, and to call {@link PriorityTaskManager#remove} afterwards to unregister. * calling this method, and to call {@link PriorityTaskManager#remove} afterwards to unregister.
* *
* <p>To cancel the operation, the caller should both set {@code isCanceled} to true and interrupt
* the calling thread.
*
* <p>This method may be slow and shouldn't normally be called on the main thread. * <p>This method may be slow and shouldn't normally be called on the main thread.
* *
* @param dataSource A {@link CacheDataSource} to be used for caching the data. * @param dataSource A {@link CacheDataSource} to be used for caching the data.
* @param dataSpec Defines the data to be cached. * @param dataSpec Defines the data to be cached.
* @param progressListener A listener to receive progress updates, or {@code null}. * @param progressListener A listener to receive progress updates, or {@code null}.
* @param isCanceled An optional flag that will interrupt caching if set to true. * @param isCanceled An optional flag that will cancel the operation if set to true.
* @param enableEOFException Whether to throw an {@link EOFException} if end of input has been * @param enableEOFException Whether to throw an {@link EOFException} if end of input has been
* reached unexpectedly. * reached unexpectedly.
* @param temporaryBuffer A temporary buffer to be used during caching. * @param temporaryBuffer A temporary buffer to be used during caching.
* @throws IOException If an error occurs reading from the source. * @throws IOException If an error occurs caching the data, or if the operation was canceled.
* @throws InterruptedException If the thread was interrupted directly or via {@code isCanceled}.
*/ */
@WorkerThread @WorkerThread
public static void cache( public static void cache(
...@@ -160,7 +165,7 @@ public final class CacheUtil { ...@@ -160,7 +165,7 @@ public final class CacheUtil {
@Nullable AtomicBoolean isCanceled, @Nullable AtomicBoolean isCanceled,
boolean enableEOFException, boolean enableEOFException,
byte[] temporaryBuffer) byte[] temporaryBuffer)
throws IOException, InterruptedException { throws IOException {
Assertions.checkNotNull(dataSource); Assertions.checkNotNull(dataSource);
Assertions.checkNotNull(temporaryBuffer); Assertions.checkNotNull(temporaryBuffer);
...@@ -181,7 +186,7 @@ public final class CacheUtil { ...@@ -181,7 +186,7 @@ public final class CacheUtil {
long position = dataSpec.position; long position = dataSpec.position;
boolean lengthUnset = bytesLeft == C.LENGTH_UNSET; boolean lengthUnset = bytesLeft == C.LENGTH_UNSET;
while (bytesLeft != 0) { while (bytesLeft != 0) {
throwExceptionIfInterruptedOrCancelled(isCanceled); throwExceptionIfCanceled(isCanceled);
long blockLength = long blockLength =
cache.getCachedLength(key, position, lengthUnset ? Long.MAX_VALUE : bytesLeft); cache.getCachedLength(key, position, lengthUnset ? Long.MAX_VALUE : bytesLeft);
if (blockLength > 0) { if (blockLength > 0) {
...@@ -233,12 +238,13 @@ public final class CacheUtil { ...@@ -233,12 +238,13 @@ public final class CacheUtil {
* @param position The position of the data to be read. * @param position The position of the data to be read.
* @param length Length of the data to be read, or {@link C#LENGTH_UNSET} if it is unknown. * @param length Length of the data to be read, or {@link C#LENGTH_UNSET} if it is unknown.
* @param dataSource The {@link CacheDataSource} to read the data from. * @param dataSource The {@link CacheDataSource} to read the data from.
* @param isCanceled An optional flag that will interrupt caching if set to true. * @param isCanceled An optional flag that will cancel the operation if set to true.
* @param progressNotifier A notifier through which to report progress updates, or {@code null}. * @param progressNotifier A notifier through which to report progress updates, or {@code null}.
* @param isLastBlock Whether this read block is the last block of the content. * @param isLastBlock Whether this read block is the last block of the content.
* @param temporaryBuffer A temporary buffer to be used during caching. * @param temporaryBuffer A temporary buffer to be used during caching.
* @return Number of read bytes, or 0 if no data is available because the end of the opened range * @return Number of read bytes, or 0 if no data is available because the end of the opened range
* has been reached. * has been reached.
* @param isCanceled An optional flag that will cancel the operation if set to true.
*/ */
private static long readAndDiscard( private static long readAndDiscard(
DataSpec dataSpec, DataSpec dataSpec,
...@@ -249,7 +255,7 @@ public final class CacheUtil { ...@@ -249,7 +255,7 @@ public final class CacheUtil {
@Nullable ProgressNotifier progressNotifier, @Nullable ProgressNotifier progressNotifier,
boolean isLastBlock, boolean isLastBlock,
byte[] temporaryBuffer) byte[] temporaryBuffer)
throws IOException, InterruptedException { throws IOException {
long positionOffset = position - dataSpec.position; long positionOffset = position - dataSpec.position;
long initialPositionOffset = positionOffset; long initialPositionOffset = positionOffset;
long endOffset = length != C.LENGTH_UNSET ? positionOffset + length : C.POSITION_UNSET; long endOffset = length != C.LENGTH_UNSET ? positionOffset + length : C.POSITION_UNSET;
...@@ -257,9 +263,13 @@ public final class CacheUtil { ...@@ -257,9 +263,13 @@ public final class CacheUtil {
while (true) { while (true) {
if (priorityTaskManager != null) { if (priorityTaskManager != null) {
// Wait for any other thread with higher priority to finish its job. // Wait for any other thread with higher priority to finish its job.
priorityTaskManager.proceed(dataSource.getUpstreamPriority()); try {
priorityTaskManager.proceed(dataSource.getUpstreamPriority());
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
} }
throwExceptionIfInterruptedOrCancelled(isCanceled); throwExceptionIfCanceled(isCanceled);
try { try {
long resolvedLength = C.LENGTH_UNSET; long resolvedLength = C.LENGTH_UNSET;
boolean isDataSourceOpen = false; boolean isDataSourceOpen = false;
...@@ -286,7 +296,7 @@ public final class CacheUtil { ...@@ -286,7 +296,7 @@ public final class CacheUtil {
progressNotifier.onRequestLengthResolved(positionOffset + resolvedLength); progressNotifier.onRequestLengthResolved(positionOffset + resolvedLength);
} }
while (positionOffset != endOffset) { while (positionOffset != endOffset) {
throwExceptionIfInterruptedOrCancelled(isCanceled); throwExceptionIfCanceled(isCanceled);
int bytesRead = int bytesRead =
dataSource.read( dataSource.read(
temporaryBuffer, temporaryBuffer,
...@@ -369,10 +379,10 @@ public final class CacheUtil { ...@@ -369,10 +379,10 @@ public final class CacheUtil {
.buildCacheKey(dataSpec); .buildCacheKey(dataSpec);
} }
private static void throwExceptionIfInterruptedOrCancelled(@Nullable AtomicBoolean isCanceled) private static void throwExceptionIfCanceled(@Nullable AtomicBoolean isCanceled)
throws InterruptedException { throws InterruptedIOException {
if (Thread.interrupted() || (isCanceled != null && isCanceled.get())) { if (isCanceled != null && isCanceled.get()) {
throw new InterruptedException(); throw new InterruptedIOException();
} }
} }
......
...@@ -862,7 +862,7 @@ public class DownloadManagerTest { ...@@ -862,7 +862,7 @@ public class DownloadManagerTest {
} }
@Override @Override
public void download(ProgressListener listener) throws InterruptedException, IOException { public void download(ProgressListener listener) throws IOException {
startCount.incrementAndGet(); startCount.incrementAndGet();
downloadStarted.open(); downloadStarted.open();
try { try {
...@@ -884,7 +884,7 @@ public class DownloadManagerTest { ...@@ -884,7 +884,7 @@ public class DownloadManagerTest {
} }
@Override @Override
public void remove() throws InterruptedException { public void remove() {
startCount.incrementAndGet(); startCount.incrementAndGet();
removeStarted.open(); removeStarted.open();
try { try {
...@@ -937,9 +937,11 @@ public class DownloadManagerTest { ...@@ -937,9 +937,11 @@ public class DownloadManagerTest {
// Internal methods. // Internal methods.
private void block() throws InterruptedException { private void block() {
try { try {
blocker.block(); blocker.block();
} catch (InterruptedException e) {
throw new IllegalStateException(e); // Never happens.
} finally { } finally {
blocker.close(); blocker.close();
} }
......
...@@ -97,7 +97,7 @@ public final class DashDownloader extends SegmentDownloader<DashManifest> { ...@@ -97,7 +97,7 @@ public final class DashDownloader extends SegmentDownloader<DashManifest> {
@Override @Override
protected List<Segment> getSegments( protected List<Segment> getSegments(
DataSource dataSource, DashManifest manifest, boolean allowIncompleteList) DataSource dataSource, DashManifest manifest, boolean allowIncompleteList)
throws InterruptedException, IOException { throws IOException {
ArrayList<Segment> segments = new ArrayList<>(); ArrayList<Segment> segments = new ArrayList<>();
for (int i = 0; i < manifest.getPeriodCount(); i++) { for (int i = 0; i < manifest.getPeriodCount(); i++) {
Period period = manifest.getPeriod(i); Period period = manifest.getPeriod(i);
...@@ -124,7 +124,7 @@ public final class DashDownloader extends SegmentDownloader<DashManifest> { ...@@ -124,7 +124,7 @@ public final class DashDownloader extends SegmentDownloader<DashManifest> {
long periodDurationUs, long periodDurationUs,
boolean allowIncompleteList, boolean allowIncompleteList,
ArrayList<Segment> out) ArrayList<Segment> out)
throws IOException, InterruptedException { throws IOException {
for (int i = 0; i < adaptationSet.representations.size(); i++) { for (int i = 0; i < adaptationSet.representations.size(); i++) {
Representation representation = adaptationSet.representations.get(i); Representation representation = adaptationSet.representations.get(i);
DashSegmentIndex index; DashSegmentIndex index;
...@@ -172,8 +172,7 @@ public final class DashDownloader extends SegmentDownloader<DashManifest> { ...@@ -172,8 +172,7 @@ public final class DashDownloader extends SegmentDownloader<DashManifest> {
} }
private static @Nullable DashSegmentIndex getSegmentIndex( private static @Nullable DashSegmentIndex getSegmentIndex(
DataSource dataSource, int trackType, Representation representation) DataSource dataSource, int trackType, Representation representation) throws IOException {
throws IOException, InterruptedException {
DashSegmentIndex index = representation.getIndex(); DashSegmentIndex index = representation.getIndex();
if (index != null) { if (index != null) {
return index; return index;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment