Commit 68b2875a by claincly Committed by Oliver Woodman

Move RTP packet forwarding off playback thread.

Previously, RTSP interleaved binary data is posted onto the playback thread
for handling, the playback thread then adds the received data to a queue.
A loader thread will later dequeue the data and process it.

In this CL, the binary data is sent through a separate listener, on
RtspMessageChannel's RTSP receiving thread.

#minor-release

PiperOrigin-RevId: 375907609
parent 4a780fb9
......@@ -15,7 +15,9 @@
*/
package com.google.android.exoplayer2.source.rtsp;
import androidx.annotation.Nullable;
import com.google.android.exoplayer2.C;
import com.google.android.exoplayer2.source.rtsp.RtspMessageChannel.InterleavedBinaryDataListener;
import com.google.android.exoplayer2.upstream.DataSource;
import java.io.IOException;
......@@ -43,17 +45,9 @@ import java.io.IOException;
int getLocalPort();
/**
* Returns whether the data channel is using sideband binary data to transmit RTP packets. For
* example, RTP-over-RTSP.
* Returns a {@link InterleavedBinaryDataListener} if the implementation supports receiving RTP
* packets on a side-band protocol, for example RTP-over-RTSP; otherwise {@code null}.
*/
boolean usesSidebandBinaryData();
/**
* Writes data to the channel.
*
* <p>The channel owns the written buffer, the user must not alter its content after writing.
*
* @param buffer The buffer from which data should be written. The buffer should be full.
*/
void write(byte[] buffer);
@Nullable
InterleavedBinaryDataListener getInterleavedBinaryDataListener();
}
......@@ -42,6 +42,7 @@ import com.google.android.exoplayer2.C;
import com.google.android.exoplayer2.ParserException;
import com.google.android.exoplayer2.source.rtsp.RtspMediaPeriod.RtpLoadInfo;
import com.google.android.exoplayer2.source.rtsp.RtspMediaSource.RtspPlaybackException;
import com.google.android.exoplayer2.source.rtsp.RtspMessageChannel.InterleavedBinaryDataListener;
import com.google.android.exoplayer2.source.rtsp.RtspMessageUtil.RtspSessionHeader;
import com.google.android.exoplayer2.util.Util;
import com.google.common.collect.ImmutableList;
......@@ -95,7 +96,6 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
// TODO(b/172331505) Add a timeout monitor for pending requests.
private final SparseArray<RtspRequest> pendingRequests;
private final MessageSender messageSender;
private final SparseArray<RtpDataChannel> transferRtpDataChannelMap;
private RtspMessageChannel messageChannel;
private @MonotonicNonNull PlaybackEventListener playbackEventListener;
......@@ -125,7 +125,6 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
pendingSetupRtpLoadInfos = new ArrayDeque<>();
pendingRequests = new SparseArray<>();
messageSender = new MessageSender();
transferRtpDataChannelMap = new SparseArray<>();
pendingSeekPositionUs = C.TIME_UNSET;
messageChannel = new RtspMessageChannel(new MessageListener());
}
......@@ -224,9 +223,10 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
}
}
/** Registers an {@link RtpDataChannel} to receive RTSP interleaved data. */
public void registerInterleavedDataChannel(RtpDataChannel rtpDataChannel) {
transferRtpDataChannelMap.put(rtpDataChannel.getLocalPort(), rtpDataChannel);
/** Registers an {@link InterleavedBinaryDataListener} to receive RTSP interleaved data. */
public void registerInterleavedDataChannel(
int channel, InterleavedBinaryDataListener interleavedBinaryDataListener) {
messageChannel.registerInterleavedBinaryDataListener(channel, interleavedBinaryDataListener);
}
private void continueSetupRtspTrack() {
......@@ -440,14 +440,6 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
}
}
@Override
public void onInterleavedBinaryDataReceived(byte[] data, int channel) {
@Nullable RtpDataChannel dataChannel = transferRtpDataChannelMap.get(channel);
if (dataChannel != null) {
dataChannel.write(data);
}
}
// Response handlers must only be called only on 200 (OK) responses.
public void onOptionsResponseReceived(RtspOptionsResponse response) {
......
......@@ -689,8 +689,12 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
(transport, rtpDataChannel) -> {
RtpLoadInfo.this.transport = transport;
if (rtpDataChannel.usesSidebandBinaryData()) {
rtspClient.registerInterleavedDataChannel(rtpDataChannel);
@Nullable
RtspMessageChannel.InterleavedBinaryDataListener interleavedBinaryDataListener =
rtpDataChannel.getInterleavedBinaryDataListener();
if (interleavedBinaryDataListener != null) {
rtspClient.registerInterleavedDataChannel(
rtpDataChannel.getLocalPort(), interleavedBinaryDataListener);
}
maybeSetupTracks();
......
......@@ -22,6 +22,7 @@ import static com.google.android.exoplayer2.util.Assertions.checkStateNotNull;
import android.os.Handler;
import android.os.HandlerThread;
import android.os.Looper;
import android.util.SparseArray;
import androidx.annotation.IntDef;
import androidx.annotation.Nullable;
import com.google.android.exoplayer2.C;
......@@ -64,15 +65,6 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
void onRtspMessageReceived(List<String> message);
/**
* Called when interleaved binary data is received on RTSP.
*
* @param data The received binary data. The byte array will not be reused by {@link
* RtspMessageChannel}, and will always be full.
* @param channel The channel on which the data is received.
*/
default void onInterleavedBinaryDataReceived(byte[] data, int channel) {}
/**
* Called when failed to send an RTSP message.
*
* @param message The list of lines making up the RTSP message that is failed to send.
......@@ -88,6 +80,18 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
default void onReceivingFailed(Exception e) {}
}
/** A listener for received interleaved binary data from RTSP. */
public interface InterleavedBinaryDataListener {
/**
* Called when interleaved binary data is received on RTSP.
*
* @param data The received binary data. The byte array will not be reused by {@link
* RtspMessageChannel}, and will always be full.
*/
void onInterleavedBinaryDataReceived(byte[] data);
}
/**
* The IANA-registered default port for RTSP. See <a
* href="https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.xhtml">here</a>
......@@ -102,6 +106,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
private final MessageListener messageListener;
private final Loader receiverLoader;
private final SparseArray<InterleavedBinaryDataListener> interleavedBinaryDataListeners;
private @MonotonicNonNull Sender sender;
private @MonotonicNonNull Socket socket;
......@@ -123,6 +128,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
this.messageListenerHandler = Util.createHandlerForCurrentLooper();
this.messageListener = messageListener;
this.receiverLoader = new Loader("ExoPlayer:RtspMessageChannel:ReceiverLoader");
this.interleavedBinaryDataListeners = new SparseArray<>();
}
/**
......@@ -183,6 +189,19 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
sender.send(message);
}
/**
* Registers an {@link InterleavedBinaryDataListener} to receive RTSP interleaved data.
*
* <p>The listener method {@link InterleavedBinaryDataListener#onInterleavedBinaryDataReceived} is
* called on {@link RtspMessageChannel}'s internal thread for receiving RTSP messages. The user
* should post the handling for the interleaved data onto another thread, if the handling is
* performance intensive.
*/
public void registerInterleavedBinaryDataListener(
int channel, InterleavedBinaryDataListener listener) {
interleavedBinaryDataListeners.put(channel, listener);
}
private final class Sender implements Closeable {
private final OutputStream outputStream;
......@@ -322,12 +341,11 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
byte[] data = new byte[size];
dataInputStream.readFully(data, /* off= */ 0, size);
messageListenerHandler.post(
() -> {
if (!closed) {
messageListener.onInterleavedBinaryDataReceived(data, channel);
}
});
@Nullable
InterleavedBinaryDataListener listener = interleavedBinaryDataListeners.get(channel);
if (listener != null && !closed) {
listener.onInterleavedBinaryDataReceived(data);
}
}
}
......
......@@ -22,6 +22,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import android.net.Uri;
import androidx.annotation.Nullable;
import com.google.android.exoplayer2.C;
import com.google.android.exoplayer2.source.rtsp.RtspMessageChannel.InterleavedBinaryDataListener;
import com.google.android.exoplayer2.upstream.BaseDataSource;
import com.google.android.exoplayer2.upstream.DataSpec;
import com.google.android.exoplayer2.util.Util;
......@@ -31,7 +32,8 @@ import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;
/** An {@link RtpDataChannel} that transfers received data in-memory. */
/* package */ final class TransferRtpDataChannel extends BaseDataSource implements RtpDataChannel {
/* package */ final class TransferRtpDataChannel extends BaseDataSource
implements RtpDataChannel, RtspMessageChannel.InterleavedBinaryDataListener {
private static final String DEFAULT_TCP_TRANSPORT_FORMAT =
"RTP/AVP/TCP;unicast;interleaved=%d-%d";
......@@ -62,8 +64,8 @@ import java.util.concurrent.LinkedBlockingQueue;
}
@Override
public boolean usesSidebandBinaryData() {
return true;
public InterleavedBinaryDataListener getInterleavedBinaryDataListener() {
return this;
}
@Override
......@@ -119,7 +121,7 @@ import java.util.concurrent.LinkedBlockingQueue;
}
@Override
public void write(byte[] buffer) {
packetQueue.add(buffer);
public void onInterleavedBinaryDataReceived(byte[] data) {
packetQueue.add(data);
}
}
......@@ -55,6 +55,12 @@ import java.io.IOException;
return port == UdpDataSource.UDP_PORT_UNSET ? C.INDEX_UNSET : port;
}
@Nullable
@Override
public RtspMessageChannel.InterleavedBinaryDataListener getInterleavedBinaryDataListener() {
return null;
}
@Override
public void addTransferListener(TransferListener transferListener) {
dataSource.addTransferListener(transferListener);
......@@ -85,20 +91,6 @@ import java.io.IOException;
return dataSource.read(target, offset, length);
}
@Override
public boolean usesSidebandBinaryData() {
return false;
}
/**
* Writing to a {@link UdpDataSource} backed {@link RtpDataChannel} is not supported at the
* moment.
*/
@Override
public void write(byte[] buffer) {
throw new UnsupportedOperationException();
}
public void setRtcpChannel(UdpDataSourceRtpDataChannel rtcpChannel) {
checkArgument(this != rtcpChannel);
this.rtcpChannel = rtcpChannel;
......
......@@ -22,7 +22,6 @@ import static com.google.common.truth.Truth.assertThat;
import android.net.Uri;
import androidx.test.ext.junit.runners.AndroidJUnit4;
import com.google.android.exoplayer2.robolectric.RobolectricUtil;
import com.google.android.exoplayer2.source.rtsp.RtspMessageChannel.MessageListener;
import com.google.android.exoplayer2.util.Util;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.LinkedListMultimap;
......@@ -84,6 +83,7 @@ public final class RtspMessageChannelTest {
AtomicBoolean receivingFinished = new AtomicBoolean();
AtomicReference<Exception> sendingException = new AtomicReference<>();
List<List<String>> receivedRtspResponses = new ArrayList<>(/* initialCapacity= */ 3);
// Key: channel number, Value: a list of received byte arrays.
Multimap<Integer, List<Byte>> receivedInterleavedData = LinkedListMultimap.create();
ServerSocket serverSocket =
new ServerSocket(/* port= */ 0, /* backlog= */ 1, InetAddress.getByName(/* host= */ null));
......@@ -116,20 +116,18 @@ public final class RtspMessageChannelTest {
RtspMessageChannel rtspMessageChannel =
new RtspMessageChannel(
new MessageListener() {
@Override
public void onRtspMessageReceived(List<String> message) {
receivedRtspResponses.add(message);
if (receivedRtspResponses.size() == 3 && receivedInterleavedData.size() == 2) {
receivingFinished.set(true);
}
}
@Override
public void onInterleavedBinaryDataReceived(byte[] data, int channel) {
receivedInterleavedData.put(channel, Bytes.asList(data));
message -> {
receivedRtspResponses.add(message);
if (receivedRtspResponses.size() == 3 && receivedInterleavedData.size() == 2) {
receivingFinished.set(true);
}
});
rtspMessageChannel.registerInterleavedBinaryDataListener(
/* channel= */ 0, data -> receivedInterleavedData.put(0, Bytes.asList(data)));
rtspMessageChannel.registerInterleavedBinaryDataListener(
/* channel= */ 1, data -> receivedInterleavedData.put(1, Bytes.asList(data)));
rtspMessageChannel.openSocket(clientSideSocket);
RobolectricUtil.runMainLooperUntil(receivingFinished::get);
......
......@@ -17,9 +17,11 @@ package com.google.android.exoplayer2.source.rtsp;
import static com.google.android.exoplayer2.testutil.TestUtil.buildTestData;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;
import androidx.test.ext.junit.runners.AndroidJUnit4;
import com.google.common.primitives.Bytes;
import java.io.IOException;
import java.util.Arrays;
import org.junit.Test;
import org.junit.runner.RunWith;
......@@ -29,11 +31,29 @@ import org.junit.runner.RunWith;
public class TransferRtpDataChannelTest {
@Test
public void getInterleavedBinaryDataListener_returnsAnInterleavedBinaryDataListener() {
TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel();
assertThat(transferRtpDataChannel.getInterleavedBinaryDataListener())
.isEqualTo(transferRtpDataChannel);
}
@Test
public void read_withoutReceivingInterleavedData_timesOut() {
TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel();
byte[] buffer = new byte[1];
assertThrows(
IOException.class,
() -> transferRtpDataChannel.read(buffer, /* offset= */ 0, buffer.length));
}
@Test
public void read_withLargeEnoughBuffer_reads() throws Exception {
byte[] randomBytes = buildTestData(20);
byte[] buffer = new byte[40];
TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel();
transferRtpDataChannel.write(randomBytes);
transferRtpDataChannel.onInterleavedBinaryDataReceived(randomBytes);
transferRtpDataChannel.read(buffer, /* offset= */ 0, buffer.length);
......@@ -45,7 +65,7 @@ public class TransferRtpDataChannelTest {
byte[] randomBytes = buildTestData(20);
byte[] buffer = new byte[8];
TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel();
transferRtpDataChannel.write(randomBytes);
transferRtpDataChannel.onInterleavedBinaryDataReceived(randomBytes);
transferRtpDataChannel.read(buffer, /* offset= */ 0, buffer.length);
assertThat(buffer).isEqualTo(Arrays.copyOfRange(randomBytes, /* from= */ 0, /* to= */ 8));
......@@ -61,7 +81,7 @@ public class TransferRtpDataChannelTest {
byte[] randomBytes = buildTestData(40);
byte[] buffer = new byte[20];
TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel();
transferRtpDataChannel.write(randomBytes);
transferRtpDataChannel.onInterleavedBinaryDataReceived(randomBytes);
transferRtpDataChannel.read(buffer, /* offset= */ 0, buffer.length);
assertThat(buffer).isEqualTo(Arrays.copyOfRange(randomBytes, /* from= */ 0, /* to= */ 20));
......@@ -77,13 +97,13 @@ public class TransferRtpDataChannelTest {
byte[] smallBuffer = new byte[20];
byte[] bigBuffer = new byte[40];
TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel();
transferRtpDataChannel.write(randomBytes1);
transferRtpDataChannel.onInterleavedBinaryDataReceived(randomBytes1);
transferRtpDataChannel.read(smallBuffer, /* offset= */ 0, smallBuffer.length);
assertThat(smallBuffer)
.isEqualTo(Arrays.copyOfRange(randomBytes1, /* from= */ 0, /* to= */ 20));
transferRtpDataChannel.write(randomBytes2);
transferRtpDataChannel.onInterleavedBinaryDataReceived(randomBytes2);
// Read the remaining 20 bytes in randomBytes1, and 20 bytes from randomBytes2.
transferRtpDataChannel.read(bigBuffer, /* offset= */ 0, bigBuffer.length);
......@@ -107,13 +127,13 @@ public class TransferRtpDataChannelTest {
byte[] smallBuffer = new byte[30];
byte[] bigBuffer = new byte[30];
TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel();
transferRtpDataChannel.write(randomBytes1);
transferRtpDataChannel.onInterleavedBinaryDataReceived(randomBytes1);
transferRtpDataChannel.read(smallBuffer, /* offset= */ 0, smallBuffer.length);
assertThat(smallBuffer)
.isEqualTo(Arrays.copyOfRange(randomBytes1, /* from= */ 0, /* to= */ 30));
transferRtpDataChannel.write(randomBytes2);
transferRtpDataChannel.onInterleavedBinaryDataReceived(randomBytes2);
// Read 30 bytes to big buffer.
transferRtpDataChannel.read(bigBuffer, /* offset= */ 0, bigBuffer.length);
......@@ -136,13 +156,13 @@ public class TransferRtpDataChannelTest {
byte[] smallBuffer = new byte[20];
byte[] bigBuffer = new byte[70];
TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel();
transferRtpDataChannel.write(randomBytes1);
transferRtpDataChannel.onInterleavedBinaryDataReceived(randomBytes1);
transferRtpDataChannel.read(smallBuffer, /* offset= */ 0, smallBuffer.length);
assertThat(smallBuffer)
.isEqualTo(Arrays.copyOfRange(randomBytes1, /* from= */ 0, /* to= */ 20));
transferRtpDataChannel.write(randomBytes2);
transferRtpDataChannel.onInterleavedBinaryDataReceived(randomBytes2);
transferRtpDataChannel.read(bigBuffer, /* offset= */ 0, bigBuffer.length);
assertThat(Arrays.copyOfRange(bigBuffer, /* from= */ 0, /* to= */ 60))
......
/*
* Copyright 2021 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.android.exoplayer2.source.rtsp;
import static com.google.common.truth.Truth.assertThat;
import androidx.test.ext.junit.runners.AndroidJUnit4;
import org.junit.Test;
import org.junit.runner.RunWith;
/** Unit test for {@link UdpDataSourceRtpDataChannel}. */
@RunWith(AndroidJUnit4.class)
public class UdpDataSourceRtpDataChannelTest {
@Test
public void getInterleavedBinaryDataListener_returnsNull() {
UdpDataSourceRtpDataChannel udpDataSourceRtpDataChannel = new UdpDataSourceRtpDataChannel();
assertThat(udpDataSourceRtpDataChannel.getInterleavedBinaryDataListener()).isNull();
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment