processConsumerTransportsAudio function
Adjusts the audio state of consumer transports based on provided streams.
This function examines each audio consumer transport:
- If a transport's producerId matches an entry in lStreams and the transport is currently paused, it resumes the transport.
- If a transport's producerId does not match any entry in lStreams and the transport is unpaused, it pauses the transport.
- The function waits briefly before pausing to allow for smoother transitions.
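The selection rules above can be sketched in isolation. This is a minimal illustration, not the SDK's implementation: AudioStream, AudioTransport, idsToResume, and idsToPause are hypothetical stand-ins that model only the producer ID, track kind, and paused flag.

```dart
// Hypothetical stand-ins for the SDK's stream and transport types.
class AudioStream {
  final String producerId;
  const AudioStream(this.producerId);
}

class AudioTransport {
  final String producerId;
  final String kind; // 'audio' or 'video'
  final bool paused;
  const AudioTransport(this.producerId,
      {this.kind = 'audio', this.paused = false});
}

/// Producer IDs of paused audio transports that have a matching stream.
List<String> idsToResume(
    List<AudioTransport> transports, List<AudioStream> lStreams) {
  final active = lStreams.map((s) => s.producerId).toSet();
  return transports
      .where((t) =>
          t.kind == 'audio' &&
          t.paused &&
          t.producerId.isNotEmpty &&
          active.contains(t.producerId))
      .map((t) => t.producerId)
      .toList();
}

/// Producer IDs of unpaused audio transports with no matching stream.
List<String> idsToPause(
    List<AudioTransport> transports, List<AudioStream> lStreams) {
  final active = lStreams.map((s) => s.producerId).toSet();
  return transports
      .where((t) =>
          t.kind == 'audio' &&
          !t.paused &&
          t.producerId.isNotEmpty &&
          !active.contains(t.producerId))
      .map((t) => t.producerId)
      .toList();
}
```

An unpaused transport whose producer is still in lStreams lands in neither list, so it is left untouched.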
- consumerTransports: List of audio transports that may need to be paused or resumed.
- lStreams: List of streams that represent valid audio sources for the transports.
- parameters: Contains:
  - sleep: A function that adds a delay before pausing a transport.
  - getUpdatedAllParams: A function that refreshes the parameters used for transport processing.
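As a rough sketch of how these pieces could fit together: the names SleepOptions and SleepType appear in the implementation, but the definitions below are assumptions made for illustration, and AudioParameters, delayedSleep, and buildParameters are hypothetical helpers. The SDK's real classes may differ.

```dart
// Assumed shape: only the `ms` field is visible in the implementation.
class SleepOptions {
  final int ms;
  const SleepOptions({required this.ms});
}

// Assumed signature: a function that completes after the requested delay.
typedef SleepType = Future<void> Function(SleepOptions options);

// Hypothetical stand-in for the parameters object.
class AudioParameters {
  final SleepType sleep;
  final AudioParameters Function() getUpdatedAllParams;
  AudioParameters({required this.sleep, required this.getUpdatedAllParams});
}

/// A sleep implementation built on Future.delayed.
Future<void> delayedSleep(SleepOptions options) =>
    Future<void>.delayed(Duration(milliseconds: options.ms));

/// Builds a parameters object whose getUpdatedAllParams returns itself,
/// which is the simplest way to satisfy the "refresh" contract in a sketch.
AudioParameters buildParameters() {
  late final AudioParameters params;
  params = AudioParameters(
    sleep: delayedSleep,
    getUpdatedAllParams: () => params,
  );
  return params;
}
```

Passing the delay in as a function keeps the timing replaceable, for example with an instant no-op in tests.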
- Pausing and Resuming: Pauses transports not found in lStreams and resumes those that are.
- Delay Handling: A short delay (100 ms in the implementation) is added before pausing transports to smooth state transitions.
- Socket Events: Emits consumer-pause and consumer-resume events to the server to synchronize transport states.
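The event exchange can be illustrated with an in-memory stand-in for the socket. Everything here is hypothetical: FakeSocket, FakeConsumer, requestPause, and requestResume are not SDK types, the emitWithAck signature is modeled loosely on the call shape in the implementation, and the simulated server replies are assumptions. Updating the local flag inside the pause acknowledgment is also a choice made for this sketch.

```dart
/// Hypothetical socket that records each event and acknowledges it at once.
class FakeSocket {
  final List<String> emitted = [];

  void emitWithAck(String event, Map<String, dynamic> data,
      {required void Function(dynamic response) ack}) {
    emitted.add('$event:${data['serverConsumerId']}');
    // Simulated server reply (assumed shape).
    ack(event == 'consumer-resume' ? {'resumed': true} : {'paused': true});
  }
}

/// Hypothetical consumer that tracks only its paused flag.
class FakeConsumer {
  bool paused;
  FakeConsumer({required this.paused});
}

/// Asks the server to pause, then marks the local consumer as paused.
void requestPause(FakeSocket socket, FakeConsumer consumer, String consumerId) {
  socket.emitWithAck('consumer-pause', {'serverConsumerId': consumerId},
      ack: (_) {
    consumer.paused = true;
  });
}

/// Asks the server to resume and unpauses locally only on confirmation.
void requestResume(
    FakeSocket socket, FakeConsumer consumer, String consumerId) {
  socket.emitWithAck('consumer-resume', {'serverConsumerId': consumerId},
      ack: (response) {
    if (response is Map && response['resumed'] == true) {
      consumer.paused = false;
    }
  });
}
```

Gating the local resume on the acknowledgment keeps the client from playing audio that the server has not yet agreed to send.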
Example Usage:
final parameters = ProcessConsumerTransportsAudioParameters(
  sleep: (options) async =>
      await Future.delayed(Duration(milliseconds: options.ms)),
  getUpdatedAllParams: () => updatedParams, // Returns the latest parameters
);

await processConsumerTransportsAudio(
  ProcessConsumerTransportsAudioOptions(
    consumerTransports: [transport1, transport2],
    lStreams: [stream1, stream2],
    parameters: parameters,
  ),
);
Error Handling:
- In debug mode, logs any errors encountered while processing transports. Errors are caught and not rethrown.
Future<void> processConsumerTransportsAudio(
    ProcessConsumerTransportsAudioOptions options) async {
  // Retrieve and destructure updated parameters
  final ProcessConsumerTransportsAudioParameters parameters =
      options.parameters.getUpdatedAllParams();
  final SleepType sleep = parameters.sleep;

  final List<TransportType> consumerTransports = options.consumerTransports;
  final List<Stream> lStreams = options.lStreams;

  try {
    // Helper function to check if producerId exists in any provided stream array
    bool isValidProducerId(String producerId, List<List<Stream>> streamArrays) {
      return producerId.isNotEmpty &&
          streamArrays.any((streamArray) =>
              streamArray.isNotEmpty &&
              streamArray.any((stream) => stream.producerId == producerId));
    }

    // Get paused consumer transports that are audio
    final consumerTransportsToResume = consumerTransports.where((transport) =>
        isValidProducerId(transport.producerId, [lStreams]) &&
        transport.consumer.paused == true &&
        transport.consumer.track.kind == 'audio');

    // Get unpaused consumer transports that are audio and not in lStreams
    final consumerTransportsToPause = consumerTransports.where((transport) =>
        transport.producerId.isNotEmpty &&
        !lStreams.any((stream) => stream.producerId == transport.producerId) &&
        transport.consumer.track.kind == 'audio' &&
        transport.consumer.paused == false);

    // Pause consumer transports after a short delay
    final sleepOptions = SleepOptions(ms: 100);
    await sleep(sleepOptions);

    // Note: the payload key is 'serverConsumerId', not 'serverconsumerTransportId'
    for (final transport in consumerTransportsToPause) {
      transport.socket_.emitWithAck("consumer-pause", {
      }, ack: (paused) {
        // Handle pause acknowledgment
      });
    }

    // Resume each paused transport once the server confirms the request
    for (final transport in consumerTransportsToResume) {
      transport.socket_.emitWithAck(
          "consumer-resume", {'serverConsumerId':},
          ack: (resumed) async {
        if (resumed['resumed'] == true) {
          transport.consumer.resume();
        }
      });
    }
  } catch (error) {
    // Handle errors during the process of pausing or resuming consumer transports
    if (kDebugMode) {
      print('Error processing consumer transports: $error');
    }
    // throw new Error('Error processing consumer transports: $error');
  }
}