Commit 0ea6c899 by claincly Committed by Oliver Woodman

Enable reading interleaved message in RtspMessageChannel.

RTSP interleaving enables RTP packets to be sent using RTSP's TCP connection.
The interleaving RTSP messages contain binary data only and always start with a
'$'. Normal RTSP messages contain line breaks (CRLFs) that indicate complete
lines.

#minor-release

PiperOrigin-RevId: 372990181
parent 923ba513
......@@ -15,23 +15,31 @@
*/
package com.google.android.exoplayer2.source.rtsp;
import static com.google.android.exoplayer2.source.rtsp.RtspMessageUtil.isRtspStartLine;
import static com.google.android.exoplayer2.util.Assertions.checkArgument;
import static com.google.android.exoplayer2.util.Assertions.checkStateNotNull;
import android.os.Handler;
import android.os.HandlerThread;
import android.os.Looper;
import androidx.annotation.IntDef;
import androidx.annotation.Nullable;
import com.google.android.exoplayer2.C;
import com.google.android.exoplayer2.ParserException;
import com.google.android.exoplayer2.upstream.Loader;
import com.google.android.exoplayer2.upstream.Loader.LoadErrorAction;
import com.google.android.exoplayer2.upstream.Loader.Loadable;
import com.google.android.exoplayer2.util.Log;
import com.google.android.exoplayer2.util.Util;
import com.google.common.base.Ascii;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import java.io.BufferedReader;
import com.google.common.collect.ImmutableList;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
import java.util.ArrayList;
......@@ -42,7 +50,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
/* package */ final class RtspMessageChannel implements Closeable {
private static final String TAG = "RtspMessageChannel";
private static final boolean LOG_RTSP_MESSAGES = false;
private static final boolean LOG_RTSP_MESSAGES = true;
/** A listener for received RTSP messages and possible failures. */
public interface MessageListener {
......@@ -55,6 +63,15 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
void onRtspMessageReceived(List<String> message);
/**
* Called when interleaved binary data is received on RTSP.
*
* @param data The received binary data. The byte array will not be reused by {@link
* RtspMessageChannel}, and will always be full.
* @param channel The channel on which the data is received.
*/
default void onInterleavedBinaryDataReceived(byte[] data, int channel) {}
/**
* Called when failed to send an RTSP message.
*
* @param message The list of lines making up the RTSP message that is failed to send.
......@@ -87,7 +104,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
private @MonotonicNonNull Sender sender;
private @MonotonicNonNull Socket socket;
private boolean closed;
private volatile boolean closed;
/**
* Constructs a new instance.
......@@ -135,17 +152,24 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
*/
@Override
public void close() throws IOException {
if (sender != null) {
sender.close();
// TODO(internal b/172331505) Make sure most resources are closed before throwing, and close()
// can be called again to close the resources that are still open.
if (closed) {
return;
}
receiverLoader.release();
try {
if (sender != null) {
sender.close();
}
receiverLoader.release();
messageListenerHandler.removeCallbacksAndMessages(/* token= */ null);
if (socket != null) {
socket.close();
if (socket != null) {
socket.close();
}
} finally {
closed = true;
}
messageListenerHandler.removeCallbacksAndMessages(/* token= */ null);
closed = true;
}
/**
......@@ -159,6 +183,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
}
private static void logMessage(List<String> rtspMessage) {
// TODO(b/172331505) Remove before release.
if (LOG_RTSP_MESSAGES) {
Log.d(TAG, Joiner.on('\n').join(rtspMessage));
}
......@@ -224,8 +249,12 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
/** A {@link Loadable} for receiving RTSP responses. */
private final class Receiver implements Loadable {
private final BufferedReader inputStreamReader;
/** ASCII dollar encapsulates the RTP packets in interleaved mode (RFC2326 Section 10.12). */
private static final byte RTSP_INTERLEAVED_MESSAGE_MARKER = '$';
private final DataInputStream dataInputStream;
private final RtspMessageBuilder messageBuilder;
private volatile boolean loadCanceled;
/**
......@@ -236,7 +265,8 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
* InputStream}.
*/
public Receiver(InputStream inputStream) {
inputStreamReader = new BufferedReader(new InputStreamReader(inputStream, Charsets.UTF_8));
dataInputStream = new DataInputStream(inputStream);
messageBuilder = new RtspMessageBuilder();
}
@Override
......@@ -246,26 +276,66 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
@Override
public void load() throws IOException {
List<String> messageLines = new ArrayList<>();
while (!loadCanceled) {
String line;
while (inputStreamReader.ready() && (line = inputStreamReader.readLine()) != null) {
messageLines.add(line);
// TODO(internal b/172331505) Use a buffered read.
byte firstByte = dataInputStream.readByte();
if (firstByte == RTSP_INTERLEAVED_MESSAGE_MARKER) {
handleInterleavedBinaryData();
} else {
handleRtspMessage(firstByte);
}
}
}
if (!messageLines.isEmpty()) {
List<String> message = new ArrayList<>(messageLines);
logMessage(message);
messageListenerHandler.post(
() -> {
if (!closed) {
messageListener.onRtspMessageReceived(message);
}
});
// Resets for the next response.
messageLines.clear();
}
/** Handles an entire RTSP message. */
private void handleRtspMessage(byte firstByte) throws IOException {
@Nullable
ImmutableList<String> messageLines = messageBuilder.addLine(handleRtspMessageLine(firstByte));
while (messageLines == null) {
messageLines = messageBuilder.addLine(handleRtspMessageLine(dataInputStream.readByte()));
}
logMessage(messageLines);
ImmutableList<String> messageLinesToPost = ImmutableList.copyOf(messageLines);
messageListenerHandler.post(
() -> {
if (!closed) {
messageListener.onRtspMessageReceived(messageLinesToPost);
}
});
}
/** Returns the byte representation of a complete RTSP line, with CRLF line terminator. */
private byte[] handleRtspMessageLine(byte firstByte) throws IOException {
ByteArrayOutputStream messageByteStream = new ByteArrayOutputStream();
byte[] peekedBytes = new byte[2];
peekedBytes[0] = firstByte;
peekedBytes[1] = dataInputStream.readByte();
messageByteStream.write(peekedBytes);
while (peekedBytes[0] != Ascii.CR || peekedBytes[1] != Ascii.LF) {
// Shift the CRLF buffer.
peekedBytes[0] = peekedBytes[1];
peekedBytes[1] = dataInputStream.readByte();
messageByteStream.write(peekedBytes[1]);
}
return messageByteStream.toByteArray();
}
private void handleInterleavedBinaryData() throws IOException {
int channel = dataInputStream.readUnsignedByte();
int size = dataInputStream.readUnsignedShort();
byte[] data = new byte[size];
dataInputStream.readFully(data, /* off= */ 0, size);
messageListenerHandler.post(
() -> {
if (!closed) {
messageListener.onInterleavedBinaryDataReceived(data, channel);
}
});
}
}
......@@ -288,4 +358,93 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
return Loader.DONT_RETRY;
}
}
/** Processes RTSP messages line-by-line. */
private static final class RtspMessageBuilder {
@IntDef({STATE_READING_FIRST_LINE, STATE_READING_RTSP_HEADER, STATE_READING_RTSP_BODY})
@interface ReadingState {}
private static final int STATE_READING_FIRST_LINE = 1;
private static final int STATE_READING_RTSP_HEADER = 2;
private static final int STATE_READING_RTSP_BODY = 3;
private final List<String> messageLines;
@ReadingState private int state;
private long messageBodyLength;
private long receivedMessageBodyLength;
/** Creates a new instance. */
public RtspMessageBuilder() {
messageLines = new ArrayList<>();
state = STATE_READING_FIRST_LINE;
}
/**
* Add a line to the builder.
*
* @param lineBytes The complete RTSP message line in UTF-8 byte array, including CRLF.
* @return A list of completed RTSP message lines, without the CRLF line terminators; or {@code
* null} if the message is not yet complete.
*/
@Nullable
public ImmutableList<String> addLine(byte[] lineBytes) throws ParserException {
// Trim CRLF.
checkArgument(
lineBytes.length >= 2
&& lineBytes[lineBytes.length - 2] == Ascii.CR
&& lineBytes[lineBytes.length - 1] == Ascii.LF);
String line =
new String(
lineBytes, /* offset= */ 0, /* length= */ lineBytes.length - 2, Charsets.UTF_8);
messageLines.add(line);
switch (state) {
case STATE_READING_FIRST_LINE:
if (isRtspStartLine(line)) {
state = STATE_READING_RTSP_HEADER;
}
break;
case STATE_READING_RTSP_HEADER:
// Check if the line contains RTSP Content-Length header.
long contentLength = RtspMessageUtil.parseContentLengthHeader(line);
if (contentLength != C.LENGTH_UNSET) {
messageBodyLength = contentLength;
}
if (line.isEmpty()) {
// An empty line signals the end of the header section.
if (messageBodyLength > 0) {
state = STATE_READING_RTSP_BODY;
} else {
ImmutableList<String> linesToReturn = ImmutableList.copyOf(messageLines);
reset();
return linesToReturn;
}
}
break;
case STATE_READING_RTSP_BODY:
receivedMessageBodyLength += lineBytes.length;
if (receivedMessageBodyLength >= messageBodyLength) {
ImmutableList<String> linesToReturn = ImmutableList.copyOf(messageLines);
reset();
return linesToReturn;
}
break;
default:
throw new IllegalStateException();
}
return null;
}
private void reset() {
messageLines.clear();
state = STATE_READING_FIRST_LINE;
messageBodyLength = 0;
receivedMessageBodyLength = 0;
}
}
}
......@@ -30,6 +30,7 @@ import static com.google.android.exoplayer2.source.rtsp.RtspRequest.METHOD_TEARD
import static com.google.android.exoplayer2.source.rtsp.RtspRequest.METHOD_UNSET;
import static com.google.android.exoplayer2.util.Assertions.checkArgument;
import static com.google.android.exoplayer2.util.Assertions.checkNotNull;
import static java.util.regex.Pattern.CASE_INSENSITIVE;
import android.net.Uri;
import androidx.annotation.Nullable;
......@@ -72,6 +73,10 @@ import java.util.regex.Pattern;
// Status line pattern, see RFC2326 Section 7.1.
private static final Pattern STATUS_LINE_PATTERN = Pattern.compile("RTSP/1\\.0 (\\d+) (.+)");
// Content length header pattern, see RFC2326 Section 12.14.
private static final Pattern CONTENT_LENGTH_HEADER_PATTERN =
Pattern.compile("Content-Length:\\s?(\\d+)", CASE_INSENSITIVE);
// Session header pattern, see RFC2326 Section 12.37.
private static final Pattern SESSION_HEADER_PATTERN =
Pattern.compile("(\\w+)(?:;\\s?timeout=(\\d+))?");
......@@ -260,6 +265,31 @@ import java.util.regex.Pattern;
return new RtspRequest(requestUri, method, headers, messageBody);
}
/** Returns whether the line is a valid RTSP start line. */
public static boolean isRtspStartLine(String line) {
return REQUEST_LINE_PATTERN.matcher(line).matches()
|| STATUS_LINE_PATTERN.matcher(line).matches();
}
/**
* Returns the length in bytes if the line contains a Content-Length header, otherwise {@link
* C#LENGTH_UNSET}.
*
* @throws ParserException If Content-Length cannot be parsed to an integer.
*/
public static long parseContentLengthHeader(String line) throws ParserException {
try {
Matcher matcher = CONTENT_LENGTH_HEADER_PATTERN.matcher(line);
if (matcher.find()) {
return Long.parseLong(checkNotNull(matcher.group(1)));
} else {
return C.LENGTH_UNSET;
}
} catch (NumberFormatException e) {
throw new ParserException(e);
}
}
/**
* Parses the RTSP PUBLIC header into a list of RTSP methods.
*
......
/*
* Copyright 2021 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.android.exoplayer2.source.rtsp;
import static com.google.android.exoplayer2.source.rtsp.RtspMessageUtil.convertMessageToByteArray;
import static com.google.android.exoplayer2.source.rtsp.RtspMessageUtil.serializeResponse;
import static com.google.common.truth.Truth.assertThat;
import android.net.Uri;
import androidx.test.ext.junit.runners.AndroidJUnit4;
import com.google.android.exoplayer2.robolectric.RobolectricUtil;
import com.google.android.exoplayer2.source.rtsp.RtspMessageChannel.MessageListener;
import com.google.android.exoplayer2.util.Util;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Multimap;
import com.google.common.primitives.Bytes;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.SocketFactory;
import org.junit.Test;
import org.junit.runner.RunWith;
/** Unit test for {@link RtspMessageChannel}. */
@RunWith(AndroidJUnit4.class)
public final class RtspMessageChannelTest {
@Test
public void rtspMessageChannelReceive_threeRtspMessagesAndTwoInterleavedBinary_postsToListener()
throws Exception {
RtspResponse optionsResponse =
new RtspResponse(
200,
new RtspHeaders.Builder()
.add(RtspHeaders.CSEQ, "2")
.add(RtspHeaders.PUBLIC, "OPTIONS")
.build(),
"");
RtspResponse describeResponse =
new RtspResponse(
200,
new RtspHeaders.Builder()
.add(RtspHeaders.CSEQ, "3")
.add(RtspHeaders.CONTENT_TYPE, "application/sdp")
.add(RtspHeaders.CONTENT_LENGTH, "28")
.build(),
"v=安卓アンドロイド\r\n");
RtspResponse setupResponse =
new RtspResponse(
200,
new RtspHeaders.Builder()
.add(RtspHeaders.CSEQ, "3")
.add(RtspHeaders.TRANSPORT, "RTP/AVP/TCP;unicast;interleaved=0-1")
.build(),
"");
// Channel: 0, size: 5, data: 01 02 03 04 05.
byte[] interleavedData1 = Util.getBytesFromHexString("0000050102030405");
// Channel: 1, size: 4, data: AA BB CC DD.
byte[] interleavedData2 = Util.getBytesFromHexString("010004AABBCCDD");
AtomicBoolean receivingFinished = new AtomicBoolean();
AtomicReference<Exception> sendingException = new AtomicReference<>();
List<List<String>> receivedRtspResponses = new ArrayList<>(/* initialCapacity= */ 3);
Multimap<Integer, List<Byte>> receivedInterleavedData = LinkedListMultimap.create();
ServerSocket serverSocket =
new ServerSocket(/* port= */ 0, /* backlog= */ 1, InetAddress.getByName(/* host= */ null));
Thread serverListenThread =
new Thread(
() -> {
try {
Socket socket = serverSocket.accept();
OutputStream serverOutputStream = socket.getOutputStream();
serverOutputStream.write(
convertMessageToByteArray(serializeResponse(optionsResponse)));
serverOutputStream.write(
convertMessageToByteArray(serializeResponse(describeResponse)));
serverOutputStream.write(Bytes.concat(new byte[] {'$'}, interleavedData1));
serverOutputStream.write(Bytes.concat(new byte[] {'$'}, interleavedData2));
serverOutputStream.write(
convertMessageToByteArray(serializeResponse(setupResponse)));
} catch (IOException e) {
sendingException.set(e);
}
},
"RtspMessageChannelTest:ServerListenThread");
serverListenThread.start();
int serverRtspPortNumber = serverSocket.getLocalPort();
Uri connectionUri =
Uri.parse(Util.formatInvariant("rtsp://localhost:%d/test", serverRtspPortNumber));
Socket clientSideSocket =
SocketFactory.getDefault().createSocket(connectionUri.getHost(), connectionUri.getPort());
RtspMessageChannel rtspMessageChannel =
new RtspMessageChannel(
new MessageListener() {
@Override
public void onRtspMessageReceived(List<String> message) {
receivedRtspResponses.add(message);
if (receivedRtspResponses.size() == 3 && receivedInterleavedData.size() == 2) {
receivingFinished.set(true);
}
}
@Override
public void onInterleavedBinaryDataReceived(byte[] data, int channel) {
receivedInterleavedData.put(channel, Bytes.asList(data));
}
});
rtspMessageChannel.openSocket(clientSideSocket);
RobolectricUtil.runMainLooperUntil(receivingFinished::get);
Util.closeQuietly(rtspMessageChannel);
serverListenThread.join();
serverSocket.close();
assertThat(sendingException.get()).isNull();
assertThat(receivedRtspResponses)
.containsExactly(
/* optionsResponse */
ImmutableList.of("RTSP/1.0 200 OK", "CSeq: 2", "Public: OPTIONS", ""),
/* describeResponse */
ImmutableList.of(
"RTSP/1.0 200 OK",
"CSeq: 3",
"Content-Type: application/sdp",
"Content-Length: 28",
"",
"v=安卓アンドロイド"),
/* setupResponse */
ImmutableList.of(
"RTSP/1.0 200 OK", "CSeq: 3", "Transport: RTP/AVP/TCP;unicast;interleaved=0-1", ""))
.inOrder();
assertThat(receivedInterleavedData)
.containsExactly(
/* channel */ 0,
Bytes.asList(Util.getBytesFromHexString("0102030405")),
/* channel */ 1,
Bytes.asList(Util.getBytesFromHexString("AABBCCDD")));
}
}
......@@ -363,4 +363,28 @@ public final class RtspMessageUtilTest {
assertThat(RtspMessageUtil.removeUserInfo(uri))
.isEqualTo(Uri.parse("rtsp://foo.bar:5050/foo.mkv"));
}
@Test
public void parseContentLengthHeader_withContentLengthOver31Bits_succeeds() throws Exception {
String line = "Content-Length: 1000000000000000";
long contentLength = RtspMessageUtil.parseContentLengthHeader(line);
assertThat(contentLength).isEqualTo(1000000000000000L);
}
@Test
public void isRtspStartLine_onValidRequestLine_succeeds() {
assertThat(RtspMessageUtil.isRtspStartLine("OPTIONS rtsp://localhost/test RTSP/1.0")).isTrue();
}
@Test
public void isRtspStartLine_onValidResponseLine_succeeds() {
assertThat(RtspMessageUtil.isRtspStartLine("RTSP/1.0 456 Header Field Not Valid for Resource"))
.isTrue();
}
@Test
public void isRtspStartLine_onValidHeaderLine_succeeds() {
assertThat(RtspMessageUtil.isRtspStartLine("Transport: RTP/AVP;unicast;client_port=1000-1001"))
.isFalse();
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment