timeout method Null safety

Stream<T> timeout (
  1. Duration timeLimit,
  2. {void onTimeout(
    1. EventSink<T> sink
    )}
)

Creates a new stream with the same events as this stream.

Whenever more than timeLimit passes between two events from this stream, the onTimeout function is called, which can emit further events on the returned stream.

The countdown doesn't start until the returned stream is listened to. The countdown is reset every time an event is forwarded from this stream, or when this stream is paused and resumed.

The onTimeout function is called with one argument: an EventSink that allows putting events into the returned stream. This EventSink is only valid during the call to onTimeout. Calling EventSink.close on the sink passed to onTimeout closes the returned stream, and no further events are processed.

If onTimeout is omitted, a timeout will just put a TimeoutException into the error channel of the returned stream. If the call to onTimeout throws, the error is emitted on the returned stream.

The returned stream is a broadcast stream if this stream is. If a broadcast stream is listened to more than once, each subscription will have its individually timer that starts counting on listen, and the subscriptions' timers can be paused individually.

Implementation

Stream<T> timeout(Duration timeLimit, {void onTimeout(EventSink<T> sink)?}) {
  _StreamControllerBase<T> controller;
  if (isBroadcast) {
    controller = new _SyncBroadcastStreamController<T>(null, null);
  } else {
    controller = new _SyncStreamController<T>(null, null, null, null);
  }

  Zone zone = Zone.current;
  // Register callback immediately.
  _TimerCallback timeoutCallback;
  if (onTimeout == null) {
    timeoutCallback = () {
      controller.addError(
          new TimeoutException("No stream event", timeLimit), null);
    };
  } else {
    // TODO(floitsch): the return type should be 'void', and the type
    // should be inferred.
    var registeredOnTimeout =
        zone.registerUnaryCallback<void, EventSink<T>>(onTimeout);
    var wrapper = new _ControllerEventSinkWrapper<T>(null);
    timeoutCallback = () {
      wrapper._sink = controller; // Only valid during call.
      zone.runUnaryGuarded(registeredOnTimeout, wrapper);
      wrapper._sink = null;
    };
  }

  // All further setup happens inside `onListen`.
  controller.onListen = () {
    Timer timer = zone.createTimer(timeLimit, timeoutCallback);
    var subscription = this.listen(null);
    // Set up event forwarding. Each data or error event resets the timer
    subscription
      ..onData((T event) {
        timer.cancel();
        timer = zone.createTimer(timeLimit, timeoutCallback);
        // Controller is synchronous, and the call might close the stream
        // and cancel the timer,
        // so create the Timer before calling into add();
        // issue: https://github.com/dart-lang/sdk/issues/37565
        controller.add(event);
      })
      ..onError((Object error, StackTrace stackTrace) {
        timer.cancel();
        timer = zone.createTimer(timeLimit, timeoutCallback);
        controller._addError(
            error, stackTrace); // Avoid Zone error replacement.
      })
      ..onDone(() {
        timer.cancel();
        controller.close();
      });
    // Set up further controller callbacks.
    controller.onCancel = () {
      timer.cancel();
      return subscription.cancel();
    };
    if (!isBroadcast) {
      controller
        ..onPause = () {
          timer.cancel();
          subscription.pause();
        }
        ..onResume = () {
          subscription.resume();
          timer = zone.createTimer(timeLimit, timeoutCallback);
        };
    }
  };

  return controller.stream;
}