Commit b75b4213 by kimvde Committed by microkatz

Refactor progress logic to be thread safe

PiperOrigin-RevId: 489984147
parent ca190c08
......@@ -60,6 +60,7 @@ import org.checkerframework.dataflow.qual.Pure;
Codec.DecoderFactory decoderFactory,
Codec.EncoderFactory encoderFactory,
MuxerWrapper muxerWrapper,
Listener listener,
FallbackListener fallbackListener)
throws TransformationException {
super(
......@@ -67,7 +68,8 @@ import org.checkerframework.dataflow.qual.Pure;
streamStartPositionUs,
streamOffsetUs,
transformationRequest.flattenForSlowMotion,
muxerWrapper);
muxerWrapper,
listener);
decoderInputBuffer = new DecoderInputBuffer(BUFFER_REPLACEMENT_MODE_DISABLED);
encoderInputBuffer = new DecoderInputBuffer(BUFFER_REPLACEMENT_MODE_DISABLED);
......
......@@ -18,14 +18,12 @@ package com.google.android.exoplayer2.transformer;
import static com.google.android.exoplayer2.util.Assertions.checkNotNull;
import static com.google.android.exoplayer2.util.Assertions.checkStateNotNull;
import static java.lang.Math.max;
import androidx.annotation.Nullable;
import com.google.android.exoplayer2.C;
import com.google.android.exoplayer2.Format;
import com.google.android.exoplayer2.decoder.DecoderInputBuffer;
import com.google.android.exoplayer2.util.MimeTypes;
import com.google.android.exoplayer2.util.Util;
import java.nio.ByteBuffer;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.RequiresNonNull;
......@@ -35,12 +33,12 @@ import org.checkerframework.checker.nullness.qual.RequiresNonNull;
private final long streamStartPositionUs;
private final long streamOffsetUs;
private final MuxerWrapper muxerWrapper;
private final Listener listener;
private final @C.TrackType int trackType;
private final @MonotonicNonNull SefSlowMotionFlattener sefVideoSlowMotionFlattener;
@Nullable private DecoderInputBuffer inputBuffer;
private boolean muxerWrapperTrackAdded;
private long currentPositionMs;
private boolean isEnded;
public BaseSamplePipeline(
......@@ -48,10 +46,12 @@ import org.checkerframework.checker.nullness.qual.RequiresNonNull;
long streamStartPositionUs,
long streamOffsetUs,
boolean flattenForSlowMotion,
MuxerWrapper muxerWrapper) {
MuxerWrapper muxerWrapper,
Listener listener) {
this.streamStartPositionUs = streamStartPositionUs;
this.streamOffsetUs = streamOffsetUs;
this.muxerWrapper = muxerWrapper;
this.listener = listener;
trackType = MimeTypes.getTrackType(inputFormat.sampleMimeType);
sefVideoSlowMotionFlattener =
flattenForSlowMotion && trackType == C.TRACK_TYPE_VIDEO
......@@ -69,8 +69,7 @@ import org.checkerframework.checker.nullness.qual.RequiresNonNull;
@Override
public void queueInputBuffer() throws TransformationException {
DecoderInputBuffer inputBuffer = checkNotNull(this.inputBuffer);
currentPositionMs =
max(currentPositionMs, Util.usToMs(inputBuffer.timeUs - streamStartPositionUs));
listener.onInputBufferQueued(inputBuffer.timeUs - streamStartPositionUs);
checkNotNull(inputBuffer.data);
if (!shouldDropInputBuffer(inputBuffer)) {
queueInputBufferInternal();
......@@ -87,11 +86,6 @@ import org.checkerframework.checker.nullness.qual.RequiresNonNull;
return isEnded;
}
@Override
public long getCurrentPositionMs() {
return currentPositionMs;
}
@Nullable
protected abstract DecoderInputBuffer dequeueInputBufferInternal() throws TransformationException;
......
......@@ -34,13 +34,15 @@ import com.google.android.exoplayer2.decoder.DecoderInputBuffer;
long streamOffsetUs,
TransformationRequest transformationRequest,
MuxerWrapper muxerWrapper,
Listener listener,
FallbackListener fallbackListener) {
super(
format,
streamStartPositionUs,
streamOffsetUs,
transformationRequest.flattenForSlowMotion,
muxerWrapper);
muxerWrapper,
listener);
this.format = format;
buffer = new DecoderInputBuffer(DecoderInputBuffer.BUFFER_REPLACEMENT_MODE_DIRECT);
fallbackListener.onTransformationRequestFinalized(transformationRequest);
......
......@@ -26,6 +26,25 @@ import com.google.android.exoplayer2.decoder.DecoderInputBuffer;
*/
/* package */ interface SamplePipeline {
/** A listener for the sample pipeline events. */
interface Listener {
/**
* Called when an input buffer is {@linkplain #queueInputBuffer() queued}.
*
* @param positionUs The position of the buffer queued from the stream start position, in
* microseconds.
*/
void onInputBufferQueued(long positionUs);
/**
* Called if an exception occurs in the sample pipeline.
*
* @param exception The {@link TransformationException} describing the exception.
*/
void onTransformationError(TransformationException exception);
}
/** Returns a buffer if the pipeline is ready to accept input, and {@code null} otherwise. */
@Nullable
DecoderInputBuffer dequeueInputBuffer() throws TransformationException;
......@@ -49,10 +68,4 @@ import com.google.android.exoplayer2.decoder.DecoderInputBuffer;
/** Releases all resources held by the pipeline. */
void release();
/**
* Returns the current timestamp being processed in the track, in milliseconds. This is the
* largest timestamp queued minus the stream start time, or 0 if no input has been queued.
*/
long getCurrentPositionMs();
}
......@@ -50,8 +50,6 @@ import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.List;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
/* package */ final class TransformerInternal {
......@@ -98,10 +96,10 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
private final Handler handler;
private final ExoPlayerAssetLoader exoPlayerAssetLoader;
private final MuxerWrapper muxerWrapper;
private final List<SamplePipeline> samplePipelines;
private final ConditionVariable releasingMuxerConditionVariable;
private @Transformer.ProgressState int progressState;
private long progressPositionMs;
private long durationMs;
private boolean released;
private volatile @MonotonicNonNull TransformationResult transformationResult;
......@@ -137,13 +135,13 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
this.debugViewProvider = debugViewProvider;
this.clock = clock;
handler = Util.createHandlerForCurrentLooper();
AssetLoaderListener assetLoaderListener = new AssetLoaderListener(mediaItem, fallbackListener);
ComponentListener componentListener = new ComponentListener(mediaItem, fallbackListener);
muxerWrapper =
new MuxerWrapper(
outputPath,
outputParcelFileDescriptor,
muxerFactory,
/* errorConsumer= */ assetLoaderListener::onError);
/* errorConsumer= */ componentListener::onTransformationError);
exoPlayerAssetLoader =
new ExoPlayerAssetLoader(
context,
......@@ -151,9 +149,8 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
removeAudio,
removeVideo,
mediaSourceFactory,
assetLoaderListener,
componentListener,
clock);
samplePipelines = new ArrayList<>(/* initialCapacity= */ 2);
releasingMuxerConditionVariable = new ConditionVariable();
progressState = PROGRESS_STATE_WAITING_FOR_AVAILABILITY;
}
......@@ -164,8 +161,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
public @Transformer.ProgressState int getProgress(ProgressHolder progressHolder) {
if (progressState == PROGRESS_STATE_AVAILABLE) {
long positionMs = getCurrentPositionMs();
progressHolder.progress = min((int) (positionMs * 100 / durationMs), 99);
progressHolder.progress = min((int) (progressPositionMs * 100 / durationMs), 99);
}
return progressState;
}
......@@ -183,7 +179,6 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
if (released) {
return;
}
samplePipelines.clear();
progressState = PROGRESS_STATE_NO_TRANSFORMATION;
released = true;
HandlerWrapper playbackHandler =
......@@ -220,29 +215,25 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
}
}
private long getCurrentPositionMs() {
if (samplePipelines.isEmpty()) {
return 0;
}
long positionMsSum = 0;
for (int i = 0; i < samplePipelines.size(); i++) {
positionMsSum += samplePipelines.get(i).getCurrentPositionMs();
}
return positionMsSum / samplePipelines.size();
}
private class ComponentListener
implements ExoPlayerAssetLoader.Listener, SamplePipeline.Listener {
private class AssetLoaderListener implements ExoPlayerAssetLoader.Listener {
private static final long MIN_DURATION_BETWEEN_PROGRESS_UPDATES_MS = 100;
private final MediaItem mediaItem;
private final FallbackListener fallbackListener;
private long lastProgressUpdateMs;
private long lastProgressPositionMs;
private volatile boolean trackRegistered;
public AssetLoaderListener(MediaItem mediaItem, FallbackListener fallbackListener) {
public ComponentListener(MediaItem mediaItem, FallbackListener fallbackListener) {
this.mediaItem = mediaItem;
this.fallbackListener = fallbackListener;
}
// ExoPlayerAssetLoader.Listener implementation.
@Override
public void onDurationMs(long durationMs) {
// Make progress permanently unavailable if the duration is unknown, so that it doesn't jump
......@@ -273,10 +264,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
public SamplePipeline onTrackAdded(
Format format, long streamStartPositionUs, long streamOffsetUs)
throws TransformationException {
SamplePipeline samplePipeline =
getSamplePipeline(format, streamStartPositionUs, streamOffsetUs);
samplePipelines.add(samplePipeline);
return samplePipeline;
return getSamplePipeline(format, streamStartPositionUs, streamOffsetUs);
}
@Override
......@@ -298,6 +286,26 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
handleTransformationEnded(/* transformationException= */ null);
}
// SamplePipeline.Listener implementation.
@Override
public void onInputBufferQueued(long positionUs) {
long positionMs = Util.usToMs(positionUs);
long elapsedTimeMs = clock.elapsedRealtime();
if (elapsedTimeMs > lastProgressUpdateMs + MIN_DURATION_BETWEEN_PROGRESS_UPDATES_MS
&& positionMs > lastProgressPositionMs) {
lastProgressUpdateMs = elapsedTimeMs;
// Store positionMs in a local variable to make sure the thread reads the latest value.
lastProgressPositionMs = positionMs;
handler.post(() -> progressPositionMs = positionMs);
}
}
@Override
public void onTransformationError(TransformationException transformationException) {
handleTransformationEnded(transformationException);
}
private SamplePipeline getSamplePipeline(
Format inputFormat, long streamStartPositionUs, long streamOffsetUs)
throws TransformationException {
......@@ -311,6 +319,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
decoderFactory,
encoderFactory,
muxerWrapper,
/* listener= */ this,
fallbackListener);
} else if (MimeTypes.isVideo(inputFormat.sampleMimeType)
&& shouldTranscodeVideo(inputFormat, streamStartPositionUs, streamOffsetUs)) {
......@@ -325,8 +334,8 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
decoderFactory,
encoderFactory,
muxerWrapper,
/* listener= */ this,
fallbackListener,
this::onError,
debugViewProvider);
} else {
return new PassthroughSamplePipeline(
......@@ -335,6 +344,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
streamOffsetUs,
transformationRequest,
muxerWrapper,
/* listener= */ this,
fallbackListener);
}
}
......
......@@ -30,7 +30,6 @@ import com.google.android.exoplayer2.Format;
import com.google.android.exoplayer2.decoder.DecoderInputBuffer;
import com.google.android.exoplayer2.effect.Presentation;
import com.google.android.exoplayer2.effect.ScaleToFitTransformation;
import com.google.android.exoplayer2.util.Consumer;
import com.google.android.exoplayer2.util.DebugViewProvider;
import com.google.android.exoplayer2.util.Effect;
import com.google.android.exoplayer2.util.FrameInfo;
......@@ -74,8 +73,8 @@ import org.checkerframework.dataflow.qual.Pure;
Codec.DecoderFactory decoderFactory,
Codec.EncoderFactory encoderFactory,
MuxerWrapper muxerWrapper,
Listener listener,
FallbackListener fallbackListener,
Consumer<TransformationException> errorConsumer,
DebugViewProvider debugViewProvider)
throws TransformationException {
super(
......@@ -83,7 +82,8 @@ import org.checkerframework.dataflow.qual.Pure;
streamStartPositionUs,
streamOffsetUs,
transformationRequest.flattenForSlowMotion,
muxerWrapper);
muxerWrapper,
listener);
if (ColorInfo.isTransferHdr(inputFormat.colorInfo)) {
if (transformationRequest.hdrMode
......@@ -155,7 +155,7 @@ import org.checkerframework.dataflow.qual.Pure;
checkNotNull(frameProcessor)
.setOutputSurfaceInfo(encoderWrapper.getSurfaceInfo(width, height));
} catch (TransformationException exception) {
errorConsumer.accept(exception);
listener.onTransformationError(exception);
}
}
......@@ -166,7 +166,7 @@ import org.checkerframework.dataflow.qual.Pure;
@Override
public void onFrameProcessingError(FrameProcessingException exception) {
errorConsumer.accept(
listener.onTransformationError(
TransformationException.createForFrameProcessingException(
exception, TransformationException.ERROR_CODE_FRAME_PROCESSING_FAILED));
}
......@@ -176,7 +176,7 @@ import org.checkerframework.dataflow.qual.Pure;
try {
encoderWrapper.signalEndOfInputStream();
} catch (TransformationException exception) {
errorConsumer.accept(exception);
listener.onTransformationError(exception);
}
}
},
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment