Commit 722a5b22 by olly Committed by Ian Baker

Move PriorityTaskManager inside CacheDataSource

In the same way it made sense to tie CacheKeyFactory and CacheDataSource
together in
https://github.com/google/ExoPlayer/commit/7ea83d7167d0b021fb280fca2b5dfa4a5616b5c1,
it makes sense to tie the PriorityTaskManager to the CacheDataSource as
well. This prevents error prone scenarios where one can end up being
passed around without (or with the wrong instance of) the other.

This change also internalizes use of PriorityDataSource directly into
CacheDataSource, rather than requiring the caller to chain things
themselves.

PiperOrigin-RevId: 307647290
parent fab80874
...@@ -20,7 +20,6 @@ import com.google.android.exoplayer2.C; ...@@ -20,7 +20,6 @@ import com.google.android.exoplayer2.C;
import com.google.android.exoplayer2.upstream.DataSink; import com.google.android.exoplayer2.upstream.DataSink;
import com.google.android.exoplayer2.upstream.DataSource; import com.google.android.exoplayer2.upstream.DataSource;
import com.google.android.exoplayer2.upstream.FileDataSource; import com.google.android.exoplayer2.upstream.FileDataSource;
import com.google.android.exoplayer2.upstream.PriorityDataSourceFactory;
import com.google.android.exoplayer2.upstream.cache.Cache; import com.google.android.exoplayer2.upstream.cache.Cache;
import com.google.android.exoplayer2.upstream.cache.CacheDataSinkFactory; import com.google.android.exoplayer2.upstream.cache.CacheDataSinkFactory;
import com.google.android.exoplayer2.upstream.cache.CacheDataSource; import com.google.android.exoplayer2.upstream.cache.CacheDataSource;
...@@ -30,7 +29,6 @@ import com.google.android.exoplayer2.util.PriorityTaskManager; ...@@ -30,7 +29,6 @@ import com.google.android.exoplayer2.util.PriorityTaskManager;
/** A helper class that holds necessary parameters for {@link Downloader} construction. */ /** A helper class that holds necessary parameters for {@link Downloader} construction. */
public final class DownloaderConstructorHelper { public final class DownloaderConstructorHelper {
@Nullable private final PriorityTaskManager priorityTaskManager;
private final CacheDataSource.Factory onlineCacheDataSourceFactory; private final CacheDataSource.Factory onlineCacheDataSourceFactory;
private final CacheDataSource.Factory offlineCacheDataSourceFactory; private final CacheDataSource.Factory offlineCacheDataSourceFactory;
...@@ -97,15 +95,12 @@ public final class DownloaderConstructorHelper { ...@@ -97,15 +95,12 @@ public final class DownloaderConstructorHelper {
@Nullable DataSink.Factory cacheWriteDataSinkFactory, @Nullable DataSink.Factory cacheWriteDataSinkFactory,
@Nullable PriorityTaskManager priorityTaskManager, @Nullable PriorityTaskManager priorityTaskManager,
@Nullable CacheKeyFactory cacheKeyFactory) { @Nullable CacheKeyFactory cacheKeyFactory) {
this.priorityTaskManager = priorityTaskManager;
if (priorityTaskManager != null) {
upstreamFactory =
new PriorityDataSourceFactory(upstreamFactory, priorityTaskManager, C.PRIORITY_DOWNLOAD);
}
onlineCacheDataSourceFactory = onlineCacheDataSourceFactory =
new CacheDataSource.Factory() new CacheDataSource.Factory()
.setCache(cache) .setCache(cache)
.setUpstreamDataSourceFactory(upstreamFactory) .setUpstreamDataSourceFactory(upstreamFactory)
.setUpstreamPriorityTaskManager(priorityTaskManager)
.setUpstreamPriority(C.PRIORITY_DOWNLOAD)
.setFlags(CacheDataSource.FLAG_BLOCK_ON_CACHE); .setFlags(CacheDataSource.FLAG_BLOCK_ON_CACHE);
offlineCacheDataSourceFactory = offlineCacheDataSourceFactory =
new CacheDataSource.Factory() new CacheDataSource.Factory()
...@@ -125,13 +120,6 @@ public final class DownloaderConstructorHelper { ...@@ -125,13 +120,6 @@ public final class DownloaderConstructorHelper {
} }
} }
/** Returns a {@link PriorityTaskManager} instance. */
public PriorityTaskManager getPriorityTaskManager() {
// Return a dummy PriorityTaskManager if none is provided. Create a new PriorityTaskManager
// each time so clients don't affect each other over the dummy PriorityTaskManager instance.
return priorityTaskManager != null ? priorityTaskManager : new PriorityTaskManager();
}
/** Returns a new {@link CacheDataSource} instance. */ /** Returns a new {@link CacheDataSource} instance. */
public CacheDataSource createCacheDataSource() { public CacheDataSource createCacheDataSource() {
return onlineCacheDataSourceFactory.createDataSource(); return onlineCacheDataSourceFactory.createDataSource();
......
...@@ -40,7 +40,6 @@ public final class ProgressiveDownloader implements Downloader { ...@@ -40,7 +40,6 @@ public final class ProgressiveDownloader implements Downloader {
private final DataSpec dataSpec; private final DataSpec dataSpec;
private final CacheDataSource dataSource; private final CacheDataSource dataSource;
private final PriorityTaskManager priorityTaskManager;
private final AtomicBoolean isCanceled; private final AtomicBoolean isCanceled;
/** /**
...@@ -58,26 +57,28 @@ public final class ProgressiveDownloader implements Downloader { ...@@ -58,26 +57,28 @@ public final class ProgressiveDownloader implements Downloader {
.setFlags(DataSpec.FLAG_ALLOW_CACHE_FRAGMENTATION) .setFlags(DataSpec.FLAG_ALLOW_CACHE_FRAGMENTATION)
.build(); .build();
this.dataSource = constructorHelper.createCacheDataSource(); this.dataSource = constructorHelper.createCacheDataSource();
this.priorityTaskManager = constructorHelper.getPriorityTaskManager();
isCanceled = new AtomicBoolean(); isCanceled = new AtomicBoolean();
} }
@Override @Override
public void download(@Nullable ProgressListener progressListener) public void download(@Nullable ProgressListener progressListener)
throws InterruptedException, IOException { throws InterruptedException, IOException {
priorityTaskManager.add(C.PRIORITY_DOWNLOAD); @Nullable PriorityTaskManager priorityTaskManager = dataSource.getUpstreamPriorityTaskManager();
if (priorityTaskManager != null) {
priorityTaskManager.add(C.PRIORITY_DOWNLOAD);
}
try { try {
CacheUtil.cache( CacheUtil.cache(
dataSpec,
dataSource, dataSource,
new byte[BUFFER_SIZE_BYTES], dataSpec,
priorityTaskManager,
C.PRIORITY_DOWNLOAD,
progressListener == null ? null : new ProgressForwarder(progressListener), progressListener == null ? null : new ProgressForwarder(progressListener),
isCanceled, isCanceled,
/* enableEOFException= */ true); /* enableEOFException= */ true,
/* temporaryBuffer= */ new byte[BUFFER_SIZE_BYTES]);
} finally { } finally {
priorityTaskManager.remove(C.PRIORITY_DOWNLOAD); if (priorityTaskManager != null) {
priorityTaskManager.remove(C.PRIORITY_DOWNLOAD);
}
} }
} }
......
...@@ -68,7 +68,6 @@ public abstract class SegmentDownloader<M extends FilterableManifest<M>> impleme ...@@ -68,7 +68,6 @@ public abstract class SegmentDownloader<M extends FilterableManifest<M>> impleme
private final DataSpec manifestDataSpec; private final DataSpec manifestDataSpec;
private final CacheDataSource dataSource; private final CacheDataSource dataSource;
private final CacheDataSource offlineDataSource; private final CacheDataSource offlineDataSource;
private final PriorityTaskManager priorityTaskManager;
private final ArrayList<StreamKey> streamKeys; private final ArrayList<StreamKey> streamKeys;
private final AtomicBoolean isCanceled; private final AtomicBoolean isCanceled;
...@@ -84,7 +83,6 @@ public abstract class SegmentDownloader<M extends FilterableManifest<M>> impleme ...@@ -84,7 +83,6 @@ public abstract class SegmentDownloader<M extends FilterableManifest<M>> impleme
this.streamKeys = new ArrayList<>(streamKeys); this.streamKeys = new ArrayList<>(streamKeys);
this.dataSource = constructorHelper.createCacheDataSource(); this.dataSource = constructorHelper.createCacheDataSource();
this.offlineDataSource = constructorHelper.createOfflineCacheDataSource(); this.offlineDataSource = constructorHelper.createOfflineCacheDataSource();
this.priorityTaskManager = constructorHelper.getPriorityTaskManager();
isCanceled = new AtomicBoolean(); isCanceled = new AtomicBoolean();
} }
...@@ -98,7 +96,10 @@ public abstract class SegmentDownloader<M extends FilterableManifest<M>> impleme ...@@ -98,7 +96,10 @@ public abstract class SegmentDownloader<M extends FilterableManifest<M>> impleme
@Override @Override
public final void download(@Nullable ProgressListener progressListener) public final void download(@Nullable ProgressListener progressListener)
throws IOException, InterruptedException { throws IOException, InterruptedException {
priorityTaskManager.add(C.PRIORITY_DOWNLOAD); @Nullable PriorityTaskManager priorityTaskManager = dataSource.getUpstreamPriorityTaskManager();
if (priorityTaskManager != null) {
priorityTaskManager.add(C.PRIORITY_DOWNLOAD);
}
try { try {
// Get the manifest and all of the segments. // Get the manifest and all of the segments.
M manifest = getManifest(dataSource, manifestDataSpec); M manifest = getManifest(dataSource, manifestDataSpec);
...@@ -147,23 +148,23 @@ public abstract class SegmentDownloader<M extends FilterableManifest<M>> impleme ...@@ -147,23 +148,23 @@ public abstract class SegmentDownloader<M extends FilterableManifest<M>> impleme
bytesDownloaded, bytesDownloaded,
segmentsDownloaded); segmentsDownloaded);
} }
byte[] buffer = new byte[BUFFER_SIZE_BYTES]; byte[] temporaryBuffer = new byte[BUFFER_SIZE_BYTES];
for (int i = 0; i < segments.size(); i++) { for (int i = 0; i < segments.size(); i++) {
CacheUtil.cache( CacheUtil.cache(
segments.get(i).dataSpec,
dataSource, dataSource,
buffer, segments.get(i).dataSpec,
priorityTaskManager,
C.PRIORITY_DOWNLOAD,
progressNotifier, progressNotifier,
isCanceled, isCanceled,
true); /* enableEOFException= */ true,
temporaryBuffer);
if (progressNotifier != null) { if (progressNotifier != null) {
progressNotifier.onSegmentDownloaded(); progressNotifier.onSegmentDownloaded();
} }
} }
} finally { } finally {
priorityTaskManager.remove(C.PRIORITY_DOWNLOAD); if (priorityTaskManager != null) {
priorityTaskManager.remove(C.PRIORITY_DOWNLOAD);
}
} }
} }
......
...@@ -25,10 +25,12 @@ import com.google.android.exoplayer2.upstream.DataSourceException; ...@@ -25,10 +25,12 @@ import com.google.android.exoplayer2.upstream.DataSourceException;
import com.google.android.exoplayer2.upstream.DataSpec; import com.google.android.exoplayer2.upstream.DataSpec;
import com.google.android.exoplayer2.upstream.DummyDataSource; import com.google.android.exoplayer2.upstream.DummyDataSource;
import com.google.android.exoplayer2.upstream.FileDataSource; import com.google.android.exoplayer2.upstream.FileDataSource;
import com.google.android.exoplayer2.upstream.PriorityDataSource;
import com.google.android.exoplayer2.upstream.TeeDataSource; import com.google.android.exoplayer2.upstream.TeeDataSource;
import com.google.android.exoplayer2.upstream.TransferListener; import com.google.android.exoplayer2.upstream.TransferListener;
import com.google.android.exoplayer2.upstream.cache.Cache.CacheException; import com.google.android.exoplayer2.upstream.cache.Cache.CacheException;
import com.google.android.exoplayer2.util.Assertions; import com.google.android.exoplayer2.util.Assertions;
import com.google.android.exoplayer2.util.PriorityTaskManager;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.lang.annotation.Documented; import java.lang.annotation.Documented;
...@@ -55,6 +57,8 @@ public final class CacheDataSource implements DataSource { ...@@ -55,6 +57,8 @@ public final class CacheDataSource implements DataSource {
private CacheKeyFactory cacheKeyFactory; private CacheKeyFactory cacheKeyFactory;
private boolean cacheIsReadOnly; private boolean cacheIsReadOnly;
@Nullable private DataSource.Factory upstreamDataSourceFactory; @Nullable private DataSource.Factory upstreamDataSourceFactory;
@Nullable private PriorityTaskManager upstreamPriorityTaskManager;
private int upstreamPriority;
@CacheDataSource.Flags private int flags; @CacheDataSource.Flags private int flags;
@Nullable private CacheDataSource.EventListener eventListener; @Nullable private CacheDataSource.EventListener eventListener;
...@@ -138,6 +142,44 @@ public final class CacheDataSource implements DataSource { ...@@ -138,6 +142,44 @@ public final class CacheDataSource implements DataSource {
} }
/** /**
* Sets an optional {@link PriorityTaskManager} to use when requesting data from upstream.
*
* <p>If set, reads from the upstream {@link DataSource} will only be allowed to proceed if
* there are no higher priority tasks registered to the {@link PriorityTaskManager}. If there
* exists a higher priority task then {@link PriorityTaskManager.PriorityTooLowException} will
* be thrown instead.
*
* <p>Note that requests to {@link CacheDataSource} instances are intended to be used as parts
* of (possibly larger) tasks that are registered with the {@link PriorityTaskManager}, and
* hence {@link CacheDataSource} does <em>not</em> register a task by itself. This must be done
* by the surrounding code that uses the {@link CacheDataSource} instances.
*
* <p>The default is {@code null}.
*
* @param upstreamPriorityTaskManager The upstream {@link PriorityTaskManager}.
* @return This factory.
*/
public Factory setUpstreamPriorityTaskManager(
@Nullable PriorityTaskManager upstreamPriorityTaskManager) {
this.upstreamPriorityTaskManager = upstreamPriorityTaskManager;
return this;
}
/**
* Sets the priority to use when requesting data from upstream. The priority is only used if a
* {@link PriorityTaskManager} is set by calling {@link #setUpstreamPriorityTaskManager}.
*
* <p>The default is {@link C#PRIORITY_PLAYBACK}.
*
* @param upstreamPriority The priority to use when requesting data from upstream.
* @return This factory.
*/
public Factory setUpstreamPriority(int upstreamPriority) {
this.upstreamPriority = upstreamPriority;
return this;
}
/**
* Sets the {@link CacheDataSource.Flags}. * Sets the {@link CacheDataSource.Flags}.
* *
* <p>The default is {@code 0}. * <p>The default is {@code 0}.
...@@ -182,9 +224,11 @@ public final class CacheDataSource implements DataSource { ...@@ -182,9 +224,11 @@ public final class CacheDataSource implements DataSource {
upstreamDataSource, upstreamDataSource,
cacheReadDataSourceFactory.createDataSource(), cacheReadDataSourceFactory.createDataSource(),
cacheWriteDataSink, cacheWriteDataSink,
cacheKeyFactory,
flags, flags,
eventListener, upstreamPriorityTaskManager,
cacheKeyFactory); upstreamPriority,
eventListener);
} }
} }
...@@ -267,6 +311,8 @@ public final class CacheDataSource implements DataSource { ...@@ -267,6 +311,8 @@ public final class CacheDataSource implements DataSource {
@Nullable private final DataSource cacheWriteDataSource; @Nullable private final DataSource cacheWriteDataSource;
private final DataSource upstreamDataSource; private final DataSource upstreamDataSource;
private final CacheKeyFactory cacheKeyFactory; private final CacheKeyFactory cacheKeyFactory;
@Nullable private final PriorityTaskManager upstreamPriorityTaskManager;
private final int upstreamPriority;
@Nullable private final EventListener eventListener; @Nullable private final EventListener eventListener;
private final boolean blockOnCache; private final boolean blockOnCache;
...@@ -373,6 +419,28 @@ public final class CacheDataSource implements DataSource { ...@@ -373,6 +419,28 @@ public final class CacheDataSource implements DataSource {
@Flags int flags, @Flags int flags,
@Nullable EventListener eventListener, @Nullable EventListener eventListener,
@Nullable CacheKeyFactory cacheKeyFactory) { @Nullable CacheKeyFactory cacheKeyFactory) {
this(
cache,
upstreamDataSource,
cacheReadDataSource,
cacheWriteDataSink,
cacheKeyFactory,
flags,
/* upstreamPriorityTaskManager= */ null,
/* upstreamPriority= */ C.PRIORITY_PLAYBACK,
eventListener);
}
private CacheDataSource(
Cache cache,
@Nullable DataSource upstreamDataSource,
DataSource cacheReadDataSource,
@Nullable DataSink cacheWriteDataSink,
@Nullable CacheKeyFactory cacheKeyFactory,
@Flags int flags,
@Nullable PriorityTaskManager upstreamPriorityTaskManager,
int upstreamPriority,
@Nullable EventListener eventListener) {
this.cache = cache; this.cache = cache;
this.cacheReadDataSource = cacheReadDataSource; this.cacheReadDataSource = cacheReadDataSource;
this.cacheKeyFactory = this.cacheKeyFactory =
...@@ -381,14 +449,22 @@ public final class CacheDataSource implements DataSource { ...@@ -381,14 +449,22 @@ public final class CacheDataSource implements DataSource {
this.ignoreCacheOnError = (flags & FLAG_IGNORE_CACHE_ON_ERROR) != 0; this.ignoreCacheOnError = (flags & FLAG_IGNORE_CACHE_ON_ERROR) != 0;
this.ignoreCacheForUnsetLengthRequests = this.ignoreCacheForUnsetLengthRequests =
(flags & FLAG_IGNORE_CACHE_FOR_UNSET_LENGTH_REQUESTS) != 0; (flags & FLAG_IGNORE_CACHE_FOR_UNSET_LENGTH_REQUESTS) != 0;
this.upstreamPriority = upstreamPriority;
if (upstreamDataSource != null) { if (upstreamDataSource != null) {
if (upstreamPriorityTaskManager != null) {
upstreamDataSource =
new PriorityDataSource(
upstreamDataSource, upstreamPriorityTaskManager, upstreamPriority);
}
this.upstreamDataSource = upstreamDataSource; this.upstreamDataSource = upstreamDataSource;
this.upstreamPriorityTaskManager = upstreamPriorityTaskManager;
this.cacheWriteDataSource = this.cacheWriteDataSource =
cacheWriteDataSink != null cacheWriteDataSink != null
? new TeeDataSource(upstreamDataSource, cacheWriteDataSink) ? new TeeDataSource(upstreamDataSource, cacheWriteDataSink)
: null; : null;
} else { } else {
this.upstreamDataSource = DummyDataSource.INSTANCE; this.upstreamDataSource = DummyDataSource.INSTANCE;
this.upstreamPriorityTaskManager = null;
this.cacheWriteDataSource = null; this.cacheWriteDataSource = null;
} }
this.eventListener = eventListener; this.eventListener = eventListener;
...@@ -404,6 +480,24 @@ public final class CacheDataSource implements DataSource { ...@@ -404,6 +480,24 @@ public final class CacheDataSource implements DataSource {
return cacheKeyFactory; return cacheKeyFactory;
} }
/**
* Returns the {@link PriorityTaskManager} used when there's a cache miss and requests need to be
* made to the upstream {@link DataSource}, or {@code null} if there is none.
*/
@Nullable
public PriorityTaskManager getUpstreamPriorityTaskManager() {
return upstreamPriorityTaskManager;
}
/**
* Returns the priority used when there's a cache miss and requests need to be made to the
* upstream {@link DataSource}. The priority is only used if the source has a {@link
* PriorityTaskManager}.
*/
public int getUpstreamPriority() {
return upstreamPriority;
}
@Override @Override
public void addTransferListener(TransferListener transferListener) { public void addTransferListener(TransferListener transferListener) {
cacheReadDataSource.addTransferListener(transferListener); cacheReadDataSource.addTransferListener(transferListener);
......
...@@ -107,9 +107,9 @@ public final class CacheUtil { ...@@ -107,9 +107,9 @@ public final class CacheUtil {
* *
* <p>This method may be slow and shouldn't normally be called on the main thread. * <p>This method may be slow and shouldn't normally be called on the main thread.
* *
* @param dataSpec Defines the data to be cached.
* @param cache A {@link Cache} to store the data. * @param cache A {@link Cache} to store the data.
* @param upstream A {@link DataSource} for reading data not in the cache. * @param dataSpec Defines the data to be cached.
* @param upstreamDataSource A {@link DataSource} for reading data not in the cache.
* @param progressListener A listener to receive progress updates, or {@code null}. * @param progressListener A listener to receive progress updates, or {@code null}.
* @param isCanceled An optional flag that will interrupt caching if set to true. * @param isCanceled An optional flag that will interrupt caching if set to true.
* @throws IOException If an error occurs reading from the source. * @throws IOException If an error occurs reading from the source.
...@@ -117,61 +117,52 @@ public final class CacheUtil { ...@@ -117,61 +117,52 @@ public final class CacheUtil {
*/ */
@WorkerThread @WorkerThread
public static void cache( public static void cache(
DataSpec dataSpec,
Cache cache, Cache cache,
DataSource upstream, DataSpec dataSpec,
DataSource upstreamDataSource,
@Nullable ProgressListener progressListener, @Nullable ProgressListener progressListener,
@Nullable AtomicBoolean isCanceled) @Nullable AtomicBoolean isCanceled)
throws IOException, InterruptedException { throws IOException, InterruptedException {
cache( cache(
new CacheDataSource(cache, upstreamDataSource),
dataSpec, dataSpec,
new CacheDataSource(cache, upstream),
new byte[DEFAULT_BUFFER_SIZE_BYTES],
/* priorityTaskManager= */ null,
/* priority= */ 0,
progressListener, progressListener,
isCanceled, isCanceled,
/* enableEOFException= */ false); /* enableEOFException= */ false,
new byte[DEFAULT_BUFFER_SIZE_BYTES]);
} }
/** /**
* Caches the data defined by {@code dataSpec}, skipping already cached data. Caching stops early * Caches the data defined by {@code dataSpec}, skipping already cached data. Caching stops early
* if end of input is reached and {@code enableEOFException} is false. * if end of input is reached and {@code enableEOFException} is false.
* *
* <p>If a {@link PriorityTaskManager} is provided, it's used to pause and resume caching * <p>If {@code dataSource} has a {@link PriorityTaskManager}, then it's the responsibility of the
* depending on {@code priority} and the priority of other tasks registered to the * calling code to call {@link PriorityTaskManager#add} to register with the manager before
* PriorityTaskManager. Please note that it's the responsibility of the calling code to call * calling this method, and to call {@link PriorityTaskManager#remove} afterwards to unregister.
* {@link PriorityTaskManager#add} to register with the manager before calling this method, and to
* call {@link PriorityTaskManager#remove} afterwards to unregister.
* *
* <p>This method may be slow and shouldn't normally be called on the main thread. * <p>This method may be slow and shouldn't normally be called on the main thread.
* *
* @param dataSpec Defines the data to be cached.
* @param dataSource A {@link CacheDataSource} to be used for caching the data. * @param dataSource A {@link CacheDataSource} to be used for caching the data.
* @param buffer The buffer to be used while caching. * @param dataSpec Defines the data to be cached.
* @param priorityTaskManager If not null it's used to check whether it is allowed to proceed with
* caching.
* @param priority The priority of this task. Used with {@code priorityTaskManager}.
* @param progressListener A listener to receive progress updates, or {@code null}. * @param progressListener A listener to receive progress updates, or {@code null}.
* @param isCanceled An optional flag that will interrupt caching if set to true. * @param isCanceled An optional flag that will interrupt caching if set to true.
* @param enableEOFException Whether to throw an {@link EOFException} if end of input has been * @param enableEOFException Whether to throw an {@link EOFException} if end of input has been
* reached unexpectedly. * reached unexpectedly.
* @param temporaryBuffer A temporary buffer to be used during caching.
* @throws IOException If an error occurs reading from the source. * @throws IOException If an error occurs reading from the source.
* @throws InterruptedException If the thread was interrupted directly or via {@code isCanceled}. * @throws InterruptedException If the thread was interrupted directly or via {@code isCanceled}.
*/ */
@WorkerThread @WorkerThread
public static void cache( public static void cache(
DataSpec dataSpec,
CacheDataSource dataSource, CacheDataSource dataSource,
byte[] buffer, DataSpec dataSpec,
@Nullable PriorityTaskManager priorityTaskManager,
int priority,
@Nullable ProgressListener progressListener, @Nullable ProgressListener progressListener,
@Nullable AtomicBoolean isCanceled, @Nullable AtomicBoolean isCanceled,
boolean enableEOFException) boolean enableEOFException,
byte[] temporaryBuffer)
throws IOException, InterruptedException { throws IOException, InterruptedException {
Assertions.checkNotNull(dataSource); Assertions.checkNotNull(dataSource);
Assertions.checkNotNull(buffer); Assertions.checkNotNull(temporaryBuffer);
Cache cache = dataSource.getCache(); Cache cache = dataSource.getCache();
CacheKeyFactory cacheKeyFactory = dataSource.getCacheKeyFactory(); CacheKeyFactory cacheKeyFactory = dataSource.getCacheKeyFactory();
...@@ -206,12 +197,10 @@ public final class CacheUtil { ...@@ -206,12 +197,10 @@ public final class CacheUtil {
position, position,
length, length,
dataSource, dataSource,
buffer, isCanceled,
priorityTaskManager,
priority,
progressNotifier, progressNotifier,
isLastBlock, isLastBlock,
isCanceled); temporaryBuffer);
if (read < blockLength) { if (read < blockLength) {
// Reached to the end of the data. // Reached to the end of the data.
if (enableEOFException && !lengthUnset) { if (enableEOFException && !lengthUnset) {
...@@ -243,14 +232,11 @@ public final class CacheUtil { ...@@ -243,14 +232,11 @@ public final class CacheUtil {
* overwritten by the following parameters. * overwritten by the following parameters.
* @param position The position of the data to be read. * @param position The position of the data to be read.
* @param length Length of the data to be read, or {@link C#LENGTH_UNSET} if it is unknown. * @param length Length of the data to be read, or {@link C#LENGTH_UNSET} if it is unknown.
* @param dataSource The {@link DataSource} to read the data from. * @param dataSource The {@link CacheDataSource} to read the data from.
* @param buffer The buffer to be used while downloading. * @param isCanceled An optional flag that will interrupt caching if set to true.
* @param priorityTaskManager If not null it's used to check whether it is allowed to proceed with
* caching.
* @param priority The priority of this task.
* @param progressNotifier A notifier through which to report progress updates, or {@code null}. * @param progressNotifier A notifier through which to report progress updates, or {@code null}.
* @param isLastBlock Whether this read block is the last block of the content. * @param isLastBlock Whether this read block is the last block of the content.
* @param isCanceled An optional flag that will interrupt caching if set to true. * @param temporaryBuffer A temporary buffer to be used during caching.
* @return Number of read bytes, or 0 if no data is available because the end of the opened range * @return Number of read bytes, or 0 if no data is available because the end of the opened range
* has been reached. * has been reached.
*/ */
...@@ -258,21 +244,20 @@ public final class CacheUtil { ...@@ -258,21 +244,20 @@ public final class CacheUtil {
DataSpec dataSpec, DataSpec dataSpec,
long position, long position,
long length, long length,
DataSource dataSource, CacheDataSource dataSource,
byte[] buffer, @Nullable AtomicBoolean isCanceled,
@Nullable PriorityTaskManager priorityTaskManager,
int priority,
@Nullable ProgressNotifier progressNotifier, @Nullable ProgressNotifier progressNotifier,
boolean isLastBlock, boolean isLastBlock,
@Nullable AtomicBoolean isCanceled) byte[] temporaryBuffer)
throws IOException, InterruptedException { throws IOException, InterruptedException {
long positionOffset = position - dataSpec.position; long positionOffset = position - dataSpec.position;
long initialPositionOffset = positionOffset; long initialPositionOffset = positionOffset;
long endOffset = length != C.LENGTH_UNSET ? positionOffset + length : C.POSITION_UNSET; long endOffset = length != C.LENGTH_UNSET ? positionOffset + length : C.POSITION_UNSET;
@Nullable PriorityTaskManager priorityTaskManager = dataSource.getUpstreamPriorityTaskManager();
while (true) { while (true) {
if (priorityTaskManager != null) { if (priorityTaskManager != null) {
// Wait for any other thread with higher priority to finish its job. // Wait for any other thread with higher priority to finish its job.
priorityTaskManager.proceed(priority); priorityTaskManager.proceed(dataSource.getUpstreamPriority());
} }
throwExceptionIfInterruptedOrCancelled(isCanceled); throwExceptionIfInterruptedOrCancelled(isCanceled);
try { try {
...@@ -304,11 +289,11 @@ public final class CacheUtil { ...@@ -304,11 +289,11 @@ public final class CacheUtil {
throwExceptionIfInterruptedOrCancelled(isCanceled); throwExceptionIfInterruptedOrCancelled(isCanceled);
int bytesRead = int bytesRead =
dataSource.read( dataSource.read(
buffer, temporaryBuffer,
0, 0,
endOffset != C.POSITION_UNSET endOffset != C.POSITION_UNSET
? (int) Math.min(buffer.length, endOffset - positionOffset) ? (int) Math.min(temporaryBuffer.length, endOffset - positionOffset)
: buffer.length); : temporaryBuffer.length);
if (bytesRead == C.RESULT_END_OF_INPUT) { if (bytesRead == C.RESULT_END_OF_INPUT) {
if (progressNotifier != null) { if (progressNotifier != null) {
progressNotifier.onRequestLengthResolved(positionOffset); progressNotifier.onRequestLengthResolved(positionOffset);
......
...@@ -358,11 +358,7 @@ public final class CacheDataSourceTest { ...@@ -358,11 +358,7 @@ public final class CacheDataSourceTest {
.appendReadData(1024 * 1024) .appendReadData(1024 * 1024)
.endData()); .endData());
CacheUtil.cache( CacheUtil.cache(
unboundedDataSpec, cache, unboundedDataSpec, upstream2, /* progressListener= */ null, /* isCanceled= */ null);
cache,
upstream2,
/* progressListener= */ null,
/* isCanceled= */ null);
// Read the rest of the data. // Read the rest of the data.
TestUtil.readToEnd(cacheDataSource); TestUtil.readToEnd(cacheDataSource);
...@@ -406,11 +402,7 @@ public final class CacheDataSourceTest { ...@@ -406,11 +402,7 @@ public final class CacheDataSourceTest {
.appendReadData(1024 * 1024) .appendReadData(1024 * 1024)
.endData()); .endData());
CacheUtil.cache( CacheUtil.cache(
unboundedDataSpec, cache, unboundedDataSpec, upstream2, /* progressListener= */ null, /* isCanceled= */ null);
cache,
upstream2,
/* progressListener= */ null,
/* isCanceled= */ null);
// Read the rest of the data. // Read the rest of the data.
TestUtil.readToEnd(cacheDataSource); TestUtil.readToEnd(cacheDataSource);
...@@ -429,11 +421,7 @@ public final class CacheDataSourceTest { ...@@ -429,11 +421,7 @@ public final class CacheDataSourceTest {
int halfDataLength = 512; int halfDataLength = 512;
DataSpec dataSpec = buildDataSpec(halfDataLength, C.LENGTH_UNSET); DataSpec dataSpec = buildDataSpec(halfDataLength, C.LENGTH_UNSET);
CacheUtil.cache( CacheUtil.cache(
dataSpec, cache, dataSpec, upstream, /* progressListener= */ null, /* isCanceled= */ null);
cache,
upstream,
/* progressListener= */ null,
/* isCanceled= */ null);
// Create cache read-only CacheDataSource. // Create cache read-only CacheDataSource.
CacheDataSource cacheDataSource = CacheDataSource cacheDataSource =
...@@ -464,11 +452,7 @@ public final class CacheDataSourceTest { ...@@ -464,11 +452,7 @@ public final class CacheDataSourceTest {
int halfDataLength = 512; int halfDataLength = 512;
DataSpec dataSpec = buildDataSpec(/* position= */ 0, halfDataLength); DataSpec dataSpec = buildDataSpec(/* position= */ 0, halfDataLength);
CacheUtil.cache( CacheUtil.cache(
dataSpec, cache, dataSpec, upstream, /* progressListener= */ null, /* isCanceled= */ null);
cache,
upstream,
/* progressListener= */ null,
/* isCanceled= */ null);
// Create blocking CacheDataSource. // Create blocking CacheDataSource.
CacheDataSource cacheDataSource = CacheDataSource cacheDataSource =
......
...@@ -201,11 +201,7 @@ public final class CacheUtilTest { ...@@ -201,11 +201,7 @@ public final class CacheUtilTest {
CachingCounters counters = new CachingCounters(); CachingCounters counters = new CachingCounters();
CacheUtil.cache( CacheUtil.cache(
new DataSpec(Uri.parse("test_data")), cache, new DataSpec(Uri.parse("test_data")), dataSource, counters, /* isCanceled= */ null);
cache,
dataSource,
counters,
/* isCanceled= */ null);
counters.assertValues(0, 100, 100); counters.assertValues(0, 100, 100);
assertCachedData(cache, fakeDataSet); assertCachedData(cache, fakeDataSet);
...@@ -219,17 +215,12 @@ public final class CacheUtilTest { ...@@ -219,17 +215,12 @@ public final class CacheUtilTest {
Uri testUri = Uri.parse("test_data"); Uri testUri = Uri.parse("test_data");
DataSpec dataSpec = new DataSpec(testUri, /* position= */ 10, /* length= */ 20); DataSpec dataSpec = new DataSpec(testUri, /* position= */ 10, /* length= */ 20);
CachingCounters counters = new CachingCounters(); CachingCounters counters = new CachingCounters();
CacheUtil.cache(dataSpec, cache, dataSource, counters, /* isCanceled= */ null); CacheUtil.cache(cache, dataSpec, dataSource, counters, /* isCanceled= */ null);
counters.assertValues(0, 20, 20); counters.assertValues(0, 20, 20);
counters.reset(); counters.reset();
CacheUtil.cache( CacheUtil.cache(cache, new DataSpec(testUri), dataSource, counters, /* isCanceled= */ null);
new DataSpec(testUri),
cache,
dataSource,
counters,
/* isCanceled= */ null);
counters.assertValues(20, 80, 100); counters.assertValues(20, 80, 100);
assertCachedData(cache, fakeDataSet); assertCachedData(cache, fakeDataSet);
...@@ -244,7 +235,7 @@ public final class CacheUtilTest { ...@@ -244,7 +235,7 @@ public final class CacheUtilTest {
DataSpec dataSpec = new DataSpec(Uri.parse("test_data")); DataSpec dataSpec = new DataSpec(Uri.parse("test_data"));
CachingCounters counters = new CachingCounters(); CachingCounters counters = new CachingCounters();
CacheUtil.cache(dataSpec, cache, dataSource, counters, /* isCanceled= */ null); CacheUtil.cache(cache, dataSpec, dataSource, counters, /* isCanceled= */ null);
counters.assertValues(0, 100, 100); counters.assertValues(0, 100, 100);
assertCachedData(cache, fakeDataSet); assertCachedData(cache, fakeDataSet);
...@@ -260,17 +251,12 @@ public final class CacheUtilTest { ...@@ -260,17 +251,12 @@ public final class CacheUtilTest {
Uri testUri = Uri.parse("test_data"); Uri testUri = Uri.parse("test_data");
DataSpec dataSpec = new DataSpec(testUri, /* position= */ 10, /* length= */ 20); DataSpec dataSpec = new DataSpec(testUri, /* position= */ 10, /* length= */ 20);
CachingCounters counters = new CachingCounters(); CachingCounters counters = new CachingCounters();
CacheUtil.cache(dataSpec, cache, dataSource, counters, /* isCanceled= */ null); CacheUtil.cache(cache, dataSpec, dataSource, counters, /* isCanceled= */ null);
counters.assertValues(0, 20, 20); counters.assertValues(0, 20, 20);
counters.reset(); counters.reset();
CacheUtil.cache( CacheUtil.cache(cache, new DataSpec(testUri), dataSource, counters, /* isCanceled= */ null);
new DataSpec(testUri),
cache,
dataSource,
counters,
/* isCanceled= */ null);
counters.assertValues(20, 80, 100); counters.assertValues(20, 80, 100);
assertCachedData(cache, fakeDataSet); assertCachedData(cache, fakeDataSet);
...@@ -284,7 +270,7 @@ public final class CacheUtilTest { ...@@ -284,7 +270,7 @@ public final class CacheUtilTest {
Uri testUri = Uri.parse("test_data"); Uri testUri = Uri.parse("test_data");
DataSpec dataSpec = new DataSpec(testUri, /* position= */ 0, /* length= */ 1000); DataSpec dataSpec = new DataSpec(testUri, /* position= */ 0, /* length= */ 1000);
CachingCounters counters = new CachingCounters(); CachingCounters counters = new CachingCounters();
CacheUtil.cache(dataSpec, cache, dataSource, counters, /* isCanceled= */ null); CacheUtil.cache(cache, dataSpec, dataSource, counters, /* isCanceled= */ null);
counters.assertValues(0, 100, 1000); counters.assertValues(0, 100, 1000);
assertCachedData(cache, fakeDataSet); assertCachedData(cache, fakeDataSet);
...@@ -300,14 +286,12 @@ public final class CacheUtilTest { ...@@ -300,14 +286,12 @@ public final class CacheUtilTest {
try { try {
CacheUtil.cache( CacheUtil.cache(
dataSpec,
new CacheDataSource(cache, dataSource), new CacheDataSource(cache, dataSource),
new byte[CacheUtil.DEFAULT_BUFFER_SIZE_BYTES], dataSpec,
/* priorityTaskManager= */ null,
/* priority= */ 0,
/* progressListener= */ null, /* progressListener= */ null,
/* isCanceled= */ null, /* isCanceled= */ null,
/* enableEOFException= */ true); /* enableEOFException= */ true,
/* temporaryBuffer= */ new byte[CacheUtil.DEFAULT_BUFFER_SIZE_BYTES]);
fail(); fail();
} catch (EOFException e) { } catch (EOFException e) {
// Do nothing. // Do nothing.
...@@ -329,11 +313,7 @@ public final class CacheUtilTest { ...@@ -329,11 +313,7 @@ public final class CacheUtilTest {
FakeDataSource dataSource = new FakeDataSource(fakeDataSet); FakeDataSource dataSource = new FakeDataSource(fakeDataSet);
CacheUtil.cache( CacheUtil.cache(
new DataSpec(Uri.parse("test_data")), cache, new DataSpec(Uri.parse("test_data")), dataSource, counters, /* isCanceled= */ null);
cache,
dataSource,
counters,
/* isCanceled= */ null);
counters.assertValues(0, 300, 300); counters.assertValues(0, 300, 300);
assertCachedData(cache, fakeDataSet); assertCachedData(cache, fakeDataSet);
...@@ -350,7 +330,6 @@ public final class CacheUtilTest { ...@@ -350,7 +330,6 @@ public final class CacheUtilTest {
.setFlags(DataSpec.FLAG_ALLOW_CACHE_FRAGMENTATION) .setFlags(DataSpec.FLAG_ALLOW_CACHE_FRAGMENTATION)
.build(); .build();
CacheUtil.cache( CacheUtil.cache(
dataSpec,
// Set fragmentSize to 10 to make sure there are multiple spans. // Set fragmentSize to 10 to make sure there are multiple spans.
new CacheDataSource( new CacheDataSource(
cache, cache,
...@@ -359,12 +338,11 @@ public final class CacheUtilTest { ...@@ -359,12 +338,11 @@ public final class CacheUtilTest {
new CacheDataSink(cache, /* fragmentSize= */ 10), new CacheDataSink(cache, /* fragmentSize= */ 10),
/* flags= */ 0, /* flags= */ 0,
/* eventListener= */ null), /* eventListener= */ null),
new byte[CacheUtil.DEFAULT_BUFFER_SIZE_BYTES], dataSpec,
/* priorityTaskManager= */ null,
/* priority= */ 0,
/* progressListener= */ null, /* progressListener= */ null,
/* isCanceled= */ null, /* isCanceled= */ null,
true); /* enableEOFException= */ true,
/* temporaryBuffer= */ new byte[CacheUtil.DEFAULT_BUFFER_SIZE_BYTES]);
CacheUtil.remove(dataSpec, cache, /* cacheKeyFactory= */ null); CacheUtil.remove(dataSpec, cache, /* cacheKeyFactory= */ null);
assertCacheEmpty(cache); assertCacheEmpty(cache);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment