Commit d215b811 by olly Committed by Oliver Woodman

Rework DownloadManager to fix remaining TODOs

- Removed DownloadInternal and its sometimes-out-of-sync
  duplicate state
- Fixed downloads being in STOPPED rather than QUEUED state
  when the manager is paused
- Fixed setMaxParallelDownloads to start/stop downloads if
  necessary when the value changes
- Fixed isWaitingForRequirements

PiperOrigin-RevId: 246164845
parent 6c1065c6
......@@ -67,11 +67,12 @@ public final class ActionFileUpgradeUtil {
if (actionFile.exists()) {
boolean success = false;
try {
long nowMs = System.currentTimeMillis();
for (DownloadRequest request : actionFile.load()) {
if (downloadIdProvider != null) {
request = request.copyWithId(downloadIdProvider.getId(request));
}
mergeRequest(request, downloadIndex, addNewDownloadsAsCompleted);
mergeRequest(request, downloadIndex, addNewDownloadsAsCompleted, nowMs);
}
success = true;
} finally {
......@@ -93,13 +94,13 @@ public final class ActionFileUpgradeUtil {
/* package */ static void mergeRequest(
DownloadRequest request,
DefaultDownloadIndex downloadIndex,
boolean addNewDownloadAsCompleted)
boolean addNewDownloadAsCompleted,
long nowMs)
throws IOException {
Download download = downloadIndex.getDownload(request.id);
if (download != null) {
download = DownloadManager.mergeRequest(download, request, download.stopReason);
download = DownloadManager.mergeRequest(download, request, download.stopReason, nowMs);
} else {
long nowMs = System.currentTimeMillis();
download =
new Download(
request,
......
......@@ -69,7 +69,9 @@ public final class DefaultDownloadIndex implements WritableDownloadIndex {
private static final int COLUMN_INDEX_BYTES_DOWNLOADED = 13;
private static final String WHERE_ID_EQUALS = COLUMN_ID + " = ?";
private static final String WHERE_STATE_TERMINAL =
private static final String WHERE_STATE_IS_DOWNLOADING =
COLUMN_STATE + " = " + Download.STATE_DOWNLOADING;
private static final String WHERE_STATE_IS_TERMINAL =
getStateQuery(Download.STATE_COMPLETED, Download.STATE_FAILED);
private static final String[] COLUMNS =
......@@ -219,13 +221,26 @@ public final class DefaultDownloadIndex implements WritableDownloadIndex {
}
@Override
public void setDownloadingStatesToQueued() throws DatabaseIOException {
ensureInitialized();
try {
ContentValues values = new ContentValues();
values.put(COLUMN_STATE, Download.STATE_QUEUED);
SQLiteDatabase writableDatabase = databaseProvider.getWritableDatabase();
writableDatabase.update(tableName, values, WHERE_STATE_IS_DOWNLOADING, /* whereArgs= */ null);
} catch (SQLException e) {
throw new DatabaseIOException(e);
}
}
@Override
public void setStopReason(int stopReason) throws DatabaseIOException {
ensureInitialized();
try {
ContentValues values = new ContentValues();
values.put(COLUMN_STOP_REASON, stopReason);
SQLiteDatabase writableDatabase = databaseProvider.getWritableDatabase();
writableDatabase.update(tableName, values, WHERE_STATE_TERMINAL, /* whereArgs= */ null);
writableDatabase.update(tableName, values, WHERE_STATE_IS_TERMINAL, /* whereArgs= */ null);
} catch (SQLException e) {
throw new DatabaseIOException(e);
}
......@@ -239,7 +254,10 @@ public final class DefaultDownloadIndex implements WritableDownloadIndex {
values.put(COLUMN_STOP_REASON, stopReason);
SQLiteDatabase writableDatabase = databaseProvider.getWritableDatabase();
writableDatabase.update(
tableName, values, WHERE_STATE_TERMINAL + " AND " + WHERE_ID_EQUALS, new String[] {id});
tableName,
values,
WHERE_STATE_IS_TERMINAL + " AND " + WHERE_ID_EQUALS,
new String[] {id});
} catch (SQLException e) {
throw new DatabaseIOException(e);
}
......
......@@ -31,7 +31,6 @@ import android.os.Handler;
import android.os.HandlerThread;
import android.os.Looper;
import android.os.Message;
import androidx.annotation.IntDef;
import androidx.annotation.Nullable;
import com.google.android.exoplayer2.C;
import com.google.android.exoplayer2.database.DatabaseProvider;
......@@ -46,14 +45,11 @@ import com.google.android.exoplayer2.util.Assertions;
import com.google.android.exoplayer2.util.Log;
import com.google.android.exoplayer2.util.Util;
import java.io.IOException;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CopyOnWriteArraySet;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
/**
* Manages downloads.
......@@ -125,8 +121,7 @@ public final class DownloadManager {
// Messages posted to the main handler.
private static final int MSG_INITIALIZED = 0;
private static final int MSG_PROCESSED = 1;
private static final int MSG_DOWNLOAD_CHANGED = 2;
private static final int MSG_DOWNLOAD_REMOVED = 3;
private static final int MSG_DOWNLOAD_UPDATE = 2;
// Messages posted to the background handler.
private static final int MSG_INITIALIZE = 0;
......@@ -141,31 +136,14 @@ public final class DownloadManager {
private static final int MSG_CONTENT_LENGTH_CHANGED = 9;
private static final int MSG_RELEASE = 10;
@Retention(RetentionPolicy.SOURCE)
@IntDef({
START_THREAD_SUCCEEDED,
START_THREAD_WAIT_REMOVAL_TO_FINISH,
START_THREAD_WAIT_DOWNLOAD_CANCELLATION,
START_THREAD_TOO_MANY_DOWNLOADS
})
private @interface StartThreadResults {}
private static final int START_THREAD_SUCCEEDED = 0;
private static final int START_THREAD_WAIT_REMOVAL_TO_FINISH = 1;
private static final int START_THREAD_WAIT_DOWNLOAD_CANCELLATION = 2;
private static final int START_THREAD_TOO_MANY_DOWNLOADS = 3;
private static final String TAG = "DownloadManager";
private static final boolean DEBUG = false;
private final Context context;
private final WritableDownloadIndex downloadIndex;
private final Handler mainHandler;
private final InternalHandler internalHandler;
private final RequirementsWatcher.Listener requirementsListener;
private final CopyOnWriteArraySet<Listener> listeners;
private final ArrayList<Download> downloads;
private int pendingMessages;
private int activeTaskCount;
......@@ -174,6 +152,7 @@ public final class DownloadManager {
private int maxParallelDownloads;
private int minRetryCount;
private int notMetRequirements;
private List<Download> downloads;
private RequirementsWatcher requirementsWatcher;
/**
......@@ -205,11 +184,13 @@ public final class DownloadManager {
Context context, WritableDownloadIndex downloadIndex, DownloaderFactory downloaderFactory) {
this.context = context.getApplicationContext();
this.downloadIndex = downloadIndex;
maxParallelDownloads = DEFAULT_MAX_PARALLEL_DOWNLOADS;
minRetryCount = DEFAULT_MIN_RETRY_COUNT;
downloadsPaused = true;
downloads = new ArrayList<>();
downloads = Collections.emptyList();
listeners = new CopyOnWriteArraySet<>();
requirementsListener = this::onRequirementsStateChanged;
requirementsWatcher =
new RequirementsWatcher(context, requirementsListener, DEFAULT_REQUIREMENTS);
......@@ -253,8 +234,14 @@ public final class DownloadManager {
* reason that the {@link #getRequirements() Requirements} are not met.
*/
public boolean isWaitingForRequirements() {
// TODO: Fix this to return the right thing.
return !downloads.isEmpty();
if (!downloadsPaused && notMetRequirements != 0) {
for (int i = 0; i < downloads.size(); i++) {
if (downloads.get(i).state == STATE_QUEUED) {
return true;
}
}
}
return false;
}
/**
......@@ -362,7 +349,7 @@ public final class DownloadManager {
* #getDownloadIndex()} instead.
*/
public List<Download> getCurrentDownloads() {
return Collections.unmodifiableList(new ArrayList<>(downloads));
return downloads;
}
/** Returns whether downloads are currently paused. */
......@@ -475,10 +462,10 @@ public final class DownloadManager {
}
mainHandler.removeCallbacksAndMessages(/* token= */ null);
// Reset state.
downloads = Collections.emptyList();
pendingMessages = 0;
activeTaskCount = 0;
initialized = false;
downloads.clear();
}
}
......@@ -508,13 +495,9 @@ public final class DownloadManager {
List<Download> downloads = (List<Download>) message.obj;
onInitialized(downloads);
break;
case MSG_DOWNLOAD_CHANGED:
Download state = (Download) message.obj;
onDownloadChanged(state);
break;
case MSG_DOWNLOAD_REMOVED:
state = (Download) message.obj;
onDownloadRemoved(state);
case MSG_DOWNLOAD_UPDATE:
DownloadUpdate update = (DownloadUpdate) message.obj;
onDownloadUpdate(update);
break;
case MSG_PROCESSED:
int processedMessageCount = message.arg1;
......@@ -529,32 +512,23 @@ public final class DownloadManager {
private void onInitialized(List<Download> downloads) {
initialized = true;
this.downloads.addAll(downloads);
this.downloads = Collections.unmodifiableList(downloads);
for (Listener listener : listeners) {
listener.onInitialized(DownloadManager.this);
}
}
private void onDownloadChanged(Download download) {
int downloadIndex = getDownloadIndex(download.request.id);
if (download.isTerminalState()) {
if (downloadIndex != C.INDEX_UNSET) {
downloads.remove(downloadIndex);
private void onDownloadUpdate(DownloadUpdate update) {
downloads = Collections.unmodifiableList(update.downloads);
Download updatedDownload = update.download;
if (update.isRemove) {
for (Listener listener : listeners) {
listener.onDownloadRemoved(this, updatedDownload);
}
} else if (downloadIndex != C.INDEX_UNSET) {
downloads.set(downloadIndex, download);
} else {
downloads.add(download);
}
for (Listener listener : listeners) {
listener.onDownloadChanged(this, download);
}
}
private void onDownloadRemoved(Download download) {
downloads.remove(getDownloadIndex(download.request.id));
for (Listener listener : listeners) {
listener.onDownloadRemoved(this, download);
for (Listener listener : listeners) {
listener.onDownloadChanged(this, updatedDownload);
}
}
}
......@@ -568,18 +542,14 @@ public final class DownloadManager {
}
}
private int getDownloadIndex(String id) {
for (int i = 0; i < downloads.size(); i++) {
if (downloads.get(i).request.id.equals(id)) {
return i;
}
}
return C.INDEX_UNSET;
}
/* package */ static Download mergeRequest(
Download download, DownloadRequest request, int stopReason) {
Download download, DownloadRequest request, int stopReason, long nowMs) {
@Download.State int state = download.state;
// Treat the merge as creating a new download if we're currently removing the existing one, or
// if the existing download is in a terminal state. Else treat the merge as updating the
// existing download.
long startTimeMs =
state == STATE_REMOVING || download.isTerminalState() ? nowMs : download.startTimeMs;
if (state == STATE_REMOVING || state == STATE_RESTARTING) {
state = STATE_RESTARTING;
} else if (stopReason != STOP_REASON_NONE) {
......@@ -587,8 +557,6 @@ public final class DownloadManager {
} else {
state = STATE_QUEUED;
}
long nowMs = System.currentTimeMillis();
long startTimeMs = download.isTerminalState() ? nowMs : download.startTimeMs;
return new Download(
download.request.copyWithMergedRequest(request),
state,
......@@ -599,40 +567,6 @@ public final class DownloadManager {
FAILURE_REASON_NONE);
}
private static Download copyWithState(Download download, @Download.State int state) {
return new Download(
download.request,
state,
download.startTimeMs,
/* updateTimeMs= */ System.currentTimeMillis(),
download.contentLength,
download.stopReason,
FAILURE_REASON_NONE,
download.progress);
}
private static void logd(String message) {
if (DEBUG) {
Log.d(TAG, message);
}
}
private static void logd(String message, DownloadInternal downloadInternal) {
logd(message, downloadInternal.download.request);
}
private static void logd(String message, DownloadRequest request) {
if (DEBUG) {
logd(message + ": " + request);
}
}
private static void logdFlags(String message, int flags) {
if (DEBUG) {
logd(message + ": " + Integer.toBinaryString(flags));
}
}
private static final class InternalHandler extends Handler {
public boolean released;
......@@ -641,15 +575,14 @@ public final class DownloadManager {
private final WritableDownloadIndex downloadIndex;
private final DownloaderFactory downloaderFactory;
private final Handler mainHandler;
private final ArrayList<DownloadInternal> downloadInternals;
private final ArrayList<Download> downloads;
private final HashMap<String, Task> activeTasks;
// Mutable fields that are accessed on the internal thread.
@Requirements.RequirementFlags private int notMetRequirements;
private boolean downloadsPaused;
private int maxParallelDownloads;
private int minRetryCount;
private int parallelDownloads;
private int activeDownloadTaskCount;
public InternalHandler(
HandlerThread thread,
......@@ -667,7 +600,7 @@ public final class DownloadManager {
this.maxParallelDownloads = maxParallelDownloads;
this.minRetryCount = minRetryCount;
this.downloadsPaused = downloadsPaused;
downloadInternals = new ArrayList<>();
downloads = new ArrayList<>();
activeTasks = new HashMap<>();
}
......@@ -732,70 +665,91 @@ public final class DownloadManager {
private void initialize(int notMetRequirements) {
this.notMetRequirements = notMetRequirements;
ArrayList<Download> loadedStates = new ArrayList<>();
try (DownloadCursor cursor =
downloadIndex.getDownloads(
STATE_QUEUED, STATE_STOPPED, STATE_DOWNLOADING, STATE_REMOVING, STATE_RESTARTING)) {
DownloadCursor cursor = null;
try {
downloadIndex.setDownloadingStatesToQueued();
cursor =
downloadIndex.getDownloads(
STATE_QUEUED, STATE_STOPPED, STATE_DOWNLOADING, STATE_REMOVING, STATE_RESTARTING);
while (cursor.moveToNext()) {
loadedStates.add(cursor.getDownload());
downloads.add(cursor.getDownload());
}
logd("Downloads are loaded.");
} catch (Throwable e) {
Log.e(TAG, "Download state loading failed.", e);
loadedStates.clear();
}
for (Download download : loadedStates) {
addDownloadForState(download);
}
logd("Downloads are created.");
mainHandler.obtainMessage(MSG_INITIALIZED, loadedStates).sendToTarget();
for (int i = 0; i < downloadInternals.size(); i++) {
downloadInternals.get(i).start();
} catch (IOException e) {
Log.e(TAG, "Failed to load index.", e);
downloads.clear();
} finally {
Util.closeQuietly(cursor);
}
// A copy must be used for the message to ensure that subsequent changes to the downloads list
// are not visible to the main thread when it processes the message.
ArrayList<Download> downloadsForMessage = new ArrayList<>(downloads);
mainHandler.obtainMessage(MSG_INITIALIZED, downloadsForMessage).sendToTarget();
syncTasks();
}
private void setDownloadsPaused(boolean downloadsPaused) {
this.downloadsPaused = downloadsPaused;
for (int i = 0; i < downloadInternals.size(); i++) {
downloadInternals.get(i).updateStopState();
}
syncTasks();
}
private void setNotMetRequirements(@Requirements.RequirementFlags int notMetRequirements) {
this.notMetRequirements = notMetRequirements;
logdFlags("Not met requirements are changed", notMetRequirements);
for (int i = 0; i < downloadInternals.size(); i++) {
downloadInternals.get(i).updateStopState();
}
syncTasks();
}
private void setStopReason(@Nullable String id, int stopReason) {
if (id != null) {
DownloadInternal downloadInternal = getDownload(id);
if (downloadInternal != null) {
logd("download stop reason is set to : " + stopReason, downloadInternal);
downloadInternal.setStopReason(stopReason);
return;
if (id == null) {
for (int i = 0; i < downloads.size(); i++) {
setStopReason(downloads.get(i), stopReason);
}
try {
// Set the stop reason for downloads in terminal states as well.
downloadIndex.setStopReason(stopReason);
} catch (IOException e) {
Log.e(TAG, "Failed to set manual stop reason", e);
}
} else {
for (int i = 0; i < downloadInternals.size(); i++) {
downloadInternals.get(i).setStopReason(stopReason);
Download download = getDownload(id, /* loadFromIndex= */ false);
if (download != null) {
setStopReason(download, stopReason);
} else {
try {
// Set the stop reason if the download is in a terminal state.
downloadIndex.setStopReason(id, stopReason);
} catch (IOException e) {
Log.e(TAG, "Failed to set manual stop reason: " + id, e);
}
}
}
try {
if (id != null) {
downloadIndex.setStopReason(id, stopReason);
} else {
downloadIndex.setStopReason(stopReason);
syncTasks();
}
private void setStopReason(Download download, int stopReason) {
if (stopReason == STOP_REASON_NONE) {
if (download.state == STATE_STOPPED) {
putDownloadWithState(download, STATE_QUEUED);
}
} catch (IOException e) {
Log.e(TAG, "setStopReason failed", e);
} else if (stopReason != download.stopReason) {
@Download.State int state = download.state;
if (state == STATE_QUEUED || state == STATE_DOWNLOADING) {
state = STATE_STOPPED;
}
putDownload(
new Download(
download.request,
state,
download.startTimeMs,
/* updateTimeMs= */ System.currentTimeMillis(),
download.contentLength,
stopReason,
FAILURE_REASON_NONE,
download.progress));
}
}
private void setMaxParallelDownloads(int maxParallelDownloads) {
this.maxParallelDownloads = maxParallelDownloads;
// TODO: Start or stop downloads if necessary.
syncTasks();
}
private void setMinRetryCount(int minRetryCount) {
......@@ -803,77 +757,44 @@ public final class DownloadManager {
}
private void addDownload(DownloadRequest request, int stopReason) {
DownloadInternal downloadInternal = getDownload(request.id);
if (downloadInternal != null) {
downloadInternal.addRequest(request, stopReason);
logd("Request is added to existing download", downloadInternal);
Download download = getDownload(request.id, /* loadFromIndex= */ true);
long nowMs = System.currentTimeMillis();
if (download != null) {
putDownload(mergeRequest(download, request, stopReason, nowMs));
} else {
Download download = loadDownload(request.id);
if (download == null) {
long nowMs = System.currentTimeMillis();
download =
new Download(
request,
stopReason != Download.STOP_REASON_NONE ? STATE_STOPPED : STATE_QUEUED,
/* startTimeMs= */ nowMs,
/* updateTimeMs= */ nowMs,
/* contentLength= */ C.LENGTH_UNSET,
stopReason,
Download.FAILURE_REASON_NONE);
logd("Download state is created for " + request.id);
} else {
download = mergeRequest(download, request, stopReason);
logd("Download state is loaded for " + request.id);
}
addDownloadForState(download);
putDownload(
new Download(
request,
stopReason != STOP_REASON_NONE ? STATE_STOPPED : STATE_QUEUED,
/* startTimeMs= */ nowMs,
/* updateTimeMs= */ nowMs,
/* contentLength= */ C.LENGTH_UNSET,
stopReason,
FAILURE_REASON_NONE));
}
syncTasks();
}
private void removeDownload(String id) {
DownloadInternal downloadInternal = getDownload(id);
if (downloadInternal != null) {
downloadInternal.remove();
} else {
Download download = loadDownload(id);
if (download != null) {
addDownloadForState(copyWithState(download, STATE_REMOVING));
} else {
logd("Can't remove download. No download with id: " + id);
}
}
}
private void onTaskStopped(Task task) {
logd("Task is stopped", task.request);
String downloadId = task.request.id;
activeTasks.remove(downloadId);
boolean tryToStartDownloads = false;
if (!task.isRemove) {
// If maxParallelDownloads was hit, there might be a download waiting for a slot.
tryToStartDownloads = parallelDownloads == maxParallelDownloads;
parallelDownloads--;
}
getDownload(downloadId).onTaskStopped(task.isCanceled, task.finalError);
if (tryToStartDownloads) {
for (int i = 0;
parallelDownloads < maxParallelDownloads && i < downloadInternals.size();
i++) {
downloadInternals.get(i).start();
}
Download download = getDownload(id, /* loadFromIndex= */ true);
if (download == null) {
Log.e(TAG, "Failed to remove nonexistent download: " + id);
return;
}
}
private void onContentLengthChanged(Task task) {
String downloadId = task.request.id;
getDownload(downloadId).setContentLength(task.contentLength);
putDownloadWithState(download, STATE_REMOVING);
syncTasks();
}
private void release() {
for (Task task : activeTasks.values()) {
task.cancel(/* released= */ true);
}
activeTasks.clear();
downloadInternals.clear();
try {
downloadIndex.setDownloadingStatesToQueued();
} catch (IOException e) {
Log.e(TAG, "Failed to update index.", e);
}
downloads.clear();
thread.quit();
synchronized (this) {
released = true;
......@@ -881,261 +802,293 @@ public final class DownloadManager {
}
}
private void onDownloadChanged(DownloadInternal downloadInternal, Download download) {
logd("Download state is changed", downloadInternal);
try {
downloadIndex.putDownload(download);
} catch (IOException e) {
Log.e(TAG, "Failed to update index", e);
}
if (downloadInternal.state == STATE_COMPLETED || downloadInternal.state == STATE_FAILED) {
downloadInternals.remove(downloadInternal);
// Start and cancel tasks based on the current download and manager states.
private void syncTasks() {
int accumulatingDownloadTaskCount = 0;
for (int i = 0; i < downloads.size(); i++) {
Download download = downloads.get(i);
Task activeTask = activeTasks.get(download.request.id);
switch (download.state) {
case STATE_STOPPED:
syncStoppedDownload(activeTask);
break;
case STATE_QUEUED:
activeTask = syncQueuedDownload(activeTask, download);
break;
case STATE_DOWNLOADING:
activeTask = Assertions.checkNotNull(activeTask);
syncDownloadingDownload(activeTask, download, accumulatingDownloadTaskCount);
break;
case STATE_REMOVING:
case STATE_RESTARTING:
syncRemovingDownload(activeTask, download);
break;
case STATE_COMPLETED:
case STATE_FAILED:
default:
throw new IllegalStateException();
}
if (activeTask != null && !activeTask.isRemove) {
accumulatingDownloadTaskCount++;
}
}
mainHandler.obtainMessage(MSG_DOWNLOAD_CHANGED, download).sendToTarget();
}
private void onDownloadRemoved(DownloadInternal downloadInternal, Download download) {
logd("Download is removed", downloadInternal);
try {
downloadIndex.removeDownload(download.request.id);
} catch (IOException e) {
Log.e(TAG, "Failed to remove from index", e);
private void syncStoppedDownload(@Nullable Task activeTask) {
if (activeTask != null) {
// We have a task, which must be a download task. Cancel it.
Assertions.checkState(!activeTask.isRemove);
activeTask.cancel(/* released= */ false);
}
downloadInternals.remove(downloadInternal);
mainHandler.obtainMessage(MSG_DOWNLOAD_REMOVED, download).sendToTarget();
}
@StartThreadResults
private int startTask(DownloadInternal downloadInternal) {
DownloadRequest request = downloadInternal.download.request;
String downloadId = request.id;
if (activeTasks.containsKey(downloadId)) {
if (stopDownloadTask(downloadId)) {
return START_THREAD_WAIT_DOWNLOAD_CANCELLATION;
}
return START_THREAD_WAIT_REMOVAL_TO_FINISH;
private Task syncQueuedDownload(@Nullable Task activeTask, Download download) {
if (activeTask != null) {
// We have a task, which must be a download task. If the download state is queued we need to
// cancel it and start a new one, since a new request has been merged into the download.
Assertions.checkState(!activeTask.isRemove);
activeTask.cancel(/* released= */ false);
return activeTask;
}
boolean isRemove = downloadInternal.isInRemoveState();
if (!isRemove) {
if (parallelDownloads == maxParallelDownloads) {
return START_THREAD_TOO_MANY_DOWNLOADS;
}
parallelDownloads++;
if (!canDownloadsRun() || activeDownloadTaskCount >= maxParallelDownloads) {
return null;
}
Downloader downloader = downloaderFactory.createDownloader(request);
DownloadProgress downloadProgress = downloadInternal.download.progress;
Task task =
// We can start a download task.
download = putDownloadWithState(download, STATE_DOWNLOADING);
Downloader downloader = downloaderFactory.createDownloader(download.request);
activeTask =
new Task(
request,
download.request,
downloader,
downloadProgress,
isRemove,
download.progress,
/* isRemove= */ false,
minRetryCount,
/* internalHandler= */ this);
activeTasks.put(downloadId, task);
task.start();
logd("Task is started", downloadInternal);
return START_THREAD_SUCCEEDED;
activeTasks.put(download.request.id, activeTask);
activeDownloadTaskCount++;
activeTask.start();
return activeTask;
}
private boolean stopDownloadTask(String downloadId) {
Task task = activeTasks.get(downloadId);
if (task != null && !task.isRemove) {
task.cancel(/* released= */ false);
logd("Task is cancelled", task.request);
return true;
private void syncDownloadingDownload(
Task activeTask, Download download, int accumulatingDownloadTaskCount) {
Assertions.checkState(!activeTask.isRemove);
if (!canDownloadsRun() || accumulatingDownloadTaskCount >= maxParallelDownloads) {
putDownloadWithState(download, STATE_QUEUED);
activeTask.cancel(/* released= */ false);
}
return false;
}
@Nullable
private DownloadInternal getDownload(String id) {
for (int i = 0; i < downloadInternals.size(); i++) {
DownloadInternal downloadInternal = downloadInternals.get(i);
if (downloadInternal.download.request.id.equals(id)) {
return downloadInternal;
private void syncRemovingDownload(@Nullable Task activeTask, Download download) {
if (activeTask != null) {
if (!activeTask.isRemove) {
// Cancel the downloading task.
activeTask.cancel(/* released= */ false);
}
// The activeTask is either a remove task, or a downloading task that we just cancelled. In
// the latter case we need to wait for the task to stop before we start a remove task.
return;
}
return null;
}
private Download loadDownload(String id) {
try {
return downloadIndex.getDownload(id);
} catch (IOException e) {
Log.e(TAG, "loadDownload failed", e);
}
return null;
// We can start a remove task.
Downloader downloader = downloaderFactory.createDownloader(download.request);
activeTask =
new Task(
download.request,
downloader,
download.progress,
/* isRemove= */ true,
minRetryCount,
/* internalHandler= */ this);
activeTasks.put(download.request.id, activeTask);
activeTask.start();
}
private void addDownloadForState(Download download) {
DownloadInternal downloadInternal = new DownloadInternal(this, download);
downloadInternals.add(downloadInternal);
logd("Download is added", downloadInternal);
downloadInternal.initialize();
}
// Task event processing.
private boolean canStartDownloads() {
return !downloadsPaused && notMetRequirements == 0;
private void onContentLengthChanged(Task task) {
String downloadId = task.request.id;
long contentLength = task.contentLength;
Download download = getDownload(downloadId, /* loadFromIndex= */ false);
if (contentLength == download.contentLength || contentLength == C.LENGTH_UNSET) {
return;
}
putDownload(
new Download(
download.request,
download.state,
download.startTimeMs,
/* updateTimeMs= */ System.currentTimeMillis(),
contentLength,
download.stopReason,
download.failureReason,
download.progress));
}
}
private static final class DownloadInternal {
private final InternalHandler internalHandler;
private Download download;
private void onTaskStopped(Task task) {
String downloadId = task.request.id;
activeTasks.remove(downloadId);
// TODO: Get rid of these and use download directly.
@Download.State private int state;
private long contentLength;
private int stopReason;
@MonotonicNonNull @Download.FailureReason private int failureReason;
boolean isRemove = task.isRemove;
if (!isRemove) {
activeDownloadTaskCount--;
}
private DownloadInternal(InternalHandler internalHandler, Download download) {
this.internalHandler = internalHandler;
this.download = download;
state = download.state;
contentLength = download.contentLength;
stopReason = download.stopReason;
failureReason = download.failureReason;
}
if (task.isCanceled) {
syncTasks();
return;
}
private void initialize() {
initialize(download.state);
}
Throwable finalError = task.finalError;
if (finalError != null) {
Log.e(TAG, "Task failed: " + task.request + ", " + isRemove, finalError);
}
public void addRequest(DownloadRequest newRequest, int stopReason) {
download = mergeRequest(download, newRequest, stopReason);
initialize();
}
Download download =
Assertions.checkNotNull(getDownload(downloadId, /* loadFromIndex= */ false));
switch (download.state) {
case STATE_DOWNLOADING:
Assertions.checkState(!isRemove);
onDownloadTaskStopped(download, finalError);
break;
case STATE_REMOVING:
case STATE_RESTARTING:
Assertions.checkState(isRemove);
onRemoveTaskStopped(download);
break;
case STATE_QUEUED:
case STATE_STOPPED:
case STATE_COMPLETED:
case STATE_FAILED:
default:
throw new IllegalStateException();
}
public void remove() {
initialize(STATE_REMOVING);
syncTasks();
}
public Download getUpdatedDownload() {
private void onDownloadTaskStopped(Download download, @Nullable Throwable finalError) {
download =
new Download(
download.request,
state,
finalError == null ? STATE_COMPLETED : STATE_FAILED,
download.startTimeMs,
/* updateTimeMs= */ System.currentTimeMillis(),
contentLength,
stopReason,
state != STATE_FAILED ? FAILURE_REASON_NONE : failureReason,
download.contentLength,
download.stopReason,
finalError == null ? FAILURE_REASON_NONE : FAILURE_REASON_UNKNOWN,
download.progress);
return download;
}
public boolean isIdle() {
return state != STATE_DOWNLOADING && state != STATE_REMOVING && state != STATE_RESTARTING;
}
@Override
public String toString() {
return download.request.id + ' ' + Download.getStateString(state);
// The download is now in a terminal state, so should not be in the downloads list.
downloads.remove(getDownloadIndex(download.request.id));
// We still need to update the download index and main thread.
try {
downloadIndex.putDownload(download);
} catch (IOException e) {
Log.e(TAG, "Failed to update index.", e);
}
DownloadUpdate update =
new DownloadUpdate(download, /* isRemove= */ false, new ArrayList<>(downloads));
mainHandler.obtainMessage(MSG_DOWNLOAD_UPDATE, update).sendToTarget();
}
public void start() {
if (state == STATE_QUEUED || state == STATE_DOWNLOADING) {
startOrQueue();
} else if (isInRemoveState()) {
internalHandler.startTask(this);
private void onRemoveTaskStopped(Download download) {
if (download.state == STATE_RESTARTING) {
putDownloadWithState(
download, download.stopReason == STOP_REASON_NONE ? STATE_QUEUED : STATE_STOPPED);
syncTasks();
} else {
int removeIndex = getDownloadIndex(download.request.id);
downloads.remove(removeIndex);
try {
downloadIndex.removeDownload(download.request.id);
} catch (IOException e) {
Log.e(TAG, "Failed to remove from database");
}
DownloadUpdate update =
new DownloadUpdate(download, /* isRemove= */ true, new ArrayList<>(downloads));
mainHandler.obtainMessage(MSG_DOWNLOAD_UPDATE, update).sendToTarget();
}
}
public void setStopReason(int stopReason) {
this.stopReason = stopReason;
updateStopState();
}
// Helper methods.
public boolean isInRemoveState() {
return state == STATE_REMOVING || state == STATE_RESTARTING;
private boolean canDownloadsRun() {
return !downloadsPaused && notMetRequirements == 0;
}
public void setContentLength(long contentLength) {
if (this.contentLength == contentLength) {
return;
}
this.contentLength = contentLength;
internalHandler.onDownloadChanged(this, getUpdatedDownload());
private Download putDownloadWithState(Download download, @Download.State int state) {
// Downloads in terminal states shouldn't be in the downloads list. This method cannot be used
// to set STATE_STOPPED either, because it doesn't have a stopReason argument.
Assertions.checkState(
state != STATE_COMPLETED && state != STATE_FAILED && state != STATE_STOPPED);
return putDownload(
new Download(
download.request,
state,
download.startTimeMs,
/* updateTimeMs= */ System.currentTimeMillis(),
download.contentLength,
/* stopReason= */ 0,
FAILURE_REASON_NONE,
download.progress));
}
private void updateStopState() {
Download oldDownload = download;
if (canStart()) {
if (state == STATE_STOPPED) {
startOrQueue();
}
private Download putDownload(Download download) {
// Downloads in terminal states shouldn't be in the downloads list.
Assertions.checkState(download.state != STATE_COMPLETED && download.state != STATE_FAILED);
int changedIndex = getDownloadIndex(download.request.id);
if (changedIndex == C.INDEX_UNSET) {
downloads.add(download);
Collections.sort(downloads, InternalHandler::compareStartTimes);
} else {
if (state == STATE_DOWNLOADING || state == STATE_QUEUED) {
internalHandler.stopDownloadTask(download.request.id);
setState(STATE_STOPPED);
boolean needsSort = download.startTimeMs != downloads.get(changedIndex).startTimeMs;
downloads.set(changedIndex, download);
if (needsSort) {
Collections.sort(downloads, InternalHandler::compareStartTimes);
}
}
if (oldDownload == download) {
internalHandler.onDownloadChanged(this, getUpdatedDownload());
try {
downloadIndex.putDownload(download);
} catch (IOException e) {
Log.e(TAG, "Failed to update index.", e);
}
DownloadUpdate update =
new DownloadUpdate(download, /* isRemove= */ false, new ArrayList<>(downloads));
mainHandler.obtainMessage(MSG_DOWNLOAD_UPDATE, update).sendToTarget();
return download;
}
private void initialize(int initialState) {
// Don't notify listeners with initial state until we make sure we don't switch to another
// state immediately.
state = initialState;
if (isInRemoveState()) {
internalHandler.startTask(this);
} else if (canStart()) {
startOrQueue();
} else {
setState(STATE_STOPPED);
}
if (state == initialState) {
internalHandler.onDownloadChanged(this, getUpdatedDownload());
@Nullable
private Download getDownload(String id, boolean loadFromIndex) {
int index = getDownloadIndex(id);
if (index != C.INDEX_UNSET) {
return downloads.get(index);
}
}
private boolean canStart() {
return internalHandler.canStartDownloads() && stopReason == STOP_REASON_NONE;
}
private void startOrQueue() {
Assertions.checkState(!isInRemoveState());
@StartThreadResults int result = internalHandler.startTask(this);
Assertions.checkState(result != START_THREAD_WAIT_REMOVAL_TO_FINISH);
if (result == START_THREAD_SUCCEEDED || result == START_THREAD_WAIT_DOWNLOAD_CANCELLATION) {
setState(STATE_DOWNLOADING);
} else {
setState(STATE_QUEUED);
if (loadFromIndex) {
try {
return downloadIndex.getDownload(id);
} catch (IOException e) {
Log.e(TAG, "Failed to load download: " + id, e);
}
}
return null;
}
private void setState(@Download.State int newState) {
if (state != newState) {
state = newState;
internalHandler.onDownloadChanged(this, getUpdatedDownload());
private int getDownloadIndex(String id) {
for (int i = 0; i < downloads.size(); i++) {
Download download = downloads.get(i);
if (download.request.id.equals(id)) {
return i;
}
}
return C.INDEX_UNSET;
}
private void onTaskStopped(boolean isCanceled, @Nullable Throwable error) {
if (isIdle()) {
return;
}
if (isCanceled) {
internalHandler.startTask(this);
} else if (state == STATE_REMOVING) {
internalHandler.onDownloadRemoved(this, getUpdatedDownload());
} else if (state == STATE_RESTARTING) {
initialize(STATE_QUEUED);
} else { // STATE_DOWNLOADING
if (error != null) {
Log.e(TAG, "Download failed: " + download.request.id, error);
failureReason = FAILURE_REASON_UNKNOWN;
setState(STATE_FAILED);
} else {
setState(STATE_COMPLETED);
}
}
private static int compareStartTimes(Download first, Download second) {
return Util.compareLong(first.startTimeMs, second.startTimeMs);
}
}
......@@ -1177,16 +1130,17 @@ public final class DownloadManager {
// download manager whilst cancellation is ongoing.
internalHandler = null;
}
isCanceled = true;
downloader.cancel();
interrupt();
if (!isCanceled) {
isCanceled = true;
downloader.cancel();
interrupt();
}
}
// Methods running on download thread.
@Override
public void run() {
logd("Download started", request);
try {
if (isRemove) {
downloader.remove();
......@@ -1201,14 +1155,12 @@ public final class DownloadManager {
if (!isCanceled) {
long bytesDownloaded = downloadProgress.bytesDownloaded;
if (bytesDownloaded != errorPosition) {
logd("Reset error count. bytesDownloaded = " + bytesDownloaded, request);
errorPosition = bytesDownloaded;
errorCount = 0;
}
if (++errorCount > minRetryCount) {
throw e;
}
logd("Download error. Retry " + errorCount, request);
Thread.sleep(getRetryDelayMillis(errorCount));
}
}
......@@ -1240,4 +1192,18 @@ public final class DownloadManager {
return Math.min((errorCount - 1) * 1000, 5000);
}
}
private static final class DownloadUpdate {
private final Download download;
private final boolean isRemove;
private final List<Download> downloads;
public DownloadUpdate(Download download, boolean isRemove, List<Download> downloads) {
this.download = download;
this.isRemove = isRemove;
this.downloads = downloads;
}
}
}
......@@ -38,6 +38,13 @@ public interface WritableDownloadIndex extends DownloadIndex {
void removeDownload(String id) throws IOException;
/**
* Sets all {@link Download#STATE_DOWNLOADING} states to {@link Download#STATE_QUEUED}.
*
* @throws IOException If an error occurs updating the state.
*/
void setDownloadingStatesToQueued() throws IOException;
/**
* Sets the stop reason of the downloads in a terminal state ({@link Download#STATE_COMPLETED},
* {@link Download#STATE_FAILED}).
*
......
......@@ -38,6 +38,8 @@ import org.junit.runner.RunWith;
@RunWith(AndroidJUnit4.class)
public class ActionFileUpgradeUtilTest {
private static final long NOW_MS = 1234;
private File tempFile;
private ExoDatabaseProvider databaseProvider;
private DefaultDownloadIndex downloadIndex;
......@@ -113,7 +115,7 @@ public class ActionFileUpgradeUtilTest {
data);
ActionFileUpgradeUtil.mergeRequest(
request, downloadIndex, /* addNewDownloadAsCompleted= */ false);
request, downloadIndex, /* addNewDownloadAsCompleted= */ false, NOW_MS);
assertDownloadIndexContainsRequest(request, Download.STATE_QUEUED);
}
......@@ -141,9 +143,9 @@ public class ActionFileUpgradeUtilTest {
/* customCacheKey= */ "key123",
new byte[] {5, 4, 3, 2, 1});
ActionFileUpgradeUtil.mergeRequest(
request1, downloadIndex, /* addNewDownloadAsCompleted= */ false);
request1, downloadIndex, /* addNewDownloadAsCompleted= */ false, NOW_MS);
ActionFileUpgradeUtil.mergeRequest(
request2, downloadIndex, /* addNewDownloadAsCompleted= */ false);
request2, downloadIndex, /* addNewDownloadAsCompleted= */ false, NOW_MS);
Download download = downloadIndex.getDownload(request2.id);
assertThat(download).isNotNull();
......@@ -178,16 +180,16 @@ public class ActionFileUpgradeUtilTest {
/* customCacheKey= */ "key123",
new byte[] {5, 4, 3, 2, 1});
ActionFileUpgradeUtil.mergeRequest(
request1, downloadIndex, /* addNewDownloadAsCompleted= */ false);
request1, downloadIndex, /* addNewDownloadAsCompleted= */ false, NOW_MS);
// Merging existing download, keeps it queued.
ActionFileUpgradeUtil.mergeRequest(
request1, downloadIndex, /* addNewDownloadAsCompleted= */ true);
request1, downloadIndex, /* addNewDownloadAsCompleted= */ true, NOW_MS);
assertThat(downloadIndex.getDownload(request1.id).state).isEqualTo(Download.STATE_QUEUED);
// New download is merged as completed.
ActionFileUpgradeUtil.mergeRequest(
request2, downloadIndex, /* addNewDownloadAsCompleted= */ true);
request2, downloadIndex, /* addNewDownloadAsCompleted= */ true, NOW_MS);
assertThat(downloadIndex.getDownload(request2.id).state).isEqualTo(Download.STATE_COMPLETED);
}
......
......@@ -61,6 +61,8 @@ public class DownloadManagerTest {
private static final int APP_STOP_REASON = 1;
/** The minimum number of times a task must be retried before failing. */
private static final int MIN_RETRY_COUNT = 3;
/** Dummy value for the current time. */
private static final long NOW_MS = 1234;
private Uri uri1;
private Uri uri2;
......@@ -132,6 +134,7 @@ public class DownloadManagerTest {
task.assertCompleted();
runner.assertCreatedDownloaderCount(1);
downloadManagerListener.blockUntilTasksCompleteAndThrowAnyDownloadError();
assertThat(downloadManager.getCurrentDownloads()).isEmpty();
}
@Test
......@@ -143,6 +146,7 @@ public class DownloadManagerTest {
task.assertRemoved();
runner.assertCreatedDownloaderCount(2);
downloadManagerListener.blockUntilTasksCompleteAndThrowAnyDownloadError();
assertThat(downloadManager.getCurrentDownloads()).isEmpty();
}
@Test
......@@ -158,6 +162,7 @@ public class DownloadManagerTest {
downloader.assertReleased().assertStartCount(MIN_RETRY_COUNT + 1);
runner.getTask().assertFailed();
downloadManagerListener.blockUntilTasksComplete();
assertThat(downloadManager.getCurrentDownloads()).isEmpty();
}
@Test
......@@ -174,6 +179,7 @@ public class DownloadManagerTest {
downloader.assertReleased().assertStartCount(MIN_RETRY_COUNT + 1);
runner.getTask().assertCompleted();
downloadManagerListener.blockUntilTasksComplete();
assertThat(downloadManager.getCurrentDownloads()).isEmpty();
}
@Test
......@@ -341,7 +347,7 @@ public class DownloadManagerTest {
}
@Test
public void getTasks_returnTasks() {
public void getCurrentDownloads_returnsCurrentDownloads() {
TaskWrapper task1 = new DownloadRunner(uri1).postDownloadRequest().getTask();
TaskWrapper task2 = new DownloadRunner(uri2).postDownloadRequest().getTask();
TaskWrapper task3 =
......@@ -370,13 +376,11 @@ public class DownloadManagerTest {
runOnMainThread(() -> downloadManager.pauseDownloads());
// TODO: This should be assertQueued. Fix implementation and update test.
runner1.getTask().assertStopped();
runner1.getTask().assertQueued();
// remove requests aren't stopped.
runner2.getDownloader(1).unblock().assertReleased();
// TODO: This should be assertQueued. Fix implementation and update test.
runner2.getTask().assertStopped();
runner2.getTask().assertQueued();
// Although remove2 is finished, download2 doesn't start.
runner2.getDownloader(2).assertDoesNotStart();
......@@ -397,7 +401,7 @@ public class DownloadManagerTest {
}
@Test
public void manuallyStopAndResumeSingleDownload() throws Throwable {
public void setAndClearSingleDownloadStopReason() throws Throwable {
DownloadRunner runner = new DownloadRunner(uri1).postDownloadRequest();
TaskWrapper task = runner.getTask();
......@@ -415,7 +419,7 @@ public class DownloadManagerTest {
}
@Test
public void manuallyStoppedDownloadCanBeCancelled() throws Throwable {
public void setSingleDownloadStopReasonThenRemove_removesDownload() throws Throwable {
DownloadRunner runner = new DownloadRunner(uri1).postDownloadRequest();
TaskWrapper task = runner.getTask();
......@@ -433,7 +437,7 @@ public class DownloadManagerTest {
}
@Test
public void manuallyStoppedSingleDownload_doesNotAffectOthers() throws Throwable {
public void setSingleDownloadStopReason_doesNotAffectOtherDownloads() throws Throwable {
DownloadRunner runner1 = new DownloadRunner(uri1);
DownloadRunner runner2 = new DownloadRunner(uri2);
DownloadRunner runner3 = new DownloadRunner(uri3);
......@@ -455,21 +459,22 @@ public class DownloadManagerTest {
}
@Test
public void mergeRequest_removingDownload_becomesRestarting() {
public void mergeRequest_removing_becomesRestarting() {
DownloadRequest downloadRequest = createDownloadRequest();
DownloadBuilder downloadBuilder =
new DownloadBuilder(downloadRequest).setState(Download.STATE_REMOVING);
Download download = downloadBuilder.build();
Download mergedDownload =
DownloadManager.mergeRequest(download, downloadRequest, download.stopReason);
DownloadManager.mergeRequest(download, downloadRequest, download.stopReason, NOW_MS);
Download expectedDownload = downloadBuilder.setState(Download.STATE_RESTARTING).build();
assertEqualIgnoringTimeFields(mergedDownload, expectedDownload);
Download expectedDownload =
downloadBuilder.setStartTimeMs(NOW_MS).setState(Download.STATE_RESTARTING).build();
assertEqualIgnoringUpdateTime(mergedDownload, expectedDownload);
}
@Test
public void mergeRequest_failedDownload_becomesQueued() {
public void mergeRequest_failed_becomesQueued() {
DownloadRequest downloadRequest = createDownloadRequest();
DownloadBuilder downloadBuilder =
new DownloadBuilder(downloadRequest)
......@@ -478,18 +483,19 @@ public class DownloadManagerTest {
Download download = downloadBuilder.build();
Download mergedDownload =
DownloadManager.mergeRequest(download, downloadRequest, download.stopReason);
DownloadManager.mergeRequest(download, downloadRequest, download.stopReason, NOW_MS);
Download expectedDownload =
downloadBuilder
.setStartTimeMs(NOW_MS)
.setState(Download.STATE_QUEUED)
.setFailureReason(Download.FAILURE_REASON_NONE)
.build();
assertEqualIgnoringTimeFields(mergedDownload, expectedDownload);
assertEqualIgnoringUpdateTime(mergedDownload, expectedDownload);
}
@Test
public void mergeRequest_stoppedDownload_staysStopped() {
public void mergeRequest_stopped_staysStopped() {
DownloadRequest downloadRequest = createDownloadRequest();
DownloadBuilder downloadBuilder =
new DownloadBuilder(downloadRequest)
......@@ -498,13 +504,13 @@ public class DownloadManagerTest {
Download download = downloadBuilder.build();
Download mergedDownload =
DownloadManager.mergeRequest(download, downloadRequest, download.stopReason);
DownloadManager.mergeRequest(download, downloadRequest, download.stopReason, NOW_MS);
assertEqualIgnoringTimeFields(mergedDownload, download);
assertEqualIgnoringUpdateTime(mergedDownload, download);
}
@Test
public void mergeRequest_stopReasonSetButNotStopped_becomesStopped() {
public void mergeRequest_completedWithStopReason_becomesStopped() {
DownloadRequest downloadRequest = createDownloadRequest();
DownloadBuilder downloadBuilder =
new DownloadBuilder(downloadRequest)
......@@ -513,10 +519,11 @@ public class DownloadManagerTest {
Download download = downloadBuilder.build();
Download mergedDownload =
DownloadManager.mergeRequest(download, downloadRequest, download.stopReason);
DownloadManager.mergeRequest(download, downloadRequest, download.stopReason, NOW_MS);
Download expectedDownload = downloadBuilder.setState(Download.STATE_STOPPED).build();
assertEqualIgnoringTimeFields(mergedDownload, expectedDownload);
Download expectedDownload =
downloadBuilder.setStartTimeMs(NOW_MS).setState(Download.STATE_STOPPED).build();
assertEqualIgnoringUpdateTime(mergedDownload, expectedDownload);
}
private void setUpDownloadManager(final int maxParallelDownloads) throws Exception {
......@@ -554,9 +561,10 @@ public class DownloadManagerTest {
dummyMainThread.runTestOnMainThread(r);
}
private static void assertEqualIgnoringTimeFields(Download download, Download that) {
private static void assertEqualIgnoringUpdateTime(Download download, Download that) {
assertThat(download.request).isEqualTo(that.request);
assertThat(download.state).isEqualTo(that.state);
assertThat(download.startTimeMs).isEqualTo(that.startTimeMs);
assertThat(download.contentLength).isEqualTo(that.contentLength);
assertThat(download.failureReason).isEqualTo(that.failureReason);
assertThat(download.stopReason).isEqualTo(that.stopReason);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment