Commit 620ab67f by claincly Committed by Oliver Woodman

Improve timeout handling and allow customizing the timeout.

Previously, a SocketTimeourException is used to signal the end of the stream
that is caused by "no RTP packets received for a while". However, such
signaling is inappropriate under TransferRtpDataChannel, or FakeRtpDataChannel
in RtspPlaybackTests.

Hence, the signaling of end of stream is changed to use RESULT_END_OF_INPUT.
The RtpDataChannel implementations will Still block until a set timeout, but
will return a C.RESULT_END_OF_INPUT should a timeout occur, instead of
throwing a nested SocketTimeoutException.

This also allowed customization of the timeout amount, in
RtspMediaSource.Factory

PiperOrigin-RevId: 380981534
parent 5e2197d8
...@@ -30,9 +30,16 @@ import java.io.IOException; ...@@ -30,9 +30,16 @@ import java.io.IOException;
/** /**
* Creates a new {@link RtpDataChannel} instance for RTP data transfer. * Creates a new {@link RtpDataChannel} instance for RTP data transfer.
* *
* @param trackId The track ID.
* @throws IOException If the data channels failed to open. * @throws IOException If the data channels failed to open.
*/ */
RtpDataChannel createAndOpenDataChannel(int trackId) throws IOException; RtpDataChannel createAndOpenDataChannel(int trackId) throws IOException;
/** Returns a fallback {@code Factory}, {@code null} when there is no fallback available. */
@Nullable
default Factory createFallbackDataChannelFactory() {
return null;
}
} }
/** Returns the RTSP transport header for this {@link RtpDataChannel} */ /** Returns the RTSP transport header for this {@link RtpDataChannel} */
......
...@@ -22,6 +22,7 @@ import android.os.Handler; ...@@ -22,6 +22,7 @@ import android.os.Handler;
import androidx.annotation.Nullable; import androidx.annotation.Nullable;
import com.google.android.exoplayer2.C; import com.google.android.exoplayer2.C;
import com.google.android.exoplayer2.extractor.DefaultExtractorInput; import com.google.android.exoplayer2.extractor.DefaultExtractorInput;
import com.google.android.exoplayer2.extractor.Extractor;
import com.google.android.exoplayer2.extractor.ExtractorInput; import com.google.android.exoplayer2.extractor.ExtractorInput;
import com.google.android.exoplayer2.extractor.ExtractorOutput; import com.google.android.exoplayer2.extractor.ExtractorOutput;
import com.google.android.exoplayer2.extractor.PositionHolder; import com.google.android.exoplayer2.extractor.PositionHolder;
...@@ -152,7 +153,13 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; ...@@ -152,7 +153,13 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
extractor.seek(nextRtpTimestamp, pendingSeekPositionUs); extractor.seek(nextRtpTimestamp, pendingSeekPositionUs);
pendingSeekPositionUs = C.TIME_UNSET; pendingSeekPositionUs = C.TIME_UNSET;
} }
extractor.read(extractorInput, /* seekPosition= */ new PositionHolder());
@Extractor.ReadResult
int readResult = extractor.read(extractorInput, /* seekPosition= */ new PositionHolder());
if (readResult == Extractor.RESULT_END_OF_INPUT) {
// Loading is finished.
break;
}
} }
} finally { } finally {
Util.closeQuietly(dataChannel); Util.closeQuietly(dataChannel);
......
...@@ -125,10 +125,10 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; ...@@ -125,10 +125,10 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
// Reads one RTP packet at a time. // Reads one RTP packet at a time.
int bytesRead = input.read(rtpPacketScratchBuffer.getData(), 0, RtpPacket.MAX_SIZE); int bytesRead = input.read(rtpPacketScratchBuffer.getData(), 0, RtpPacket.MAX_SIZE);
if (bytesRead == RESULT_END_OF_INPUT) { if (bytesRead == C.RESULT_END_OF_INPUT) {
return RESULT_END_OF_INPUT; return Extractor.RESULT_END_OF_INPUT;
} else if (bytesRead == 0) { } else if (bytesRead == 0) {
return RESULT_CONTINUE; return Extractor.RESULT_CONTINUE;
} }
rtpPacketScratchBuffer.setPosition(0); rtpPacketScratchBuffer.setPosition(0);
......
...@@ -48,13 +48,11 @@ import com.google.android.exoplayer2.trackselection.ExoTrackSelection; ...@@ -48,13 +48,11 @@ import com.google.android.exoplayer2.trackselection.ExoTrackSelection;
import com.google.android.exoplayer2.trackselection.TrackSelection; import com.google.android.exoplayer2.trackselection.TrackSelection;
import com.google.android.exoplayer2.upstream.Allocator; import com.google.android.exoplayer2.upstream.Allocator;
import com.google.android.exoplayer2.upstream.Loader; import com.google.android.exoplayer2.upstream.Loader;
import com.google.android.exoplayer2.upstream.Loader.LoadErrorAction;
import com.google.android.exoplayer2.upstream.Loader.Loadable; import com.google.android.exoplayer2.upstream.Loader.Loadable;
import com.google.android.exoplayer2.util.Util; import com.google.android.exoplayer2.util.Util;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import java.io.IOException; import java.io.IOException;
import java.net.BindException; import java.net.BindException;
import java.net.SocketTimeoutException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.checkerframework.checker.nullness.compatqual.NullableType; import org.checkerframework.checker.nullness.compatqual.NullableType;
...@@ -103,6 +101,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; ...@@ -103,6 +101,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
* @param rtpDataChannelFactory A {@link RtpDataChannel.Factory} for {@link RtpDataChannel}. * @param rtpDataChannelFactory A {@link RtpDataChannel.Factory} for {@link RtpDataChannel}.
* @param uri The RTSP playback {@link Uri}. * @param uri The RTSP playback {@link Uri}.
* @param listener A {@link Listener} to receive session information updates. * @param listener A {@link Listener} to receive session information updates.
* @param userAgent The user agent.
*/ */
public RtspMediaPeriod( public RtspMediaPeriod(
Allocator allocator, Allocator allocator,
...@@ -432,7 +431,28 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; ...@@ -432,7 +431,28 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
@Override @Override
public void onLoadCompleted( public void onLoadCompleted(
RtpDataLoadable loadable, long elapsedRealtimeMs, long loadDurationMs) {} RtpDataLoadable loadable, long elapsedRealtimeMs, long loadDurationMs) {
// TODO(b/172331505) Allow for retry when loading is not ending.
if (getBufferedPositionUs() == 0) {
if (!isUsingRtpTcp) {
// Retry playback with TCP if no sample has been received so far, and we are not already
// using TCP. Retrying will setup new loadables, so will not retry with the current
// loadables.
retryWithRtpTcp();
isUsingRtpTcp = true;
}
return;
}
// Cancel the loader wrapper associated with the completed loadable.
for (int i = 0; i < rtspLoaderWrappers.size(); i++) {
RtspLoaderWrapper loaderWrapper = rtspLoaderWrappers.get(i);
if (loaderWrapper.loadInfo.loadable == loadable) {
loaderWrapper.cancelLoad();
break;
}
}
}
@Override @Override
public void onLoadCanceled( public void onLoadCanceled(
...@@ -448,9 +468,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; ...@@ -448,9 +468,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
if (!prepared) { if (!prepared) {
preparationError = error; preparationError = error;
} else { } else {
if (error.getCause() instanceof SocketTimeoutException) { if (error.getCause() instanceof BindException) {
return handleSocketTimeout(loadable);
} else if (error.getCause() instanceof BindException) {
// Allow for retry on RTP port open failure by catching BindException. Two ports are // Allow for retry on RTP port open failure by catching BindException. Two ports are
// opened for each RTP stream, the first port number is auto assigned by the system, while // opened for each RTP stream, the first port number is auto assigned by the system, while
// the second is manually selected. It is thus possible that the second port fails to // the second is manually selected. It is thus possible that the second port fails to
...@@ -525,30 +543,6 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; ...@@ -525,30 +543,6 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
playbackException = error; playbackException = error;
} }
/** Handles the {@link Loadable} whose {@link RtpDataChannel} timed out. */
private LoadErrorAction handleSocketTimeout(RtpDataLoadable loadable) {
// TODO(b/172331505) Allow for retry when loading is not ending.
if (getBufferedPositionUs() == 0) {
if (!isUsingRtpTcp) {
// Retry playback with TCP if no sample has been received so far, and we are not already
// using TCP. Retrying will setup new loadables, so will not retry with the current
// loadables.
retryWithRtpTcp();
isUsingRtpTcp = true;
}
return Loader.DONT_RETRY;
}
for (int i = 0; i < rtspLoaderWrappers.size(); i++) {
RtspLoaderWrapper loaderWrapper = rtspLoaderWrappers.get(i);
if (loaderWrapper.loadInfo.loadable == loadable) {
loaderWrapper.cancelLoad();
break;
}
}
return Loader.DONT_RETRY;
}
@Override @Override
public void onSessionTimelineUpdated( public void onSessionTimelineUpdated(
RtspSessionTiming timing, ImmutableList<RtspMediaTrack> tracks) { RtspSessionTiming timing, ImmutableList<RtspMediaTrack> tracks) {
...@@ -572,7 +566,15 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; ...@@ -572,7 +566,15 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
private void retryWithRtpTcp() { private void retryWithRtpTcp() {
rtspClient.retryWithRtpTcp(); rtspClient.retryWithRtpTcp();
RtpDataChannel.Factory rtpDataChannelFactory = new TransferRtpDataChannelFactory(); @Nullable
RtpDataChannel.Factory fallbackRtpDataChannelFactory =
rtpDataChannelFactory.createFallbackDataChannelFactory();
if (fallbackRtpDataChannelFactory == null) {
playbackException =
new RtspPlaybackException("No fallback data channel factory for TCP retry");
return;
}
ArrayList<RtspLoaderWrapper> newLoaderWrappers = new ArrayList<>(rtspLoaderWrappers.size()); ArrayList<RtspLoaderWrapper> newLoaderWrappers = new ArrayList<>(rtspLoaderWrappers.size());
ArrayList<RtpLoadInfo> newSelectedLoadInfos = new ArrayList<>(selectedLoadInfos.size()); ArrayList<RtpLoadInfo> newSelectedLoadInfos = new ArrayList<>(selectedLoadInfos.size());
...@@ -583,7 +585,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; ...@@ -583,7 +585,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
if (!loaderWrapper.canceled) { if (!loaderWrapper.canceled) {
RtspLoaderWrapper newLoaderWrapper = RtspLoaderWrapper newLoaderWrapper =
new RtspLoaderWrapper( new RtspLoaderWrapper(
loaderWrapper.loadInfo.mediaTrack, /* trackId= */ i, rtpDataChannelFactory); loaderWrapper.loadInfo.mediaTrack, /* trackId= */ i, fallbackRtpDataChannelFactory);
newLoaderWrappers.add(newLoaderWrapper); newLoaderWrappers.add(newLoaderWrapper);
newLoaderWrapper.startLoading(); newLoaderWrapper.startLoading();
if (selectedLoadInfos.contains(loaderWrapper.loadInfo)) { if (selectedLoadInfos.contains(loaderWrapper.loadInfo)) {
......
...@@ -16,9 +16,11 @@ ...@@ -16,9 +16,11 @@
package com.google.android.exoplayer2.source.rtsp; package com.google.android.exoplayer2.source.rtsp;
import static com.google.android.exoplayer2.util.Assertions.checkArgument;
import static com.google.android.exoplayer2.util.Assertions.checkNotNull; import static com.google.android.exoplayer2.util.Assertions.checkNotNull;
import android.net.Uri; import android.net.Uri;
import androidx.annotation.IntRange;
import androidx.annotation.Nullable; import androidx.annotation.Nullable;
import androidx.annotation.VisibleForTesting; import androidx.annotation.VisibleForTesting;
import com.google.android.exoplayer2.C; import com.google.android.exoplayer2.C;
...@@ -46,6 +48,9 @@ public final class RtspMediaSource extends BaseMediaSource { ...@@ -46,6 +48,9 @@ public final class RtspMediaSource extends BaseMediaSource {
ExoPlayerLibraryInfo.registerModule("goog.exo.rtsp"); ExoPlayerLibraryInfo.registerModule("goog.exo.rtsp");
} }
/** The default value for {@link Factory#setTimeoutMs}. */
public static final long DEFAULT_TIMEOUT_MS = 8000;
/** /**
* Factory for {@link RtspMediaSource} * Factory for {@link RtspMediaSource}
* *
...@@ -61,10 +66,12 @@ public final class RtspMediaSource extends BaseMediaSource { ...@@ -61,10 +66,12 @@ public final class RtspMediaSource extends BaseMediaSource {
*/ */
public static final class Factory implements MediaSourceFactory { public static final class Factory implements MediaSourceFactory {
private long timeoutMs;
private String userAgent; private String userAgent;
private boolean forceUseRtpTcp; private boolean forceUseRtpTcp;
public Factory() { public Factory() {
timeoutMs = DEFAULT_TIMEOUT_MS;
userAgent = ExoPlayerLibraryInfo.VERSION_SLASHY; userAgent = ExoPlayerLibraryInfo.VERSION_SLASHY;
} }
...@@ -95,6 +102,21 @@ public final class RtspMediaSource extends BaseMediaSource { ...@@ -95,6 +102,21 @@ public final class RtspMediaSource extends BaseMediaSource {
return this; return this;
} }
/**
* Sets the timeout in milliseconds, the default value is {@link #DEFAULT_TIMEOUT_MS}.
*
* <p>A positive number of milliseconds to wait before lack of received RTP packets is treated
* as the end of input.
*
* @param timeoutMs The timeout measured in milliseconds.
* @return This Factory, for convenience.
*/
public Factory setTimeoutMs(@IntRange(from = 1) long timeoutMs) {
checkArgument(timeoutMs > 0);
this.timeoutMs = timeoutMs;
return this;
}
/** Does nothing. {@link RtspMediaSource} does not support DRM. */ /** Does nothing. {@link RtspMediaSource} does not support DRM. */
@Override @Override
public Factory setDrmSessionManagerProvider( public Factory setDrmSessionManagerProvider(
...@@ -162,8 +184,8 @@ public final class RtspMediaSource extends BaseMediaSource { ...@@ -162,8 +184,8 @@ public final class RtspMediaSource extends BaseMediaSource {
return new RtspMediaSource( return new RtspMediaSource(
mediaItem, mediaItem,
forceUseRtpTcp forceUseRtpTcp
? new TransferRtpDataChannelFactory() ? new TransferRtpDataChannelFactory(timeoutMs)
: new UdpDataSourceRtpDataChannelFactory(), : new UdpDataSourceRtpDataChannelFactory(timeoutMs),
userAgent); userAgent);
} }
} }
......
...@@ -26,8 +26,6 @@ import com.google.android.exoplayer2.source.rtsp.RtspMessageChannel.InterleavedB ...@@ -26,8 +26,6 @@ import com.google.android.exoplayer2.source.rtsp.RtspMessageChannel.InterleavedB
import com.google.android.exoplayer2.upstream.BaseDataSource; import com.google.android.exoplayer2.upstream.BaseDataSource;
import com.google.android.exoplayer2.upstream.DataSpec; import com.google.android.exoplayer2.upstream.DataSpec;
import com.google.android.exoplayer2.util.Util; import com.google.android.exoplayer2.util.Util;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
...@@ -37,16 +35,22 @@ import java.util.concurrent.LinkedBlockingQueue; ...@@ -37,16 +35,22 @@ import java.util.concurrent.LinkedBlockingQueue;
private static final String DEFAULT_TCP_TRANSPORT_FORMAT = private static final String DEFAULT_TCP_TRANSPORT_FORMAT =
"RTP/AVP/TCP;unicast;interleaved=%d-%d"; "RTP/AVP/TCP;unicast;interleaved=%d-%d";
private static final long TIMEOUT_MS = 8_000;
private final LinkedBlockingQueue<byte[]> packetQueue; private final LinkedBlockingQueue<byte[]> packetQueue;
private final long pollTimeoutMs;
private byte[] unreadData; private byte[] unreadData;
private int channelNumber; private int channelNumber;
/** Creates a new instance. */ /**
public TransferRtpDataChannel() { * Creates a new instance.
*
* @param pollTimeoutMs The number of milliseconds which {@link #read} waits for a packet to be
* available. After the time has expired, {@link C#RESULT_END_OF_INPUT} is returned.
*/
public TransferRtpDataChannel(long pollTimeoutMs) {
super(/* isNetwork= */ true); super(/* isNetwork= */ true);
this.pollTimeoutMs = pollTimeoutMs;
packetQueue = new LinkedBlockingQueue<>(); packetQueue = new LinkedBlockingQueue<>();
unreadData = new byte[0]; unreadData = new byte[0];
channelNumber = C.INDEX_UNSET; channelNumber = C.INDEX_UNSET;
...@@ -84,7 +88,7 @@ import java.util.concurrent.LinkedBlockingQueue; ...@@ -84,7 +88,7 @@ import java.util.concurrent.LinkedBlockingQueue;
} }
@Override @Override
public int read(byte[] target, int offset, int length) throws IOException { public int read(byte[] target, int offset, int length) {
if (length == 0) { if (length == 0) {
return 0; return 0;
} }
...@@ -101,11 +105,9 @@ import java.util.concurrent.LinkedBlockingQueue; ...@@ -101,11 +105,9 @@ import java.util.concurrent.LinkedBlockingQueue;
@Nullable byte[] data; @Nullable byte[] data;
try { try {
// TODO(internal b/172331505) Consider move the receiving timeout logic to an upper level data = packetQueue.poll(pollTimeoutMs, MILLISECONDS);
// (maybe RtspClient). There is no actual socket receiving here.
data = packetQueue.poll(TIMEOUT_MS, MILLISECONDS);
if (data == null) { if (data == null) {
throw new IOException(new SocketTimeoutException()); return C.RESULT_END_OF_INPUT;
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
......
...@@ -20,9 +20,21 @@ package com.google.android.exoplayer2.source.rtsp; ...@@ -20,9 +20,21 @@ package com.google.android.exoplayer2.source.rtsp;
private static final int INTERLEAVED_CHANNELS_PER_TRACK = 2; private static final int INTERLEAVED_CHANNELS_PER_TRACK = 2;
private final long timeoutMs;
/**
* Creates a new instance.
*
* @param timeoutMs A positive number of milliseconds to wait before lack of received RTP packets
* is treated as the end of input.
*/
public TransferRtpDataChannelFactory(long timeoutMs) {
this.timeoutMs = timeoutMs;
}
@Override @Override
public RtpDataChannel createAndOpenDataChannel(int trackId) { public RtpDataChannel createAndOpenDataChannel(int trackId) {
TransferRtpDataChannel dataChannel = new TransferRtpDataChannel(); TransferRtpDataChannel dataChannel = new TransferRtpDataChannel(timeoutMs);
dataChannel.open(RtpUtils.getIncomingRtpDataSpec(trackId * INTERLEAVED_CHANNELS_PER_TRACK)); dataChannel.open(RtpUtils.getIncomingRtpDataSpec(trackId * INTERLEAVED_CHANNELS_PER_TRACK));
return dataChannel; return dataChannel;
} }
......
...@@ -25,7 +25,9 @@ import com.google.android.exoplayer2.upstream.DataSpec; ...@@ -25,7 +25,9 @@ import com.google.android.exoplayer2.upstream.DataSpec;
import com.google.android.exoplayer2.upstream.TransferListener; import com.google.android.exoplayer2.upstream.TransferListener;
import com.google.android.exoplayer2.upstream.UdpDataSource; import com.google.android.exoplayer2.upstream.UdpDataSource;
import com.google.android.exoplayer2.util.Util; import com.google.android.exoplayer2.util.Util;
import com.google.common.primitives.Ints;
import java.io.IOException; import java.io.IOException;
import java.net.SocketTimeoutException;
/** An {@link RtpDataChannel} for UDP transport. */ /** An {@link RtpDataChannel} for UDP transport. */
/* package */ final class UdpDataSourceRtpDataChannel implements RtpDataChannel { /* package */ final class UdpDataSourceRtpDataChannel implements RtpDataChannel {
...@@ -37,9 +39,14 @@ import java.io.IOException; ...@@ -37,9 +39,14 @@ import java.io.IOException;
/** The associated RTCP channel; {@code null} if the current channel is an RTCP channel. */ /** The associated RTCP channel; {@code null} if the current channel is an RTCP channel. */
@Nullable private UdpDataSourceRtpDataChannel rtcpChannel; @Nullable private UdpDataSourceRtpDataChannel rtcpChannel;
/** Creates a new instance. */ /**
public UdpDataSourceRtpDataChannel() { * Creates a new instance.
dataSource = new UdpDataSource(); *
* @param socketTimeoutMs The timeout for {@link #read} in milliseconds.
*/
public UdpDataSourceRtpDataChannel(long socketTimeoutMs) {
dataSource =
new UdpDataSource(UdpDataSource.DEFAULT_MAX_PACKET_SIZE, Ints.checkedCast(socketTimeoutMs));
} }
@Override @Override
...@@ -88,7 +95,15 @@ import java.io.IOException; ...@@ -88,7 +95,15 @@ import java.io.IOException;
@Override @Override
public int read(byte[] target, int offset, int length) throws IOException { public int read(byte[] target, int offset, int length) throws IOException {
return dataSource.read(target, offset, length); try {
return dataSource.read(target, offset, length);
} catch (UdpDataSource.UdpDataSourceException e) {
if (e.getCause() instanceof SocketTimeoutException) {
return C.RESULT_END_OF_INPUT;
} else {
throw e;
}
}
} }
public void setRtcpChannel(UdpDataSourceRtpDataChannel rtcpChannel) { public void setRtcpChannel(UdpDataSourceRtpDataChannel rtcpChannel) {
......
...@@ -21,10 +21,22 @@ import java.io.IOException; ...@@ -21,10 +21,22 @@ import java.io.IOException;
/** Factory for {@link UdpDataSourceRtpDataChannel}. */ /** Factory for {@link UdpDataSourceRtpDataChannel}. */
/* package */ final class UdpDataSourceRtpDataChannelFactory implements RtpDataChannel.Factory { /* package */ final class UdpDataSourceRtpDataChannelFactory implements RtpDataChannel.Factory {
private final long socketTimeoutMs;
/**
* Creates a new instance.
*
* @param socketTimeoutMs A positive number of milliseconds to wait before lack of received RTP
* packets is treated as the end of input.
*/
public UdpDataSourceRtpDataChannelFactory(long socketTimeoutMs) {
this.socketTimeoutMs = socketTimeoutMs;
}
@Override @Override
public RtpDataChannel createAndOpenDataChannel(int trackId) throws IOException { public RtpDataChannel createAndOpenDataChannel(int trackId) throws IOException {
UdpDataSourceRtpDataChannel firstChannel = new UdpDataSourceRtpDataChannel(); UdpDataSourceRtpDataChannel firstChannel = new UdpDataSourceRtpDataChannel(socketTimeoutMs);
UdpDataSourceRtpDataChannel secondChannel = new UdpDataSourceRtpDataChannel(); UdpDataSourceRtpDataChannel secondChannel = new UdpDataSourceRtpDataChannel(socketTimeoutMs);
try { try {
// From RFC3550 Section 11: "For UDP and similar protocols, RTP SHOULD use an even destination // From RFC3550 Section 11: "For UDP and similar protocols, RTP SHOULD use an even destination
...@@ -53,4 +65,9 @@ import java.io.IOException; ...@@ -53,4 +65,9 @@ import java.io.IOException;
throw e; throw e;
} }
} }
@Override
public RtpDataChannel.Factory createFallbackDataChannelFactory() {
return new TransferRtpDataChannelFactory(/* timeoutMs= */ socketTimeoutMs);
}
} }
...@@ -37,6 +37,8 @@ import org.robolectric.annotation.internal.DoNotInstrument; ...@@ -37,6 +37,8 @@ import org.robolectric.annotation.internal.DoNotInstrument;
@DoNotInstrument @DoNotInstrument
public final class RtspMediaPeriodTest { public final class RtspMediaPeriodTest {
private static final long DEFAULT_TIMEOUT_MS = 8000;
private RtspMediaPeriod mediaPeriod; private RtspMediaPeriod mediaPeriod;
private RtspServer rtspServer; private RtspServer rtspServer;
...@@ -80,7 +82,7 @@ public final class RtspMediaPeriodTest { ...@@ -80,7 +82,7 @@ public final class RtspMediaPeriodTest {
mediaPeriod = mediaPeriod =
new RtspMediaPeriod( new RtspMediaPeriod(
new DefaultAllocator(/* trimOnReset= */ true, C.DEFAULT_BUFFER_SEGMENT_SIZE), new DefaultAllocator(/* trimOnReset= */ true, C.DEFAULT_BUFFER_SEGMENT_SIZE),
new TransferRtpDataChannelFactory(), new TransferRtpDataChannelFactory(DEFAULT_TIMEOUT_MS),
RtspTestUtils.getTestUri(rtspServer.startAndGetPortNumber()), RtspTestUtils.getTestUri(rtspServer.startAndGetPortNumber()),
/* listener= */ timing -> refreshedSourceDurationMs.set(timing.getDurationMs()), /* listener= */ timing -> refreshedSourceDurationMs.set(timing.getDurationMs()),
/* userAgent= */ "ExoPlayer:RtspPeriodTest"); /* userAgent= */ "ExoPlayer:RtspPeriodTest");
......
...@@ -42,8 +42,6 @@ import com.google.android.exoplayer2.upstream.DataSpec; ...@@ -42,8 +42,6 @@ import com.google.android.exoplayer2.upstream.DataSpec;
import com.google.android.exoplayer2.util.Clock; import com.google.android.exoplayer2.util.Clock;
import com.google.android.exoplayer2.util.Util; import com.google.android.exoplayer2.util.Util;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
...@@ -83,7 +81,7 @@ public final class RtspPlaybackTest { ...@@ -83,7 +81,7 @@ public final class RtspPlaybackTest {
capturingRenderersFactory = new CapturingRenderersFactory(applicationContext); capturingRenderersFactory = new CapturingRenderersFactory(applicationContext);
clock = new FakeClock(/* isAutoAdvancing= */ true); clock = new FakeClock(/* isAutoAdvancing= */ true);
fakeRtpDataChannel = new FakeUdpDataSourceRtpDataChannel(); fakeRtpDataChannel = new FakeUdpDataSourceRtpDataChannel();
rtpDataChannelFactory = trackId -> fakeRtpDataChannel; rtpDataChannelFactory = (trackId) -> fakeRtpDataChannel;
} }
@Rule @Rule
...@@ -289,7 +287,7 @@ public final class RtspPlaybackTest { ...@@ -289,7 +287,7 @@ public final class RtspPlaybackTest {
public void close() {} public void close() {}
@Override @Override
public int read(byte[] target, int offset, int length) throws IOException { public int read(byte[] target, int offset, int length) {
if (length == 0) { if (length == 0) {
return 0; return 0;
} }
...@@ -301,7 +299,7 @@ public final class RtspPlaybackTest { ...@@ -301,7 +299,7 @@ public final class RtspPlaybackTest {
if (data.length == 0) { if (data.length == 0) {
// Empty data signals the end of a packet stream. // Empty data signals the end of a packet stream.
throw new IOException(new SocketTimeoutException()); return C.RESULT_END_OF_INPUT;
} }
int byteToRead = min(length, data.length); int byteToRead = min(length, data.length);
......
...@@ -17,11 +17,10 @@ package com.google.android.exoplayer2.source.rtsp; ...@@ -17,11 +17,10 @@ package com.google.android.exoplayer2.source.rtsp;
import static com.google.android.exoplayer2.testutil.TestUtil.buildTestData; import static com.google.android.exoplayer2.testutil.TestUtil.buildTestData;
import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;
import androidx.test.ext.junit.runners.AndroidJUnit4; import androidx.test.ext.junit.runners.AndroidJUnit4;
import com.google.android.exoplayer2.C;
import com.google.common.primitives.Bytes; import com.google.common.primitives.Bytes;
import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
...@@ -32,29 +31,30 @@ import org.robolectric.annotation.internal.DoNotInstrument; ...@@ -32,29 +31,30 @@ import org.robolectric.annotation.internal.DoNotInstrument;
@DoNotInstrument @DoNotInstrument
public class TransferRtpDataChannelTest { public class TransferRtpDataChannelTest {
private static final long POLL_TIMEOUT_MS = 8000;
@Test @Test
public void getInterleavedBinaryDataListener_returnsAnInterleavedBinaryDataListener() { public void getInterleavedBinaryDataListener_returnsAnInterleavedBinaryDataListener() {
TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel(); TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel(POLL_TIMEOUT_MS);
assertThat(transferRtpDataChannel.getInterleavedBinaryDataListener()) assertThat(transferRtpDataChannel.getInterleavedBinaryDataListener())
.isEqualTo(transferRtpDataChannel); .isEqualTo(transferRtpDataChannel);
} }
@Test @Test
public void read_withoutReceivingInterleavedData_timesOut() { public void read_withoutReceivingInterleavedData_returnsEndOfInput() {
TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel(); TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel(POLL_TIMEOUT_MS);
byte[] buffer = new byte[1]; byte[] buffer = new byte[1];
assertThrows( assertThat(transferRtpDataChannel.read(buffer, /* offset= */ 0, buffer.length))
IOException.class, .isEqualTo(C.RESULT_END_OF_INPUT);
() -> transferRtpDataChannel.read(buffer, /* offset= */ 0, buffer.length));
} }
@Test @Test
public void read_withLargeEnoughBuffer_reads() throws Exception { public void read_withLargeEnoughBuffer_reads() {
byte[] randomBytes = buildTestData(20); byte[] randomBytes = buildTestData(20);
byte[] buffer = new byte[40]; byte[] buffer = new byte[40];
TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel(); TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel(POLL_TIMEOUT_MS);
transferRtpDataChannel.onInterleavedBinaryDataReceived(randomBytes); transferRtpDataChannel.onInterleavedBinaryDataReceived(randomBytes);
transferRtpDataChannel.read(buffer, /* offset= */ 0, buffer.length); transferRtpDataChannel.read(buffer, /* offset= */ 0, buffer.length);
...@@ -63,10 +63,10 @@ public class TransferRtpDataChannelTest { ...@@ -63,10 +63,10 @@ public class TransferRtpDataChannelTest {
} }
@Test @Test
public void read_withSmallBufferEnoughBuffer_readsThreeTimes() throws Exception { public void read_withSmallBufferEnoughBuffer_readsThreeTimes() {
byte[] randomBytes = buildTestData(20); byte[] randomBytes = buildTestData(20);
byte[] buffer = new byte[8]; byte[] buffer = new byte[8];
TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel(); TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel(POLL_TIMEOUT_MS);
transferRtpDataChannel.onInterleavedBinaryDataReceived(randomBytes); transferRtpDataChannel.onInterleavedBinaryDataReceived(randomBytes);
transferRtpDataChannel.read(buffer, /* offset= */ 0, buffer.length); transferRtpDataChannel.read(buffer, /* offset= */ 0, buffer.length);
...@@ -79,10 +79,10 @@ public class TransferRtpDataChannelTest { ...@@ -79,10 +79,10 @@ public class TransferRtpDataChannelTest {
} }
@Test @Test
public void read_withSmallBuffer_reads() throws Exception { public void read_withSmallBuffer_reads() {
byte[] randomBytes = buildTestData(40); byte[] randomBytes = buildTestData(40);
byte[] buffer = new byte[20]; byte[] buffer = new byte[20];
TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel(); TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel(POLL_TIMEOUT_MS);
transferRtpDataChannel.onInterleavedBinaryDataReceived(randomBytes); transferRtpDataChannel.onInterleavedBinaryDataReceived(randomBytes);
transferRtpDataChannel.read(buffer, /* offset= */ 0, buffer.length); transferRtpDataChannel.read(buffer, /* offset= */ 0, buffer.length);
...@@ -93,12 +93,12 @@ public class TransferRtpDataChannelTest { ...@@ -93,12 +93,12 @@ public class TransferRtpDataChannelTest {
} }
@Test @Test
public void read_withSmallAndModerateBufferAndSubsequentProducerWrite_reads() throws Exception { public void read_withSmallAndModerateBufferAndSubsequentProducerWrite_reads() {
byte[] randomBytes1 = buildTestData(40); byte[] randomBytes1 = buildTestData(40);
byte[] randomBytes2 = buildTestData(40); byte[] randomBytes2 = buildTestData(40);
byte[] smallBuffer = new byte[20]; byte[] smallBuffer = new byte[20];
byte[] bigBuffer = new byte[40]; byte[] bigBuffer = new byte[40];
TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel(); TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel(POLL_TIMEOUT_MS);
transferRtpDataChannel.onInterleavedBinaryDataReceived(randomBytes1); transferRtpDataChannel.onInterleavedBinaryDataReceived(randomBytes1);
transferRtpDataChannel.read(smallBuffer, /* offset= */ 0, smallBuffer.length); transferRtpDataChannel.read(smallBuffer, /* offset= */ 0, smallBuffer.length);
...@@ -122,13 +122,12 @@ public class TransferRtpDataChannelTest { ...@@ -122,13 +122,12 @@ public class TransferRtpDataChannelTest {
} }
@Test @Test
public void read_withSmallAndBigBufferWithPartialReadAndSubsequentProducerWrite_reads() public void read_withSmallAndBigBufferWithPartialReadAndSubsequentProducerWrite_reads() {
throws Exception {
byte[] randomBytes1 = buildTestData(40); byte[] randomBytes1 = buildTestData(40);
byte[] randomBytes2 = buildTestData(40); byte[] randomBytes2 = buildTestData(40);
byte[] smallBuffer = new byte[30]; byte[] smallBuffer = new byte[30];
byte[] bigBuffer = new byte[30]; byte[] bigBuffer = new byte[30];
TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel(); TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel(POLL_TIMEOUT_MS);
transferRtpDataChannel.onInterleavedBinaryDataReceived(randomBytes1); transferRtpDataChannel.onInterleavedBinaryDataReceived(randomBytes1);
transferRtpDataChannel.read(smallBuffer, /* offset= */ 0, smallBuffer.length); transferRtpDataChannel.read(smallBuffer, /* offset= */ 0, smallBuffer.length);
...@@ -152,12 +151,12 @@ public class TransferRtpDataChannelTest { ...@@ -152,12 +151,12 @@ public class TransferRtpDataChannelTest {
} }
@Test @Test
public void read_withSmallAndBigBufferAndSubsequentProducerWrite_reads() throws Exception { public void read_withSmallAndBigBufferAndSubsequentProducerWrite_reads() {
byte[] randomBytes1 = buildTestData(40); byte[] randomBytes1 = buildTestData(40);
byte[] randomBytes2 = buildTestData(40); byte[] randomBytes2 = buildTestData(40);
byte[] smallBuffer = new byte[20]; byte[] smallBuffer = new byte[20];
byte[] bigBuffer = new byte[70]; byte[] bigBuffer = new byte[70];
TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel(); TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel(POLL_TIMEOUT_MS);
transferRtpDataChannel.onInterleavedBinaryDataReceived(randomBytes1); transferRtpDataChannel.onInterleavedBinaryDataReceived(randomBytes1);
transferRtpDataChannel.read(smallBuffer, /* offset= */ 0, smallBuffer.length); transferRtpDataChannel.read(smallBuffer, /* offset= */ 0, smallBuffer.length);
......
...@@ -18,6 +18,7 @@ package com.google.android.exoplayer2.source.rtsp; ...@@ -18,6 +18,7 @@ package com.google.android.exoplayer2.source.rtsp;
import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertThat;
import androidx.test.ext.junit.runners.AndroidJUnit4; import androidx.test.ext.junit.runners.AndroidJUnit4;
import com.google.android.exoplayer2.upstream.UdpDataSource;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.robolectric.annotation.internal.DoNotInstrument; import org.robolectric.annotation.internal.DoNotInstrument;
...@@ -29,7 +30,8 @@ public class UdpDataSourceRtpDataChannelTest { ...@@ -29,7 +30,8 @@ public class UdpDataSourceRtpDataChannelTest {
@Test @Test
public void getInterleavedBinaryDataListener_returnsNull() { public void getInterleavedBinaryDataListener_returnsNull() {
UdpDataSourceRtpDataChannel udpDataSourceRtpDataChannel = new UdpDataSourceRtpDataChannel(); UdpDataSourceRtpDataChannel udpDataSourceRtpDataChannel =
new UdpDataSourceRtpDataChannel(UdpDataSource.DEFAULT_SOCKET_TIMEOUT_MILLIS);
assertThat(udpDataSourceRtpDataChannel.getInterleavedBinaryDataListener()).isNull(); assertThat(udpDataSourceRtpDataChannel.getInterleavedBinaryDataListener()).isNull();
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment