processConsumerTransports function
- ProcessConsumerTransportsOptions options
Processes consumer transports to pause or resume video streams based on provided stream lists.
This function iterates over the consumer transports, checking whether each transport's producerId
matches any stream in the provided lists (lStreams_, remoteScreenStream, oldAllStreams, and
newLimitedStreams). If a non-audio transport is paused and its producerId matches a stream in the lists,
it resumes the transport. If a non-audio transport is unpaused and its producerId does not match any stream
in the lists, it pauses the transport after a brief delay.
Parameters:
options (ProcessConsumerTransportsOptions): Contains:
- consumerTransports: List of transports to process.
- lStreams_: List of current streams to compare producerIds against.
- parameters: Includes the sleep function for a delay and lists of old and new streams.
Example:
final parameters = ProcessConsumerTransportsParameters(
remoteScreenStream: [screenStream1],
oldAllStreams: [oldStream1, oldStream2],
newLimitedStreams: [limitedStream1],
sleep: (options) async => await Future.delayed(Duration(milliseconds: options.ms)),
getUpdatedAllParams: () => updatedParams, // Function to retrieve updated parameters if needed
);
await processConsumerTransports(
ProcessConsumerTransportsOptions(
consumerTransports: [transport1, transport2],
lStreams_: [stream1, stream2],
parameters: parameters,
),
);
Implementation
/// Pauses or resumes non-audio consumer transports based on whether their
/// producer is still referenced by one of the stream lists.
///
/// A transport counts as "in use" when its `producerId` is non-empty and
/// equals the `producerId` of a stream in `options.lStreams_` or in the
/// `remoteScreenStream`, `oldAllStreams` or `newLimitedStreams` lists taken
/// from `options.parameters.getUpdatedAllParams()`.
///
/// After a 100 ms delay (via the injected `sleep` function):
///
/// * every unpaused, non-audio transport with a non-empty `producerId` that
///   is not in use is paused locally and a `consumer-pause` event is emitted
///   on its socket;
/// * every paused, non-audio transport that is in use gets a
///   `consumer-resume` event emitted on its socket, and is resumed locally
///   only once the acknowledgement carries `resumed == true`.
///
/// Errors are not propagated to the caller; they are printed when
/// `kDebugMode` is set.
Future<void> processConsumerTransports(
    ProcessConsumerTransportsOptions options) async {
  // Retrieve and destructure updated parameters.
  final ProcessConsumerTransportsParameters parameters =
      options.parameters.getUpdatedAllParams();
  final SleepType sleep = parameters.sleep;
  final List<TransportType> consumerTransports = options.consumerTransports;
  final List<Stream> lStreams_ = options.lStreams_;
  final List<Stream> remoteScreenStream = parameters.remoteScreenStream;
  final List<Stream> oldAllStreams = parameters.oldAllStreams;
  final List<Stream> newLimitedStreams = parameters.newLimitedStreams;

  try {
    // Short delay before any consumer is inspected or touched. The pause and
    // resume decisions below are deliberately made *after* the delay, so they
    // reflect the transport and stream state at that point.
    await sleep(SleepOptions(ms: 100));

    // Every producerId referenced by any of the stream lists, collected once
    // so each transport needs a single lookup instead of four list scans.
    final activeProducerIds = {
      for (final streams in [
        lStreams_,
        remoteScreenStream,
        oldAllStreams,
        newLimitedStreams,
      ])
        for (final stream in streams) stream.producerId,
    };

    // Pause unpaused, non-audio transports whose producer is no longer in use.
    // Note 'serverConsumerId' is 'transport.consumer.id', not
    // 'serverconsumerTransportId'.
    for (final transport in consumerTransports) {
      final producerId = transport.producerId;
      if (producerId.isEmpty || activeProducerIds.contains(producerId)) {
        continue;
      }
      if (transport.consumer.track.kind == 'audio' ||
          transport.consumer.paused) {
        continue;
      }
      transport.consumer.pause();
      transport.socket_.emitWithAck(
          "consumer-pause", {'serverConsumerId': transport.consumer.id},
          ack: (_) {});
    }

    // Ask the server to resume paused, non-audio transports whose producer is
    // in use; the local consumer is resumed only after the server confirms.
    // Note 'serverConsumerId' is 'transport.consumer.id', not
    // 'serverconsumerTransportId'.
    for (final transport in consumerTransports) {
      final producerId = transport.producerId;
      if (producerId.isEmpty || !activeProducerIds.contains(producerId)) {
        continue;
      }
      if (!transport.consumer.paused ||
          transport.consumer.track.kind == 'audio') {
        continue;
      }
      transport.socket_.emitWithAck("consumer-resume", {
        'serverConsumerId': transport.consumer.id,
      }, ack: (response) {
        // This callback fires after the enclosing try/catch has exited, so it
        // needs its own guard; the payload is also untyped and may be null or
        // not a map.
        try {
          if (response is Map && response['resumed'] == true) {
            transport.consumer.resume();
          }
        } catch (error) {
          if (kDebugMode) {
            print('Error in processConsumerTransports resume ack: $error');
          }
        }
      });
    }
  } catch (error) {
    if (kDebugMode) {
      print('Error in processConsumerTransports: $error');
    }
  }
}