connectIps function

Future<List> connectIps(
  ConnectIpsOptions options
)

Connects to multiple remote IPs to manage socket connections for media consumption.

This function iterates over a list of remote IPs, attempting to establish socket connections and manage events for new media producers and closed producers in the connected rooms. If successful, it updates the consumeSockets list with each connected socket and tracks connected IPs in roomRecvIPs.

Parameters:

  • options (ConnectIpsOptions): Configuration options for establishing connections and managing sockets:
    • consumeSockets (List<Map<String, io.Socket>>): A list of socket connections for each IP.
    • remIP (List<String>): A list of remote IPs to connect to.
    • apiUserName (String): API username for authentication.
    • apiKey (String?): Optional API key for authentication.
    • apiToken (String): API token for authentication.
    • newProducerMethod (NewPipeProducerType?): Optional function to handle new producer events.
    • closedProducerMethod (ProducerClosedType?): Optional function to handle closed producer events.
    • joinConsumeRoomMethod (JoinConsumeRoomType?): Optional function to handle joining a room.
    • parameters (ConnectIpsParameters): Parameters object to handle state updates and manage dependencies.

Returns:

  • A Future<List<dynamic>> containing:
    • Updated list of consumeSockets with newly connected sockets.
    • Updated list of roomRecvIPs with connected IP addresses.

Example Usage:

final options = ConnectIpsOptions(
  consumeSockets: [],
  remIP: ['100.122.1.1', '100.122.1.2'],
  apiUserName: 'myUserName',
  apiToken: 'myToken',
  parameters: myConnectIpsParametersInstance,
);

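connectIps(options).then((result) {
  // The returned list holds [consumeSockets, roomRecvIPs], in that order.
  final consumeSockets = result[0];
  final roomRecvIPs = result[1];
  print('Successfully connected to IPs: $roomRecvIPs');
  print('Active consume sockets: $consumeSockets');
});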

Error Handling:

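Errors raised while connecting to an IP or joining its consume room are caught and logged in debug mode rather than thrown, and the function moves on to the next IP. Errors raised later inside the 'new-pipe-producer' and 'producer-closed' event handlers occur outside this function's try/catch and are not covered by it.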

Implementation

/// Connects to each remote IP in `options.remIP` and joins its consume room.
///
/// For every IP that is non-empty, not `'none'`, and not already a key in one
/// of the `options.consumeSockets` entries, this:
///
/// 1. opens a socket to `https://<ip>.mediasfu.com` via `connectSocket`;
/// 2. if the socket reports a non-empty id, records the IP in `roomRecvIPs`
///    (calling `updateRoomRecvIPs`) and registers `'new-pipe-producer'` and
///    `'producer-closed'` handlers on it;
/// 3. calls `joinConsumeRoomMethod`; when the response is a JSON map whose
///    parsed `rtpCapabilities` is non-null, appends `{ip: socket}` to
///    `consumeSockets` (calling `updateConsumeSockets`).
///
/// A failure for one IP (thrown error, unexpected response shape, or missing
/// `rtpCapabilities`) never prevents the remaining IPs from being attempted.
///
/// Returns a two-element list `[consumeSockets, roomRecvIPs]`. Both lists are
/// mutated in place. Errors are printed in debug mode and never rethrown.
/// If neither an API key nor an API token is supplied, nothing is attempted
/// and the current lists are returned unchanged.
Future<List<dynamic>> connectIps(ConnectIpsOptions options) async {
  final parameters = options.parameters.getUpdatedAllParams();

  // Working state and the callbacks used to publish changes to it.
  List<Map<String, io.Socket>> consumeSockets = options.consumeSockets;
  List<String> roomRecvIPs = parameters.roomRecvIPs;
  final updateRoomRecvIPs = parameters.updateRoomRecvIPs;
  final updateConsumeSockets = parameters.updateConsumeSockets;

  // Fall back to the default handlers when the caller supplies none.
  final newProducerMethod = options.newProducerMethod ?? newPipeProducer;
  final closedProducerMethod = options.closedProducerMethod ?? producerClosed;
  final joinConsumeRoomMethod =
      options.joinConsumeRoomMethod ?? joinConsumeRoom;

  // A null key is forwarded to connectSocket as '', so treat both the same
  // way when deciding whether any credential was supplied.
  final apiKey = options.apiKey ?? '';

  try {
    if (apiKey.isEmpty && options.apiToken.isEmpty) {
      if (kDebugMode) {
        print('Missing required parameters for authentication');
      }
      return [consumeSockets, roomRecvIPs];
    }

    for (final ip in options.remIP) {
      // Empty and 'none' entries are skipped without attempting a connection.
      if (ip.isEmpty || ip == 'none') {
        continue;
      }

      try {
        // Skip IPs that already have a socket. containsKey is used instead of
        // keys.first so an empty map entry cannot throw.
        final alreadyConnected =
            consumeSockets.any((entry) => entry.containsKey(ip));
        if (alreadyConnected) {
          continue;
        }

        final optionsConnect = ConnectSocketOptions(
          apiUserName: options.apiUserName,
          apiKey: apiKey,
          apiToken: options.apiToken,
          link: 'https://$ip.mediasfu.com',
        );
        io.Socket remoteSock = await connectSocket(
          optionsConnect,
        );

        // A socket without an id is not usable; try the next IP.
        final socketId = remoteSock.id;
        if (socketId == null || socketId.isEmpty) {
          continue;
        }

        if (!roomRecvIPs.contains(ip)) {
          roomRecvIPs.add(ip);
          updateRoomRecvIPs(roomRecvIPs);
        }

        // Event handler for 'new-pipe-producer'
        remoteSock.on('new-pipe-producer', (data) async {
          final optionsNewPipeProducer = NewPipeProducerOptions(
            producerId: data['producerId'],
            islevel: data['islevel'],
            nsock: remoteSock,
            parameters: parameters,
          );
          await newProducerMethod(
            optionsNewPipeProducer,
          );
        });

        // Event handler for 'producer-closed'
        remoteSock.on('producer-closed', (data) async {
          final optionsProducerClosed = ProducerClosedOptions(
            remoteProducerId: data['remoteProducerId'],
            parameters: parameters,
          );
          await closedProducerMethod(
            optionsProducerClosed,
          );
        });

        // Join the consume room on the freshly connected socket.
        final optionsJoinConsume = JoinConsumeRoomOptions(
          remoteSock: remoteSock,
          apiToken: options.apiToken,
          apiUserName: options.apiUserName,
          parameters: parameters,
        );

        final dataJSON = await joinConsumeRoomMethod(
          optionsJoinConsume,
        );

        if (dataJSON is! Map<String, dynamic>) {
          continue;
        }

        final data = ResponseJoinRoom.fromJson(dataJSON);

        // A join without RTP capabilities only disqualifies this IP; keep
        // going so the remaining IPs are still attempted.
        if (data.rtpCapabilities == null) {
          continue;
        }

        // Add the remote socket to the consumeSockets array
        consumeSockets.add({ip: remoteSock});
        updateConsumeSockets(consumeSockets);
      } catch (error) {
        if (kDebugMode) {
          print('connectIps error with IP $ip: $error');
        }
      }
    }

    return [consumeSockets, roomRecvIPs];
  } catch (error) {
    if (kDebugMode) {
      print('connectIps error: $error');
    }
    return [consumeSockets, parameters.roomRecvIPs];
  }
}