Skip to main content

Stream / Iterator

Introduction

What is Stream? In short: call once, return multiple times; like Iterators.

Flutter's Stream is a powerful abstraction. When using it as the return value of Rust function, we can allow the scenario that we call function once, and then return multiple times.

For example, your Rust function may run computationally heavy algorithms, and for every hundreds of milliseconds, it finds out a new piece of the full solution. In this case, it can immediately give that piece to Flutter, then Flutter can render it to UI immediately. Therefore, users do not need to wait for the full algorithm to finish before he can see some partial results on the user interface.

How to use Streams?

Create a Rust function that asks for a frb_generated::StreamSink as a parameter, like fn f(sink: StreamSink<T>, ..) -> Result<()>. This is translated to a Dart function Stream<T> f(..). When f() is called on the Dart side it returns a Dart Stream<T> that is connected to the sink on the Rust side.

Notice that, you can hold that StreamSink forever, and use it freely even after the Rust function itself returns. The logger example below also demonstrates this (the create_log_stream returns almost immediately, while you can use the StreamSink after, say, an hour).

The StreamSink can be placed at any location. For example, fn f(a: i32, b: StreamSink<String>) and fn f(a: StreamSink<String>, b: i32) are both valid.

Stream argument in arbitrary types

The StreamSink type can also be placed in arbitrary types, such as inside structs, enums, vectors, ... See example 3 below for more details.

Control whether to wait

By default, the stream is immediately usable in Dart, and the Rust function is not awaited.

If you need the Rust function to finish execution before the stream can be obtained in Dart, just add #[frb(stream_dart_await)] (to await the stream) or #[frb(sync)] (to let the whole function be synchronous) to the function.

Add an error

To put an error into the stream, the stream.add_error method can be utilized. It currently accepts an anyhow::Error type.

For example, we can write down stream.add_error(anyhow::anyhow!("hello")) and the Dart side will see an exception thrown.

Examples

See logging examples which uses streams extensively.

Simple

Simply iterate through your Dart stream, and call a normal Rust function for each item. For example:

myStream.listen((data) => myRustfunction(data));

While on the Rust side:

fn my_rust_function(data: WhateverType) { ... }

A timer

Credits: this and #347.

Details
use anyhow::Result;
use std::{thread::sleep, time::Duration};

use crate::frb_generated::StreamSink;

const ONE_SECOND: Duration = Duration::from_secs(1);

// can't omit the return type yet, this is a bug
pub fn tick(sink: StreamSink<i32>) -> Result<()> {
let mut ticks = 0;
loop {
sink.add(ticks);
sleep(ONE_SECOND);
if ticks == i32::MAX {
break;
}
ticks += 1;
}
Ok(())
}

And use it in Dart:

Details
import 'package:flutter/material.dart';
import 'ffi.dart';

void main() {
runApp(const MyApp());
}

class MyApp extends StatelessWidget {
const MyApp({Key? key}) : super(key: key);


Widget build(BuildContext context) {
return MaterialApp(
title: 'Flutter Demo',
theme: ThemeData(
primarySwatch: Colors.blue,
),
home: const MyHomePage(title: 'Flutter Demo Home Page'),
);
}
}

class MyHomePage extends StatefulWidget {
const MyHomePage({Key? key, required this.title}) : super(key: key);
final String title;


State<MyHomePage> createState() => _MyHomePageState();
}

class _MyHomePageState extends State<MyHomePage> {
late Stream<int> ticks;


void initState() {
super.initState();
ticks = api.tick();
}


Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(
title: Text(widget.title),
),
body: Center(
child: Column(
mainAxisAlignment: MainAxisAlignment.center,
children: <Widget>[
const Text("Time since starting Rust stream"),
StreamBuilder<int>(
stream: ticks,
builder: (context, snap) {
final style = Theme.of(context).textTheme.headlineMedium;
final error = snap.error;
if (error != null)
return Tooltip(
message: error.toString(),
child: Text('Error', style: style));

final data = snap.data;
if (data != null) return Text('$data second(s)', style: style);

return const CircularProgressIndicator();
},
)
],
),
),
);
}
}

Stream type inside arbitrary types

For example, we can place it as a field of a struct or inside a vector:

pub struct MyStruct {
a: String,
b: StreamSink<i32>,
}

pub fn f(arr: Vec<StreamSink<i32>>, st: MyStruct) {}