Commit cfb90e4b by claincly Committed by Oliver Woodman

Remove the default RTSP message handling off playback thread.

The callbacks received RTSP messages and RTSP sending errors are now invoked
directly from RtspMessageChannel's internal threads. It's up to the handler
implementation to decide which thread to handle the messages.

#minor-release

PiperOrigin-RevId: 375908282
parent 68b2875a
...@@ -36,6 +36,7 @@ import static com.google.common.base.Strings.nullToEmpty; ...@@ -36,6 +36,7 @@ import static com.google.common.base.Strings.nullToEmpty;
import android.net.Uri; import android.net.Uri;
import android.os.Handler; import android.os.Handler;
import android.os.Looper;
import android.util.SparseArray; import android.util.SparseArray;
import androidx.annotation.Nullable; import androidx.annotation.Nullable;
import com.google.android.exoplayer2.C; import com.google.android.exoplayer2.C;
...@@ -347,8 +348,24 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; ...@@ -347,8 +348,24 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
private final class MessageListener implements RtspMessageChannel.MessageListener { private final class MessageListener implements RtspMessageChannel.MessageListener {
private final Handler messageHandler;
/**
* Creates a new instance.
*
* <p>The constructor must be called on a {@link Looper} thread, on which all the received RTSP
* messages are processed.
*/
public MessageListener() {
messageHandler = Util.createHandlerForCurrentLooper();
}
@Override @Override
public void onRtspMessageReceived(List<String> message) { public void onRtspMessageReceived(List<String> message) {
messageHandler.post(() -> handleRtspMessage(message));
}
private void handleRtspMessage(List<String> message) {
RtspResponse response = RtspMessageUtil.parseResponse(message); RtspResponse response = RtspMessageUtil.parseResponse(message);
int cSeq = Integer.parseInt(checkNotNull(response.headers.get(RtspHeaders.CSEQ))); int cSeq = Integer.parseInt(checkNotNull(response.headers.get(RtspHeaders.CSEQ)));
...@@ -412,24 +429,17 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; ...@@ -412,24 +429,17 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
onPlayResponseReceived(new RtspPlayResponse(response.status, timing, trackTimingList)); onPlayResponseReceived(new RtspPlayResponse(response.status, timing, trackTimingList));
break; break;
case METHOD_GET_PARAMETER:
onGetParameterResponseReceived(response);
break;
case METHOD_TEARDOWN:
onTeardownResponseReceived(response);
break;
case METHOD_PAUSE: case METHOD_PAUSE:
onPauseResponseReceived(response); onPauseResponseReceived();
break; break;
case METHOD_GET_PARAMETER:
case METHOD_TEARDOWN:
case METHOD_PLAY_NOTIFY: case METHOD_PLAY_NOTIFY:
case METHOD_RECORD: case METHOD_RECORD:
case METHOD_REDIRECT: case METHOD_REDIRECT:
case METHOD_ANNOUNCE: case METHOD_ANNOUNCE:
case METHOD_SET_PARAMETER: case METHOD_SET_PARAMETER:
onUnsupportedResponseReceived(response);
break; break;
case METHOD_UNSET: case METHOD_UNSET:
default: default:
...@@ -442,7 +452,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; ...@@ -442,7 +452,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
// Response handlers must only be called only on 200 (OK) responses. // Response handlers must only be called only on 200 (OK) responses.
public void onOptionsResponseReceived(RtspOptionsResponse response) { private void onOptionsResponseReceived(RtspOptionsResponse response) {
if (keepAliveMonitor != null) { if (keepAliveMonitor != null) {
// Ignores the OPTIONS requests that are sent to keep RTSP connection alive. // Ignores the OPTIONS requests that are sent to keep RTSP connection alive.
return; return;
...@@ -456,7 +466,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; ...@@ -456,7 +466,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
} }
} }
public void onDescribeResponseReceived(RtspDescribeResponse response) { private void onDescribeResponseReceived(RtspDescribeResponse response) {
@Nullable @Nullable
String sessionRangeAttributeString = String sessionRangeAttributeString =
response.sessionDescription.attributes.get(SessionDescription.ATTR_RANGE); response.sessionDescription.attributes.get(SessionDescription.ATTR_RANGE);
...@@ -473,12 +483,12 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; ...@@ -473,12 +483,12 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
} }
} }
public void onSetupResponseReceived(RtspSetupResponse response) { private void onSetupResponseReceived(RtspSetupResponse response) {
sessionId = response.sessionHeader.sessionId; sessionId = response.sessionHeader.sessionId;
continueSetupRtspTrack(); continueSetupRtspTrack();
} }
public void onPlayResponseReceived(RtspPlayResponse response) { private void onPlayResponseReceived(RtspPlayResponse response) {
if (keepAliveMonitor == null) { if (keepAliveMonitor == null) {
keepAliveMonitor = new KeepAliveMonitor(DEFAULT_RTSP_KEEP_ALIVE_INTERVAL_MS); keepAliveMonitor = new KeepAliveMonitor(DEFAULT_RTSP_KEEP_ALIVE_INTERVAL_MS);
keepAliveMonitor.start(); keepAliveMonitor.start();
...@@ -490,24 +500,12 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; ...@@ -490,24 +500,12 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
pendingSeekPositionUs = C.TIME_UNSET; pendingSeekPositionUs = C.TIME_UNSET;
} }
public void onPauseResponseReceived(RtspResponse response) { private void onPauseResponseReceived() {
if (pendingSeekPositionUs != C.TIME_UNSET) { if (pendingSeekPositionUs != C.TIME_UNSET) {
startPlayback(C.usToMs(pendingSeekPositionUs)); startPlayback(C.usToMs(pendingSeekPositionUs));
} }
} }
public void onGetParameterResponseReceived(RtspResponse response) {
// Do nothing.
}
public void onTeardownResponseReceived(RtspResponse response) {
// Do nothing.
}
public void onUnsupportedResponseReceived(RtspResponse response) {
// Do nothing.
}
private void dispatchRtspError(Throwable error) { private void dispatchRtspError(Throwable error) {
RtspPlaybackException playbackException = RtspPlaybackException playbackException =
error instanceof RtspPlaybackException error instanceof RtspPlaybackException
......
...@@ -21,8 +21,6 @@ import static com.google.android.exoplayer2.util.Assertions.checkStateNotNull; ...@@ -21,8 +21,6 @@ import static com.google.android.exoplayer2.util.Assertions.checkStateNotNull;
import android.os.Handler; import android.os.Handler;
import android.os.HandlerThread; import android.os.HandlerThread;
import android.os.Looper;
import android.util.SparseArray;
import androidx.annotation.IntDef; import androidx.annotation.IntDef;
import androidx.annotation.Nullable; import androidx.annotation.Nullable;
import com.google.android.exoplayer2.C; import com.google.android.exoplayer2.C;
...@@ -30,7 +28,6 @@ import com.google.android.exoplayer2.ParserException; ...@@ -30,7 +28,6 @@ import com.google.android.exoplayer2.ParserException;
import com.google.android.exoplayer2.upstream.Loader; import com.google.android.exoplayer2.upstream.Loader;
import com.google.android.exoplayer2.upstream.Loader.LoadErrorAction; import com.google.android.exoplayer2.upstream.Loader.LoadErrorAction;
import com.google.android.exoplayer2.upstream.Loader.Loadable; import com.google.android.exoplayer2.upstream.Loader.Loadable;
import com.google.android.exoplayer2.util.Util;
import com.google.common.base.Ascii; import com.google.common.base.Ascii;
import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
...@@ -43,7 +40,10 @@ import java.io.OutputStream; ...@@ -43,7 +40,10 @@ import java.io.OutputStream;
import java.net.Socket; import java.net.Socket;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
/** Sends and receives RTSP messages. */ /** Sends and receives RTSP messages. */
...@@ -98,15 +98,9 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; ...@@ -98,15 +98,9 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
*/ */
public static final int DEFAULT_RTSP_PORT = 554; public static final int DEFAULT_RTSP_PORT = 554;
/**
* The handler for all {@code messageListener} interactions. Backed by the thread on which this
* class is constructed.
*/
private final Handler messageListenerHandler;
private final MessageListener messageListener; private final MessageListener messageListener;
private final Loader receiverLoader; private final Loader receiverLoader;
private final SparseArray<InterleavedBinaryDataListener> interleavedBinaryDataListeners; private final Map<Integer, InterleavedBinaryDataListener> interleavedBinaryDataListeners;
private @MonotonicNonNull Sender sender; private @MonotonicNonNull Sender sender;
private @MonotonicNonNull Socket socket; private @MonotonicNonNull Socket socket;
...@@ -115,20 +109,21 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; ...@@ -115,20 +109,21 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
/** /**
* Constructs a new instance. * Constructs a new instance.
* *
* <p>The constructor must be called on a {@link Looper} thread. The thread is also where {@link * <p>An RTSP {@link Socket socket} must be constructed, and used to call {@link #openSocket} to
* MessageListener} events are sent. User must construct a socket for RTSP and call {@link * open the connection before being able to send and receive. {@link #close} must be called when
* #openSocket} to open the connection before being able to send and receive, and {@link #close} * done.
* it when done. *
* <p>{@link MessageListener} and {@link InterleavedBinaryDataListener} implementations must not
* make assumptions about which thread called their listener methods; and must be thread-safe.
* *
* <p>Note: all method invocations must be made from the thread on which this class is created. * <p>Note: all method invocations must be made from the thread on which this class is created.
* *
* @param messageListener The {@link MessageListener} to receive events. * @param messageListener The {@link MessageListener} to receive events.
*/ */
public RtspMessageChannel(MessageListener messageListener) { public RtspMessageChannel(MessageListener messageListener) {
this.messageListenerHandler = Util.createHandlerForCurrentLooper();
this.messageListener = messageListener; this.messageListener = messageListener;
this.receiverLoader = new Loader("ExoPlayer:RtspMessageChannel:ReceiverLoader"); this.receiverLoader = new Loader("ExoPlayer:RtspMessageChannel:ReceiverLoader");
this.interleavedBinaryDataListeners = new SparseArray<>(); this.interleavedBinaryDataListeners = Collections.synchronizedMap(new HashMap<>());
} }
/** /**
...@@ -169,7 +164,6 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; ...@@ -169,7 +164,6 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
sender.close(); sender.close();
} }
receiverLoader.release(); receiverLoader.release();
messageListenerHandler.removeCallbacksAndMessages(/* token= */ null);
if (socket != null) { if (socket != null) {
socket.close(); socket.close();
...@@ -193,9 +187,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; ...@@ -193,9 +187,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
* Registers an {@link InterleavedBinaryDataListener} to receive RTSP interleaved data. * Registers an {@link InterleavedBinaryDataListener} to receive RTSP interleaved data.
* *
* <p>The listener method {@link InterleavedBinaryDataListener#onInterleavedBinaryDataReceived} is * <p>The listener method {@link InterleavedBinaryDataListener#onInterleavedBinaryDataReceived} is
* called on {@link RtspMessageChannel}'s internal thread for receiving RTSP messages. The user * called on {@link RtspMessageChannel}'s internal thread for receiving RTSP messages.
* should post the handling for the interleaved data onto another thread, if the handling is
* performance intensive.
*/ */
public void registerInterleavedBinaryDataListener( public void registerInterleavedBinaryDataListener(
int channel, InterleavedBinaryDataListener listener) { int channel, InterleavedBinaryDataListener listener) {
...@@ -237,12 +229,9 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; ...@@ -237,12 +229,9 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
try { try {
outputStream.write(data); outputStream.write(data);
} catch (Exception e) { } catch (Exception e) {
messageListenerHandler.post( if (!closed) {
() -> { messageListener.onSendingFailed(message, e);
if (!closed) { }
messageListener.onSendingFailed(message, e);
}
});
} }
}); });
} }
...@@ -307,13 +296,9 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; ...@@ -307,13 +296,9 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
messageLines = messageBuilder.addLine(handleRtspMessageLine(dataInputStream.readByte())); messageLines = messageBuilder.addLine(handleRtspMessageLine(dataInputStream.readByte()));
} }
ImmutableList<String> messageLinesToPost = ImmutableList.copyOf(messageLines); if (!closed) {
messageListenerHandler.post( messageListener.onRtspMessageReceived(messageLines);
() -> { }
if (!closed) {
messageListener.onRtspMessageReceived(messageLinesToPost);
}
});
} }
/** Returns the byte representation of a complete RTSP line, with CRLF line terminator. */ /** Returns the byte representation of a complete RTSP line, with CRLF line terminator. */
...@@ -364,7 +349,9 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; ...@@ -364,7 +349,9 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
long loadDurationMs, long loadDurationMs,
IOException error, IOException error,
int errorCount) { int errorCount) {
messageListener.onReceivingFailed(error); if (!closed) {
messageListener.onReceivingFailed(error);
}
return Loader.DONT_RETRY; return Loader.DONT_RETRY;
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment