Commit 907b9bf4 by olly Committed by Oliver Woodman

Sanitize threading in CronetDataSource

- Move nearly all logic onto the calling thread (i.e. the thread
  calling open/read/close), to make threading correctness more
  obvious.
- Document which variables are read/written from which thread, and
  why the call sequences are safe.
- Fix thread safety issue that I think could probably cause data
  corruption in the case of a read timeout followed by another
  request into the DataSource.

Also:

- Relaxed content length checking to be consistent with the other
  http DataSource implementations, and avoided parsing the headers
  where they're not used.
- Fixed missing generics in CronetDataSourceFactory.
- Added TODO to work with servers that don't support partial range
  requests.

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=135702217
parent 4fab4022
...@@ -174,10 +174,7 @@ public final class CronetDataSourceTest { ...@@ -174,10 +174,7 @@ public final class CronetDataSourceTest {
@Test(expected = IllegalStateException.class) @Test(expected = IllegalStateException.class)
public void testOpeningTwiceThrows() throws HttpDataSourceException { public void testOpeningTwiceThrows() throws HttpDataSourceException {
mockResponseStartSuccess(); mockResponseStartSuccess();
assertConnectionState(CronetDataSource.IDLE_CONNECTION);
dataSourceUnderTest.open(testDataSpec); dataSourceUnderTest.open(testDataSpec);
assertConnectionState(CronetDataSource.OPEN_CONNECTION);
dataSourceUnderTest.open(testDataSpec); dataSourceUnderTest.open(testDataSpec);
} }
...@@ -205,7 +202,7 @@ public final class CronetDataSourceTest { ...@@ -205,7 +202,7 @@ public final class CronetDataSourceTest {
dataSourceUnderTest.onFailed( dataSourceUnderTest.onFailed(
mockUrlRequest, mockUrlRequest,
testUrlResponseInfo, testUrlResponseInfo,
null); mockUrlRequestException);
dataSourceUnderTest.onResponseStarted( dataSourceUnderTest.onResponseStarted(
mockUrlRequest2, mockUrlRequest2,
testUrlResponseInfo); testUrlResponseInfo);
...@@ -253,13 +250,10 @@ public final class CronetDataSourceTest { ...@@ -253,13 +250,10 @@ public final class CronetDataSourceTest {
@Test @Test
public void testRequestOpen() throws HttpDataSourceException { public void testRequestOpen() throws HttpDataSourceException {
mockResponseStartSuccess(); mockResponseStartSuccess();
assertEquals(TEST_CONTENT_LENGTH, dataSourceUnderTest.open(testDataSpec)); assertEquals(TEST_CONTENT_LENGTH, dataSourceUnderTest.open(testDataSpec));
assertConnectionState(CronetDataSource.OPEN_CONNECTION);
verify(mockTransferListener).onTransferStart(dataSourceUnderTest, testDataSpec); verify(mockTransferListener).onTransferStart(dataSourceUnderTest, testDataSpec);
} }
@Test @Test
public void testRequestOpenGzippedCompressedReturnsDataSpecLength() public void testRequestOpenGzippedCompressedReturnsDataSpecLength()
throws HttpDataSourceException { throws HttpDataSourceException {
...@@ -271,7 +265,6 @@ public final class CronetDataSourceTest { ...@@ -271,7 +265,6 @@ public final class CronetDataSourceTest {
testDataSpec = new DataSpec(Uri.parse(TEST_URL), 1000, 5000, null); testDataSpec = new DataSpec(Uri.parse(TEST_URL), 1000, 5000, null);
assertEquals(5000 /* contentLength */, dataSourceUnderTest.open(testDataSpec)); assertEquals(5000 /* contentLength */, dataSourceUnderTest.open(testDataSpec));
assertConnectionState(CronetDataSource.OPEN_CONNECTION);
verify(mockTransferListener).onTransferStart(dataSourceUnderTest, testDataSpec); verify(mockTransferListener).onTransferStart(dataSourceUnderTest, testDataSpec);
} }
...@@ -286,7 +279,6 @@ public final class CronetDataSourceTest { ...@@ -286,7 +279,6 @@ public final class CronetDataSourceTest {
// Check for connection not automatically closed. // Check for connection not automatically closed.
assertFalse(e.getCause() instanceof UnknownHostException); assertFalse(e.getCause() instanceof UnknownHostException);
verify(mockUrlRequest, never()).cancel(); verify(mockUrlRequest, never()).cancel();
assertConnectionState(CronetDataSource.OPENING_CONNECTION);
verify(mockTransferListener, never()).onTransferStart(dataSourceUnderTest, testDataSpec); verify(mockTransferListener, never()).onTransferStart(dataSourceUnderTest, testDataSpec);
} }
} }
...@@ -304,7 +296,6 @@ public final class CronetDataSourceTest { ...@@ -304,7 +296,6 @@ public final class CronetDataSourceTest {
// Check for connection not automatically closed. // Check for connection not automatically closed.
assertTrue(e.getCause() instanceof UnknownHostException); assertTrue(e.getCause() instanceof UnknownHostException);
verify(mockUrlRequest, never()).cancel(); verify(mockUrlRequest, never()).cancel();
assertConnectionState(CronetDataSource.OPENING_CONNECTION);
verify(mockTransferListener, never()).onTransferStart(dataSourceUnderTest, testDataSpec); verify(mockTransferListener, never()).onTransferStart(dataSourceUnderTest, testDataSpec);
} }
} }
...@@ -321,7 +312,6 @@ public final class CronetDataSourceTest { ...@@ -321,7 +312,6 @@ public final class CronetDataSourceTest {
assertTrue(e instanceof HttpDataSource.InvalidResponseCodeException); assertTrue(e instanceof HttpDataSource.InvalidResponseCodeException);
// Check for connection not automatically closed. // Check for connection not automatically closed.
verify(mockUrlRequest, never()).cancel(); verify(mockUrlRequest, never()).cancel();
assertConnectionState(CronetDataSource.OPENING_CONNECTION);
verify(mockTransferListener, never()).onTransferStart(dataSourceUnderTest, testDataSpec); verify(mockTransferListener, never()).onTransferStart(dataSourceUnderTest, testDataSpec);
} }
} }
...@@ -338,37 +328,16 @@ public final class CronetDataSourceTest { ...@@ -338,37 +328,16 @@ public final class CronetDataSourceTest {
assertTrue(e instanceof HttpDataSource.InvalidContentTypeException); assertTrue(e instanceof HttpDataSource.InvalidContentTypeException);
// Check for connection not automatically closed. // Check for connection not automatically closed.
verify(mockUrlRequest, never()).cancel(); verify(mockUrlRequest, never()).cancel();
assertConnectionState(CronetDataSource.OPENING_CONNECTION);
verify(mockContentTypePredicate).evaluate(TEST_CONTENT_TYPE); verify(mockContentTypePredicate).evaluate(TEST_CONTENT_TYPE);
} }
} }
@Test @Test
public void testRequestOpenValidatesContentLength() {
mockResponseStartSuccess();
// Data spec's requested length, 5000. Test response's length, 16,000.
testDataSpec = new DataSpec(Uri.parse(TEST_URL), 1000, 5000, null);
try {
dataSourceUnderTest.open(testDataSpec);
fail("HttpDataSource.HttpDataSourceException expected");
} catch (HttpDataSourceException e) {
verify(mockUrlRequest).addHeader("Range", "bytes=1000-5999");
// Check for connection not automatically closed.
verify(mockUrlRequest, never()).cancel();
assertConnectionState(CronetDataSource.OPENING_CONNECTION);
verify(mockTransferListener, never()).onTransferStart(dataSourceUnderTest, testPostDataSpec);
}
}
@Test
public void testPostRequestOpen() throws HttpDataSourceException { public void testPostRequestOpen() throws HttpDataSourceException {
mockResponseStartSuccess(); mockResponseStartSuccess();
dataSourceUnderTest.setRequestProperty("Content-Type", TEST_CONTENT_TYPE); dataSourceUnderTest.setRequestProperty("Content-Type", TEST_CONTENT_TYPE);
assertEquals(TEST_CONTENT_LENGTH, dataSourceUnderTest.open(testPostDataSpec)); assertEquals(TEST_CONTENT_LENGTH, dataSourceUnderTest.open(testPostDataSpec));
assertConnectionState(CronetDataSource.OPEN_CONNECTION);
verify(mockTransferListener).onTransferStart(dataSourceUnderTest, testPostDataSpec); verify(mockTransferListener).onTransferStart(dataSourceUnderTest, testPostDataSpec);
} }
...@@ -510,7 +479,6 @@ public final class CronetDataSourceTest { ...@@ -510,7 +479,6 @@ public final class CronetDataSourceTest {
dataSourceUnderTest.close(); dataSourceUnderTest.close();
verify(mockTransferListener).onTransferEnd(dataSourceUnderTest); verify(mockTransferListener).onTransferEnd(dataSourceUnderTest);
assertConnectionState(CronetDataSource.IDLE_CONNECTION);
try { try {
bytesRead += dataSourceUnderTest.read(returnedBuffer, 0, 8); bytesRead += dataSourceUnderTest.read(returnedBuffer, 0, 8);
...@@ -572,7 +540,6 @@ public final class CronetDataSourceTest { ...@@ -572,7 +540,6 @@ public final class CronetDataSourceTest {
verify(mockUrlRequest, times(1)).read(any(ByteBuffer.class)); verify(mockUrlRequest, times(1)).read(any(ByteBuffer.class));
// Check for connection not automatically closed. // Check for connection not automatically closed.
verify(mockUrlRequest, never()).cancel(); verify(mockUrlRequest, never()).cancel();
assertConnectionState(CronetDataSource.OPEN_CONNECTION);
assertEquals(16, bytesRead); assertEquals(16, bytesRead);
} }
...@@ -603,15 +570,12 @@ public final class CronetDataSourceTest { ...@@ -603,15 +570,12 @@ public final class CronetDataSourceTest {
// We should still be trying to open. // We should still be trying to open.
assertFalse(timedOutCondition.block(50)); assertFalse(timedOutCondition.block(50));
assertEquals(CronetDataSource.OPENING_CONNECTION, dataSourceUnderTest.connectionState);
// We should still be trying to open as we approach the timeout. // We should still be trying to open as we approach the timeout.
when(mockClock.elapsedRealtime()).thenReturn((long) TEST_CONNECT_TIMEOUT_MS - 1); when(mockClock.elapsedRealtime()).thenReturn((long) TEST_CONNECT_TIMEOUT_MS - 1);
assertFalse(timedOutCondition.block(50)); assertFalse(timedOutCondition.block(50));
assertEquals(CronetDataSource.OPENING_CONNECTION, dataSourceUnderTest.connectionState);
// Now we timeout. // Now we timeout.
when(mockClock.elapsedRealtime()).thenReturn((long) TEST_CONNECT_TIMEOUT_MS); when(mockClock.elapsedRealtime()).thenReturn((long) TEST_CONNECT_TIMEOUT_MS);
timedOutCondition.block(); timedOutCondition.block();
assertEquals(CronetDataSource.OPENING_CONNECTION, dataSourceUnderTest.connectionState);
verify(mockTransferListener, never()).onTransferStart(dataSourceUnderTest, testDataSpec); verify(mockTransferListener, never()).onTransferStart(dataSourceUnderTest, testDataSpec);
} }
...@@ -637,15 +601,12 @@ public final class CronetDataSourceTest { ...@@ -637,15 +601,12 @@ public final class CronetDataSourceTest {
// We should still be trying to open. // We should still be trying to open.
assertFalse(openCondition.block(50)); assertFalse(openCondition.block(50));
assertEquals(CronetDataSource.OPENING_CONNECTION, dataSourceUnderTest.connectionState);
// We should still be trying to open as we approach the timeout. // We should still be trying to open as we approach the timeout.
when(mockClock.elapsedRealtime()).thenReturn((long) TEST_CONNECT_TIMEOUT_MS - 1); when(mockClock.elapsedRealtime()).thenReturn((long) TEST_CONNECT_TIMEOUT_MS - 1);
assertFalse(openCondition.block(50)); assertFalse(openCondition.block(50));
assertEquals(CronetDataSource.OPENING_CONNECTION, dataSourceUnderTest.connectionState);
// The response arrives just in time. // The response arrives just in time.
dataSourceUnderTest.onResponseStarted(mockUrlRequest, testUrlResponseInfo); dataSourceUnderTest.onResponseStarted(mockUrlRequest, testUrlResponseInfo);
openCondition.block(); openCondition.block();
assertEquals(CronetDataSource.OPEN_CONNECTION, dataSourceUnderTest.connectionState);
} }
@Test @Test
...@@ -674,11 +635,9 @@ public final class CronetDataSourceTest { ...@@ -674,11 +635,9 @@ public final class CronetDataSourceTest {
// We should still be trying to open. // We should still be trying to open.
assertFalse(timedOutCondition.block(50)); assertFalse(timedOutCondition.block(50));
assertEquals(CronetDataSource.OPENING_CONNECTION, dataSourceUnderTest.connectionState);
// We should still be trying to open as we approach the timeout. // We should still be trying to open as we approach the timeout.
when(mockClock.elapsedRealtime()).thenReturn((long) TEST_CONNECT_TIMEOUT_MS - 1); when(mockClock.elapsedRealtime()).thenReturn((long) TEST_CONNECT_TIMEOUT_MS - 1);
assertFalse(timedOutCondition.block(50)); assertFalse(timedOutCondition.block(50));
assertEquals(CronetDataSource.OPENING_CONNECTION, dataSourceUnderTest.connectionState);
// A redirect arrives just in time. // A redirect arrives just in time.
dataSourceUnderTest.onRedirectReceived(mockUrlRequest, testUrlResponseInfo, dataSourceUnderTest.onRedirectReceived(mockUrlRequest, testUrlResponseInfo,
"RandomRedirectedUrl1"); "RandomRedirectedUrl1");
...@@ -689,7 +648,6 @@ public final class CronetDataSourceTest { ...@@ -689,7 +648,6 @@ public final class CronetDataSourceTest {
assertFalse(timedOutCondition.block(newTimeoutMs)); assertFalse(timedOutCondition.block(newTimeoutMs));
// We should still be trying to open as we approach the new timeout. // We should still be trying to open as we approach the new timeout.
assertFalse(timedOutCondition.block(50)); assertFalse(timedOutCondition.block(50));
assertEquals(CronetDataSource.OPENING_CONNECTION, dataSourceUnderTest.connectionState);
// A redirect arrives just in time. // A redirect arrives just in time.
dataSourceUnderTest.onRedirectReceived(mockUrlRequest, testUrlResponseInfo, dataSourceUnderTest.onRedirectReceived(mockUrlRequest, testUrlResponseInfo,
"RandomRedirectedUrl2"); "RandomRedirectedUrl2");
...@@ -700,11 +658,9 @@ public final class CronetDataSourceTest { ...@@ -700,11 +658,9 @@ public final class CronetDataSourceTest {
assertFalse(timedOutCondition.block(newTimeoutMs)); assertFalse(timedOutCondition.block(newTimeoutMs));
// We should still be trying to open as we approach the new timeout. // We should still be trying to open as we approach the new timeout.
assertFalse(timedOutCondition.block(50)); assertFalse(timedOutCondition.block(50));
assertEquals(CronetDataSource.OPENING_CONNECTION, dataSourceUnderTest.connectionState);
// Now we timeout. // Now we timeout.
when(mockClock.elapsedRealtime()).thenReturn(newTimeoutMs); when(mockClock.elapsedRealtime()).thenReturn(newTimeoutMs);
timedOutCondition.block(); timedOutCondition.block();
assertEquals(CronetDataSource.OPENING_CONNECTION, dataSourceUnderTest.connectionState);
verify(mockTransferListener, never()).onTransferStart(dataSourceUnderTest, testDataSpec); verify(mockTransferListener, never()).onTransferStart(dataSourceUnderTest, testDataSpec);
assertEquals(1, openExceptions.get()); assertEquals(1, openExceptions.get());
...@@ -818,7 +774,7 @@ public final class CronetDataSourceTest { ...@@ -818,7 +774,7 @@ public final class CronetDataSourceTest {
dataSourceUnderTest.onFailed( dataSourceUnderTest.onFailed(
mockUrlRequest, mockUrlRequest,
createUrlResponseInfo(500), // statusCode createUrlResponseInfo(500), // statusCode
null); mockUrlRequestException);
return null; return null;
} }
}).when(mockUrlRequest).read(any(ByteBuffer.class)); }).when(mockUrlRequest).read(any(ByteBuffer.class));
...@@ -869,8 +825,4 @@ public final class CronetDataSourceTest { ...@@ -869,8 +825,4 @@ public final class CronetDataSourceTest {
return testBuffer; return testBuffer;
} }
private void assertConnectionState(int state) {
assertEquals(state, dataSourceUnderTest.connectionState);
}
} }
...@@ -37,7 +37,6 @@ import java.util.List; ...@@ -37,7 +37,6 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.chromium.net.CronetEngine; import org.chromium.net.CronetEngine;
...@@ -91,11 +90,6 @@ public class CronetDataSource extends UrlRequest.Callback implements HttpDataSou ...@@ -91,11 +90,6 @@ public class CronetDataSource extends UrlRequest.Callback implements HttpDataSou
// The size of read buffer passed to cronet UrlRequest.read(). // The size of read buffer passed to cronet UrlRequest.read().
private static final int READ_BUFFER_SIZE_BYTES = 32 * 1024; private static final int READ_BUFFER_SIZE_BYTES = 32 * 1024;
/* package */ static final int IDLE_CONNECTION = 5;
/* package */ static final int OPENING_CONNECTION = 2;
/* package */ static final int CONNECTED_CONNECTION = 3;
/* package */ static final int OPEN_CONNECTION = 4;
private final CronetEngine cronetEngine; private final CronetEngine cronetEngine;
private final Executor executor; private final Executor executor;
private final Predicate<String> contentTypePredicate; private final Predicate<String> contentTypePredicate;
...@@ -105,20 +99,29 @@ public class CronetDataSource extends UrlRequest.Callback implements HttpDataSou ...@@ -105,20 +99,29 @@ public class CronetDataSource extends UrlRequest.Callback implements HttpDataSou
private final boolean resetTimeoutOnRedirects; private final boolean resetTimeoutOnRedirects;
private final Map<String, String> requestProperties; private final Map<String, String> requestProperties;
private final ConditionVariable operation; private final ConditionVariable operation;
private final ByteBuffer readBuffer;
private final Clock clock; private final Clock clock;
// Accessed by the calling thread only.
private boolean opened;
private long bytesRemaining;
// Written from the calling thread only. currentUrlRequest.start() calls ensure writes are visible
// to reads made by the Cronet thread.
private UrlRequest currentUrlRequest; private UrlRequest currentUrlRequest;
private DataSpec currentDataSpec; private DataSpec currentDataSpec;
// Reference written and read by calling thread only. Passed to Cronet thread as a local variable.
// operation.open() calls ensure writes into the buffer are visible to reads made by the calling
// thread.
private ByteBuffer readBuffer;
// Written from the Cronet thread only. operation.open() calls ensure writes are visible to reads
// made by the calling thread.
private UrlResponseInfo responseInfo; private UrlResponseInfo responseInfo;
private IOException exception;
private boolean finished;
/* package */ volatile int connectionState;
private volatile long currentConnectTimeoutMs; private volatile long currentConnectTimeoutMs;
private volatile HttpDataSourceException exception;
private volatile long contentLength;
private volatile AtomicLong expectedBytesRemainingToRead;
private volatile boolean hasData;
private volatile boolean responseFinished;
/** /**
* @param cronetEngine A CronetEngine. * @param cronetEngine A CronetEngine.
...@@ -163,10 +166,8 @@ public class CronetDataSource extends UrlRequest.Callback implements HttpDataSou ...@@ -163,10 +166,8 @@ public class CronetDataSource extends UrlRequest.Callback implements HttpDataSou
this.readTimeoutMs = readTimeoutMs; this.readTimeoutMs = readTimeoutMs;
this.resetTimeoutOnRedirects = resetTimeoutOnRedirects; this.resetTimeoutOnRedirects = resetTimeoutOnRedirects;
this.clock = Assertions.checkNotNull(clock); this.clock = Assertions.checkNotNull(clock);
readBuffer = ByteBuffer.allocateDirect(READ_BUFFER_SIZE_BYTES);
requestProperties = new HashMap<>(); requestProperties = new HashMap<>();
operation = new ConditionVariable(); operation = new ConditionVariable();
connectionState = IDLE_CONNECTION;
} }
// HttpDataSource implementation. // HttpDataSource implementation.
...@@ -205,10 +206,7 @@ public class CronetDataSource extends UrlRequest.Callback implements HttpDataSou ...@@ -205,10 +206,7 @@ public class CronetDataSource extends UrlRequest.Callback implements HttpDataSou
@Override @Override
public long open(DataSpec dataSpec) throws HttpDataSourceException { public long open(DataSpec dataSpec) throws HttpDataSourceException {
Assertions.checkNotNull(dataSpec); Assertions.checkNotNull(dataSpec);
synchronized (this) { Assertions.checkState(!opened);
Assertions.checkState(connectionState == IDLE_CONNECTION, "Connection already open");
connectionState = OPENING_CONNECTION;
}
operation.close(); operation.close();
resetConnectTimeout(); resetConnectTimeout();
...@@ -218,61 +216,99 @@ public class CronetDataSource extends UrlRequest.Callback implements HttpDataSou ...@@ -218,61 +216,99 @@ public class CronetDataSource extends UrlRequest.Callback implements HttpDataSou
boolean requestStarted = blockUntilConnectTimeout(); boolean requestStarted = blockUntilConnectTimeout();
if (exception != null) { if (exception != null) {
// An error occurred opening the connection. throw new OpenException(exception, currentDataSpec, getStatus(currentUrlRequest));
throw exception;
} else if (!requestStarted) { } else if (!requestStarted) {
// The timeout was reached before the connection was opened. // The timeout was reached before the connection was opened.
throw new OpenException(new SocketTimeoutException(), dataSpec, getStatus(currentUrlRequest)); throw new OpenException(new SocketTimeoutException(), dataSpec, getStatus(currentUrlRequest));
} }
// Connection was opened. // Check for a valid response code.
int responseCode = responseInfo.getHttpStatusCode();
if (responseCode < 200 || responseCode > 299) {
InvalidResponseCodeException exception = new InvalidResponseCodeException(responseCode,
responseInfo.getAllHeaders(), currentDataSpec);
if (responseCode == 416) {
exception.initCause(new DataSourceException(DataSourceException.POSITION_OUT_OF_RANGE));
}
throw exception;
}
// Check for a valid content type.
if (contentTypePredicate != null) {
List<String> contentTypeHeaders = responseInfo.getAllHeaders().get(CONTENT_TYPE);
String contentType = isEmpty(contentTypeHeaders) ? null : contentTypeHeaders.get(0);
if (!contentTypePredicate.evaluate(contentType)) {
throw new InvalidContentTypeException(contentType, currentDataSpec);
}
}
// TODO: Handle the case where we requested a range starting from a non-zero position and
// received a 200 rather than a 206. This occurs if the server does not support partial
// requests, and requires that the source skips to the requested position.
// Calculate the content length.
if (!getIsCompressed(responseInfo)) {
if (dataSpec.length != C.LENGTH_UNSET) {
bytesRemaining = dataSpec.length;
} else {
bytesRemaining = getContentLength(responseInfo);
}
} else {
// If the response is compressed then the content length will be that of the compressed data
// which isn't what we want. Always use the dataSpec length in this case.
bytesRemaining = currentDataSpec.length;
}
opened = true;
if (listener != null) { if (listener != null) {
listener.onTransferStart(this, dataSpec); listener.onTransferStart(this, dataSpec);
} }
connectionState = OPEN_CONNECTION;
return contentLength; return bytesRemaining;
} }
@Override @Override
public int read(byte[] buffer, int offset, int readLength) throws HttpDataSourceException { public int read(byte[] buffer, int offset, int readLength) throws HttpDataSourceException {
synchronized (this) { Assertions.checkState(opened);
Assertions.checkState(connectionState == OPEN_CONNECTION);
}
if (readLength == 0) { if (readLength == 0) {
return 0; return 0;
} } else if (bytesRemaining == 0) {
if (expectedBytesRemainingToRead != null && expectedBytesRemainingToRead.get() == 0) {
return C.RESULT_END_OF_INPUT; return C.RESULT_END_OF_INPUT;
} }
if (!hasData) { if (readBuffer == null) {
// Read more data from cronet. readBuffer = ByteBuffer.allocateDirect(READ_BUFFER_SIZE_BYTES);
readBuffer.limit(0);
}
if (!readBuffer.hasRemaining()) {
// Fill readBuffer with more data from Cronet.
operation.close(); operation.close();
readBuffer.clear(); readBuffer.clear();
currentUrlRequest.read(readBuffer); currentUrlRequest.read(readBuffer);
if (!operation.block(readTimeoutMs)) { if (!operation.block(readTimeoutMs)) {
// We're timing out, but since the operation is still ongoing we'll need to replace
// readBuffer to avoid the possibility of it being written to by this operation during a
// subsequent request.
readBuffer = null;
throw new HttpDataSourceException( throw new HttpDataSourceException(
new SocketTimeoutException(), currentDataSpec, HttpDataSourceException.TYPE_READ); new SocketTimeoutException(), currentDataSpec, HttpDataSourceException.TYPE_READ);
} } else if (exception != null) {
if (exception != null) { throw new HttpDataSourceException(exception, currentDataSpec,
throw exception; HttpDataSourceException.TYPE_READ);
} } else if (finished) {
// The expected response length is unknown, but cronet has indicated that the request
// already finished successfully.
if (responseFinished) {
return C.RESULT_END_OF_INPUT; return C.RESULT_END_OF_INPUT;
} else {
// The operation didn't time out, fail or finish, and therefore data must have been read.
readBuffer.flip();
} }
} }
int bytesRead = Math.min(readBuffer.remaining(), readLength); int bytesRead = Math.min(readBuffer.remaining(), readLength);
readBuffer.get(buffer, offset, bytesRead); readBuffer.get(buffer, offset, bytesRead);
if (!readBuffer.hasRemaining()) {
hasData = false;
}
if (expectedBytesRemainingToRead != null) { if (bytesRemaining != C.LENGTH_UNSET) {
expectedBytesRemainingToRead.addAndGet(-bytesRead); bytesRemaining -= bytesRead;
} }
if (listener != null) { if (listener != null) {
listener.onBytesTransferred(this, bytesRead); listener.onBytesTransferred(this, bytesRead);
...@@ -286,26 +322,26 @@ public class CronetDataSource extends UrlRequest.Callback implements HttpDataSou ...@@ -286,26 +322,26 @@ public class CronetDataSource extends UrlRequest.Callback implements HttpDataSou
currentUrlRequest.cancel(); currentUrlRequest.cancel();
currentUrlRequest = null; currentUrlRequest = null;
} }
if (readBuffer != null) {
readBuffer.limit(0);
}
currentDataSpec = null; currentDataSpec = null;
exception = null;
contentLength = 0;
hasData = false;
responseInfo = null; responseInfo = null;
expectedBytesRemainingToRead = null; exception = null;
responseFinished = false; finished = false;
try { if (opened) {
if (listener != null && connectionState == OPEN_CONNECTION) { opened = false;
if (listener != null) {
listener.onTransferEnd(this); listener.onTransferEnd(this);
} }
} finally {
connectionState = IDLE_CONNECTION;
} }
} }
// UrlRequest.Callback implementation // UrlRequest.Callback implementation
@Override @Override
public void onRedirectReceived(UrlRequest request, UrlResponseInfo info, String newLocationUrl) { public synchronized void onRedirectReceived(UrlRequest request, UrlResponseInfo info,
String newLocationUrl) {
if (request != currentUrlRequest) { if (request != currentUrlRequest) {
return; return;
} }
...@@ -315,8 +351,8 @@ public class CronetDataSource extends UrlRequest.Callback implements HttpDataSou ...@@ -315,8 +351,8 @@ public class CronetDataSource extends UrlRequest.Callback implements HttpDataSou
// For other redirect response codes the POST request is converted to a GET request and the // For other redirect response codes the POST request is converted to a GET request and the
// redirect is followed. // redirect is followed.
if (responseCode == 307 || responseCode == 308) { if (responseCode == 307 || responseCode == 308) {
exception = new OpenException("POST request redirected with 307 or 308 response code", exception = new InvalidResponseCodeException(responseCode, info.getAllHeaders(),
currentDataSpec, getStatus(request)); currentDataSpec);
operation.open(); operation.open();
return; return;
} }
...@@ -332,51 +368,8 @@ public class CronetDataSource extends UrlRequest.Callback implements HttpDataSou ...@@ -332,51 +368,8 @@ public class CronetDataSource extends UrlRequest.Callback implements HttpDataSou
if (request != currentUrlRequest) { if (request != currentUrlRequest) {
return; return;
} }
try { responseInfo = info;
// Check for a valid response code. operation.open();
int responseCode = info.getHttpStatusCode();
if (responseCode < 200 || responseCode > 299) {
InvalidResponseCodeException exception = new InvalidResponseCodeException(
responseCode, info.getAllHeaders(), currentDataSpec);
if (responseCode == 416) {
exception.initCause(new DataSourceException(DataSourceException.POSITION_OUT_OF_RANGE));
}
throw exception;
}
// Check for a valid content type.
if (contentTypePredicate != null) {
List<String> contentTypeHeaders = info.getAllHeaders().get(CONTENT_TYPE);
String contentType = contentTypeHeaders == null || contentTypeHeaders.isEmpty() ? null
: contentTypeHeaders.get(0);
if (!contentTypePredicate.evaluate(contentType)) {
throw new InvalidContentTypeException(contentType, currentDataSpec);
}
}
responseInfo = info;
if (getIsCompressed(info)) {
contentLength = currentDataSpec.length;
} else {
// Check content length.
contentLength = getContentLength(info);
// If a specific length is requested and a specific length is returned but the 2 don't match
// it's an error.
if (currentDataSpec.length != C.LENGTH_UNSET && contentLength != C.LENGTH_UNSET
&& currentDataSpec.length != contentLength) {
throw new OpenException("Content length did not match requested length", currentDataSpec,
getStatus(request));
}
}
if (contentLength > 0) {
expectedBytesRemainingToRead = new AtomicLong(contentLength);
}
connectionState = CONNECTED_CONNECTION;
} catch (HttpDataSourceException e) {
exception = e;
} finally {
operation.open();
}
} }
@Override @Override
...@@ -385,17 +378,15 @@ public class CronetDataSource extends UrlRequest.Callback implements HttpDataSou ...@@ -385,17 +378,15 @@ public class CronetDataSource extends UrlRequest.Callback implements HttpDataSou
if (request != currentUrlRequest) { if (request != currentUrlRequest) {
return; return;
} }
readBuffer.flip();
hasData = true;
operation.open(); operation.open();
} }
@Override @Override
public void onSucceeded(UrlRequest request, UrlResponseInfo info) { public synchronized void onSucceeded(UrlRequest request, UrlResponseInfo info) {
if (request != currentUrlRequest) { if (request != currentUrlRequest) {
return; return;
} }
responseFinished = true; finished = true;
operation.open(); operation.open();
} }
...@@ -405,14 +396,8 @@ public class CronetDataSource extends UrlRequest.Callback implements HttpDataSou ...@@ -405,14 +396,8 @@ public class CronetDataSource extends UrlRequest.Callback implements HttpDataSou
if (request != currentUrlRequest) { if (request != currentUrlRequest) {
return; return;
} }
if (connectionState == OPENING_CONNECTION) { exception = error.getErrorCode() == UrlRequestException.ERROR_HOSTNAME_NOT_RESOLVED
IOException cause = error.getErrorCode() == UrlRequestException.ERROR_HOSTNAME_NOT_RESOLVED ? new UnknownHostException() : error;
? new UnknownHostException() : error;
exception = new OpenException(cause, currentDataSpec, getStatus(request));
} else if (connectionState == OPEN_CONNECTION) {
exception = new HttpDataSourceException(error, currentDataSpec,
HttpDataSourceException.TYPE_READ);
}
operation.open(); operation.open();
} }
...@@ -477,7 +462,7 @@ public class CronetDataSource extends UrlRequest.Callback implements HttpDataSou ...@@ -477,7 +462,7 @@ public class CronetDataSource extends UrlRequest.Callback implements HttpDataSou
Map<String, List<String>> headers = info.getAllHeaders(); Map<String, List<String>> headers = info.getAllHeaders();
List<String> contentLengthHeaders = headers.get("Content-Length"); List<String> contentLengthHeaders = headers.get("Content-Length");
String contentLengthHeader = null; String contentLengthHeader = null;
if (contentLengthHeaders != null && !contentLengthHeaders.isEmpty()) { if (!isEmpty(contentLengthHeaders)) {
contentLengthHeader = contentLengthHeaders.get(0); contentLengthHeader = contentLengthHeaders.get(0);
if (!TextUtils.isEmpty(contentLengthHeader)) { if (!TextUtils.isEmpty(contentLengthHeader)) {
try { try {
...@@ -488,7 +473,7 @@ public class CronetDataSource extends UrlRequest.Callback implements HttpDataSou ...@@ -488,7 +473,7 @@ public class CronetDataSource extends UrlRequest.Callback implements HttpDataSou
} }
} }
List<String> contentRangeHeaders = headers.get("Content-Range"); List<String> contentRangeHeaders = headers.get("Content-Range");
if (contentRangeHeaders != null && !contentRangeHeaders.isEmpty()) { if (!isEmpty(contentRangeHeaders)) {
String contentRangeHeader = contentRangeHeaders.get(0); String contentRangeHeader = contentRangeHeaders.get(0);
Matcher matcher = CONTENT_RANGE_HEADER_PATTERN.matcher(contentRangeHeader); Matcher matcher = CONTENT_RANGE_HEADER_PATTERN.matcher(contentRangeHeader);
if (matcher.find()) { if (matcher.find()) {
...@@ -530,4 +515,8 @@ public class CronetDataSource extends UrlRequest.Callback implements HttpDataSou ...@@ -530,4 +515,8 @@ public class CronetDataSource extends UrlRequest.Callback implements HttpDataSou
return statusHolder[0]; return statusHolder[0];
} }
private static boolean isEmpty(List<?> list) {
return list == null || list.isEmpty();
}
} }
...@@ -41,7 +41,7 @@ public final class CronetDataSourceFactory implements Factory { ...@@ -41,7 +41,7 @@ public final class CronetDataSourceFactory implements Factory {
private final CronetEngine cronetEngine; private final CronetEngine cronetEngine;
private final Executor executor; private final Executor executor;
private final Predicate<String> contentTypePredicate; private final Predicate<String> contentTypePredicate;
private final TransferListener transferListener; private final TransferListener<? super DataSource> transferListener;
private final int connectTimeoutMs; private final int connectTimeoutMs;
private final int readTimeoutMs; private final int readTimeoutMs;
private final boolean resetTimeoutOnRedirects; private final boolean resetTimeoutOnRedirects;
......
...@@ -185,9 +185,12 @@ public class OkHttpDataSource implements HttpDataSource { ...@@ -185,9 +185,12 @@ public class OkHttpDataSource implements HttpDataSource {
bytesToSkip = responseCode == 200 && dataSpec.position != 0 ? dataSpec.position : 0; bytesToSkip = responseCode == 200 && dataSpec.position != 0 ? dataSpec.position : 0;
// Determine the length of the data to be read, after skipping. // Determine the length of the data to be read, after skipping.
long contentLength = response.body().contentLength(); if (dataSpec.length != C.LENGTH_UNSET) {
bytesToRead = dataSpec.length != C.LENGTH_UNSET ? dataSpec.length bytesToRead = dataSpec.length;
: (contentLength != -1 ? (contentLength - bytesToSkip) : C.LENGTH_UNSET); } else {
long contentLength = response.body().contentLength();
bytesToRead = contentLength != -1 ? (contentLength - bytesToSkip) : C.LENGTH_UNSET;
}
opened = true; opened = true;
if (listener != null) { if (listener != null) {
......
...@@ -231,10 +231,13 @@ public class DefaultHttpDataSource implements HttpDataSource { ...@@ -231,10 +231,13 @@ public class DefaultHttpDataSource implements HttpDataSource {
// Determine the length of the data to be read, after skipping. // Determine the length of the data to be read, after skipping.
if ((dataSpec.flags & DataSpec.FLAG_ALLOW_GZIP) == 0) { if ((dataSpec.flags & DataSpec.FLAG_ALLOW_GZIP) == 0) {
long contentLength = getContentLength(connection); if (dataSpec.length != C.LENGTH_UNSET) {
bytesToRead = dataSpec.length != C.LENGTH_UNSET ? dataSpec.length bytesToRead = dataSpec.length;
: contentLength != C.LENGTH_UNSET ? contentLength - bytesToSkip } else {
: C.LENGTH_UNSET; long contentLength = getContentLength(connection);
bytesToRead = contentLength != C.LENGTH_UNSET ? (contentLength - bytesToSkip)
: C.LENGTH_UNSET;
}
} else { } else {
// Gzip is enabled. If the server opts to use gzip then the content length in the response // Gzip is enabled. If the server opts to use gzip then the content length in the response
// will be that of the compressed data, which isn't what we want. Furthermore, there isn't a // will be that of the compressed data, which isn't what we want. Furthermore, there isn't a
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment