Coroutines & Flow

Introduction
Kotlin Coroutines and Flow are the standard approach to asynchronous programming on Android. Coroutines let you write asynchronous, non-blocking code in a sequential style, while Flow provides a coroutine-based API for asynchronous streams of values. Together they make it easier to build responsive, efficient, and maintainable applications.
This guide moves from basic coroutine concepts to Flow operators, Android lifecycle integration, and testing. Whether you're handling network requests, database operations, or complex UI interactions, these tools help keep work off the main thread and keep asynchronous code readable.
Why Use Coroutines and Flow?
Before diving into implementation, let's understand the benefits of using coroutines and Flow:
- Simplified Asynchronous Code: Write async code that looks like synchronous code
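- Lightweight: A suspended coroutine does not block its thread, so thousands of coroutines can share a handful of threads
- Structured Concurrency: Coroutines launched in a scope are cancelled together when that scope is cancelled
- Reactive Programming: Flow provides cold asynchronous streams; because emit and collect are suspending, a slow collector naturally slows the producer (backpressure)
- Exception Handling: Failures surface through ordinary try/catch, CoroutineExceptionHandler, and the Flow catch operator
- Integration: lifecycleScope, viewModelScope, and repeatOnLifecycle tie coroutines to Android lifecycles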
- Performance: Efficient resource utilization and memory management
- Testability: Easy to test asynchronous code
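To make the "lightweight" point concrete, here is a minimal sketch that starts ten thousand coroutines on a single thread. The helper name runManyCoroutines is hypothetical and exists only for this illustration; the sketch assumes nothing beyond kotlinx-coroutines-core.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: launches `count` coroutines and returns how many completed.
fun runManyCoroutines(count: Int): Int = runBlocking {
    val completed = AtomicInteger(0)
    coroutineScope {
        repeat(count) {
            launch {
                delay(100) // suspends the coroutine without blocking the thread
                completed.incrementAndGet()
            }
        }
    } // coroutineScope returns only after every child has finished
    completed.get()
}

fun main() {
    println(runManyCoroutines(10_000)) // prints 10000
}
```

All of these coroutines run on the single thread owned by runBlocking. Creating the same number of threads would be far more expensive, which is the practical meaning of "lightweight" here.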
Setting Up Coroutines
Let's start by setting up coroutines in your Android project.
Dependencies
// app/build.gradle.kts
dependencies {
// Coroutines
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.3")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-android:1.7.3")
// Lifecycle integration
implementation("androidx.lifecycle:lifecycle-runtime-ktx:2.7.0")
implementation("androidx.lifecycle:lifecycle-viewmodel-ktx:2.7.0")
implementation("androidx.lifecycle:lifecycle-livedata-ktx:2.7.0")
// Room integration
implementation("androidx.room:room-ktx:2.6.1")
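// Retrofit integration (suspend functions are supported natively since Retrofit 2.6.0,
// so the separate coroutines adapter is not needed)
implementation("com.squareup.retrofit2:retrofit:2.9.0")
// Coroutine test utilities (runTest, test dispatchers) used in the testing section
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.7.3")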
}
Coroutines Basics
Understanding the fundamental concepts of coroutines.
Coroutine Builders
class CoroutineExample : AppCompatActivity() { // lifecycleScope needs a LifecycleOwner such as an Activity
// launch - fire and forget
fun launchExample() {
lifecycleScope.launch {
val result = fetchDataFromNetwork()
updateUI(result)
}
}
// async - returns a result
fun asyncExample() {
lifecycleScope.launch {
val deferred1 = async { fetchUserData() }
val deferred2 = async { fetchUserPosts() }
val user = deferred1.await()
val posts = deferred2.await()
displayUserWithPosts(user, posts)
}
}
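// runBlocking - blocks the current thread until the block completes;
// keep it to main functions and tests, never call it on the Android main thread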
fun runBlockingExample() {
runBlocking {
val result = fetchDataFromNetwork()
println("Result: $result")
}
}
// coroutineScope - creates a new coroutine scope
fun coroutineScopeExample() {
lifecycleScope.launch {
val result = coroutineScope {
val user = async { fetchUserData() }
val posts = async { fetchUserPosts() }
UserWithPosts(user.await(), posts.await())
}
displayResult(result)
}
}
// suspend functions
suspend fun fetchDataFromNetwork(): String {
delay(1000) // Simulate network delay
return "Data from network"
}
suspend fun fetchUserData(): User {
delay(500)
return User(1, "John Doe", "john@example.com")
}
suspend fun fetchUserPosts(): List<Post> {
delay(500)
return listOf(Post(1, "First Post"), Post(2, "Second Post"))
}
}
Coroutine Context and Dispatchers
class DispatcherExample : AppCompatActivity() { // lifecycleScope needs a LifecycleOwner such as an Activity
fun dispatcherExamples() {
lifecycleScope.launch(Dispatchers.Main) {
// UI operations
updateUI()
}
lifecycleScope.launch(Dispatchers.IO) {
// Network, database, file operations
val data = fetchDataFromNetwork()
saveToDatabase(data)
}
lifecycleScope.launch(Dispatchers.Default) {
// CPU-intensive operations
val result = performComplexCalculation()
withContext(Dispatchers.Main) {
displayResult(result)
}
}
lifecycleScope.launch(Dispatchers.Unconfined) {
// Not recommended for production
println("Unconfined dispatcher")
}
}
fun withContextExample() {
lifecycleScope.launch(Dispatchers.Main) {
// Start on Main
showLoading()
val data = withContext(Dispatchers.IO) {
// Switch to IO for network call
fetchDataFromNetwork()
}
// Back to Main
hideLoading()
displayData(data)
}
}
suspend fun fetchDataFromNetwork(): String {
return withContext(Dispatchers.IO) {
delay(1000)
"Network data"
}
}
}
Exception Handling
Proper exception handling is crucial for robust coroutine-based applications.
Try-Catch in Coroutines
class ExceptionHandlingExample : AppCompatActivity() { // lifecycleScope needs a LifecycleOwner such as an Activity
fun basicExceptionHandling() {
lifecycleScope.launch {
try {
val result = fetchDataFromNetwork()
displayResult(result)
} catch (e: IOException) {
showNetworkError("Network error: ${e.message}")
} catch (e: Exception) {
showGenericError("An error occurred: ${e.message}")
}
}
}
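fun exceptionHandlingWithSupervisor() {
// supervisorScope is a suspend function, so it must be called from a coroutine
lifecycleScope.launch {
supervisorScope {
// A failure in one async child does not cancel its sibling
val deferred1 = async { fetchUserData() }
val deferred2 = async { fetchUserPosts() }
try {
val user = deferred1.await()
val posts = deferred2.await()
displayUserWithPosts(user, posts)
} catch (e: Exception) {
showError("Failed to load data: ${e.message}")
}
}
}
}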
fun coroutineExceptionHandler() {
val exceptionHandler = CoroutineExceptionHandler { _, exception ->
Log.e("CoroutineExample", "Coroutine exception: ${exception.message}")
showError("An error occurred: ${exception.message}")
}
lifecycleScope.launch(exceptionHandler) {
val result = fetchDataFromNetwork()
displayResult(result)
}
}
suspend fun fetchDataFromNetwork(): String {
delay(1000)
if (Random.nextBoolean()) {
throw IOException("Network error")
}
return "Success data"
}
}
Cancellation and Timeout
class CancellationExample : AppCompatActivity() { // lifecycleScope needs a LifecycleOwner such as an Activity
private var job: Job? = null
fun startDataFetch() {
job = lifecycleScope.launch {
try {
val result = withTimeout(5000) { // 5 second timeout
fetchDataFromNetwork()
}
displayResult(result)
} catch (e: TimeoutCancellationException) {
showTimeoutError("Request timed out")
} catch (e: CancellationException) {
Log.d("CancellationExample", "Coroutine was cancelled")
throw e // rethrow so cancellation keeps propagating
}
}
}
fun cancelDataFetch() {
job?.cancel()
}
fun timeoutExample() {
lifecycleScope.launch {
try {
val result = withTimeoutOrNull(3000) {
fetchDataFromNetwork()
}
if (result != null) {
displayResult(result)
} else {
showTimeoutError("Request timed out")
}
} catch (e: Exception) {
showError("Error: ${e.message}")
}
}
}
suspend fun fetchDataFromNetwork(): String {
delay(2000) // Simulate network delay
return "Network data"
}
}
Flow Basics
Flow is Kotlin's reactive streams API for handling asynchronous data streams.
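A flow built with the flow builder is cold: its body does not run until a collector subscribes, and it runs again for each new collector. The sketch below illustrates this with a log list; the names loggingFlow and collectTwice are hypothetical and used only for this example.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical helper: a flow that records when its producer block starts.
fun loggingFlow(log: MutableList<String>): Flow<Int> = flow {
    log += "producer started"
    emit(1)
    emit(2)
}

// Collects the same flow twice and returns everything that was logged.
fun collectTwice(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val numbers = loggingFlow(log) // nothing runs yet
    log += "flow created"
    numbers.collect { log += "first collector got $it" }
    numbers.collect { log += "second collector got $it" }
    log
}

fun main() {
    collectTwice().forEach(::println)
}
```

The log begins with "flow created" and contains "producer started" twice, once per collector. StateFlow and SharedFlow, covered later, behave differently because they are hot.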
Creating Flows
class FlowExample : Fragment() { // viewLifecycleOwner and lifecycleScope require a Fragment
// Simple flow
fun simpleFlow(): Flow<Int> = flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}
// Flow from collection
fun flowFromCollection(): Flow<String> = listOf("Apple", "Banana", "Orange").asFlow()
// Flow from range
fun flowFromRange(): Flow<Int> = (1..10).asFlow()
// Flow with transformation
fun transformedFlow(): Flow<String> = flow {
for (i in 1..3) {
delay(100)
emit("Number $i")
}
}.map { it.uppercase() }
// StateFlow - hot flow that holds current state
private val _uiState = MutableStateFlow<UiState>(UiState.Loading)
val uiState: StateFlow<UiState> = _uiState.asStateFlow()
// SharedFlow - hot flow for one-time events
private val _events = MutableSharedFlow<Event>()
val events: SharedFlow<Event> = _events.asSharedFlow()
fun collectFlow() {
lifecycleScope.launch {
simpleFlow().collect { value ->
println("Received: $value")
}
}
}
fun collectWithLifecycle() {
viewLifecycleOwner.lifecycleScope.launch {
viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
simpleFlow().collect { value ->
updateUI(value)
}
}
}
}
}
sealed class UiState {
object Loading : UiState()
data class Success(val data: String) : UiState()
data class Error(val message: String) : UiState()
}
sealed class Event {
data class ShowToast(val message: String) : Event()
data class NavigateTo(val destination: String) : Event()
}
Flow Operators
class FlowOperatorsExample : AppCompatActivity() { // lifecycleScope needs a LifecycleOwner such as an Activity
fun operatorExamples() {
lifecycleScope.launch {
// Map operator
simpleFlow()
.map { it * 2 }
.collect { println("Doubled: $it") }
// Filter operator
simpleFlow()
.filter { it % 2 == 0 }
.collect { println("Even: $it") }
// Transform operator
simpleFlow()
.transform { value ->
emit("First: $value")
emit("Second: $value")
}
.collect { println(it) }
// Take operator
simpleFlow()
.take(3)
.collect { println("First 3: $it") }
// Drop operator
simpleFlow()
.drop(2)
.collect { println("After dropping 2: $it") }
// DistinctUntilChanged
flowOf(1, 1, 2, 2, 3, 3)
.distinctUntilChanged()
.collect { println("Distinct: $it") }
}
}
fun combiningFlows() {
lifecycleScope.launch {
val flow1 = flowOf(1, 2, 3)
val flow2 = flowOf("A", "B", "C")
// Combine flows
flow1.combine(flow2) { number, letter ->
"$number$letter"
}.collect { println("Combined: $it") }
// Zip flows
flow1.zip(flow2) { number, letter ->
"$number$letter"
}.collect { println("Zipped: $it") }
// Merge flows (the Int flow is mapped to String so the result is a Flow<String>)
merge(flow1.map { it.toString() }, flow2)
.collect { println("Merged: $it") }
}
}
// Source flow used by the operator examples above
fun simpleFlow(): Flow<Int> = flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}
}
StateFlow and SharedFlow
StateFlow and SharedFlow are specialized Flow types for specific use cases.
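The practical difference shows up when a collector arrives late. The sketch below, which assumes only kotlinx-coroutines-core and uses the hypothetical helper names lateStateCollector and lateEventCollector, shows that a StateFlow always hands a new collector its current value, while a MutableSharedFlow created with default arguments (no replay) drops values emitted before anyone subscribed.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// A collector that subscribes after several updates still sees the latest state.
fun lateStateCollector(): Int = runBlocking {
    val state = MutableStateFlow(0)
    state.value = 1
    state.value = 2
    state.first()
}

// With replay = 0 (the default), an event emitted before subscription is lost.
fun lateEventCollector(): String? = runBlocking {
    val events = MutableSharedFlow<String>()
    events.emit("toast") // no subscribers yet, so the value is dropped
    withTimeoutOrNull(200) { events.first() } // waits, then gives up
}

fun main() {
    println(lateStateCollector()) // prints 2
    println(lateEventCollector()) // prints null
}
```

This is why StateFlow suits UI state that must survive re-subscription, and why SharedFlow events need either an active collector or a replay value if they must not be missed.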
StateFlow
class StateFlowExample : AppCompatActivity() { // lifecycleScope needs a LifecycleOwner such as an Activity
private val _userState = MutableStateFlow<UserState>(UserState.Loading)
val userState: StateFlow<UserState> = _userState.asStateFlow()
private val _counter = MutableStateFlow(0)
val counter: StateFlow<Int> = _counter.asStateFlow()
fun loadUser(userId: Int) {
lifecycleScope.launch {
_userState.value = UserState.Loading
try {
val user = userRepository.getUserById(userId)
_userState.value = UserState.Success(user)
} catch (e: Exception) {
_userState.value = UserState.Error(e.message ?: "Unknown error")
}
}
}
fun incrementCounter() {
_counter.update { it + 1 } // atomic read-modify-write, safe under concurrent updates
}
fun decrementCounter() {
_counter.update { it - 1 }
}
fun observeUserState() {
lifecycleScope.launch {
userState.collect { state ->
when (state) {
is UserState.Loading -> showLoading()
is UserState.Success -> displayUser(state.user)
is UserState.Error -> showError(state.message)
}
}
}
}
}
sealed class UserState {
object Loading : UserState()
data class Success(val user: User) : UserState()
data class Error(val message: String) : UserState()
}
SharedFlow
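class SharedFlowExample : AppCompatActivity() { // lifecycleScope needs a LifecycleOwner such as an Activity
private val _events = MutableSharedFlow<Event>()
val events: SharedFlow<Event> = _events.asSharedFlow()
private val _oneTimeEvents = MutableSharedFlow<OneTimeEvent>()
val oneTimeEvents: SharedFlow<OneTimeEvent> = _oneTimeEvents.asSharedFlow()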
fun emitEvent(event: Event) {
lifecycleScope.launch {
_events.emit(event)
}
}
fun emitOneTimeEvent(event: OneTimeEvent) {
lifecycleScope.launch {
_oneTimeEvents.emit(event)
}
}
fun observeEvents() {
lifecycleScope.launch {
events.collect { event ->
when (event) {
is Event.ShowToast -> showToast(event.message)
is Event.NavigateTo -> navigateTo(event.destination)
is Event.ShowDialog -> showDialog(event.title, event.message)
}
}
}
}
fun observeOneTimeEvents() {
lifecycleScope.launch {
oneTimeEvents.collect { event ->
when (event) {
is OneTimeEvent.UserLoggedIn -> handleUserLogin(event.user)
is OneTimeEvent.PaymentCompleted -> handlePaymentCompletion(event.amount)
is OneTimeEvent.DataSyncCompleted -> handleDataSync()
}
}
}
}
}
sealed class Event {
data class ShowToast(val message: String) : Event()
data class NavigateTo(val destination: String) : Event()
data class ShowDialog(val title: String, val message: String) : Event()
}
sealed class OneTimeEvent {
data class UserLoggedIn(val user: User) : OneTimeEvent()
data class PaymentCompleted(val amount: Double) : OneTimeEvent()
object DataSyncCompleted : OneTimeEvent()
}
Coroutines with Android Components
Integrating coroutines with Android lifecycle components.
ViewModel Integration
class UserViewModel : ViewModel() {
// userRepository is assumed to be provided elsewhere (for example via constructor injection)
private val _users = MutableStateFlow<List<User>>(emptyList())
val users: StateFlow<List<User>> = _users.asStateFlow()
private val _loading = MutableStateFlow(false)
val loading: StateFlow<Boolean> = _loading.asStateFlow()
private val _error = MutableStateFlow<String?>(null)
val error: StateFlow<String?> = _error.asStateFlow()
init {
loadUsers()
}
fun loadUsers() {
viewModelScope.launch {
_loading.value = true
_error.value = null
try {
val userList = userRepository.getUsers()
_users.value = userList
} catch (e: Exception) {
_error.value = e.message ?: "Unknown error"
} finally {
_loading.value = false
}
}
}
fun refreshUsers() {
viewModelScope.launch {
try {
userRepository.refreshUsers()
loadUsers()
} catch (e: Exception) {
_error.value = e.message ?: "Refresh failed"
}
}
}
fun searchUsers(query: String) {
viewModelScope.launch {
if (query.isEmpty()) {
loadUsers()
} else {
val filteredUsers = userRepository.searchUsers(query)
_users.value = filteredUsers
}
}
}
override fun onCleared() {
super.onCleared()
// viewModelScope is cancelled automatically here; release only non-coroutine resources
}
}
Activity and Fragment Integration
class MainActivity : AppCompatActivity() {
private val viewModel: UserViewModel by viewModels()
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
observeViewModel()
}
private fun observeViewModel() {
lifecycleScope.launch {
// Collect only while the activity is at least STARTED; collection stops in the background
repeatOnLifecycle(Lifecycle.State.STARTED) {
// Each collect suspends indefinitely, so each flow gets its own child coroutine
launch {
viewModel.users.collect { users ->
updateUserList(users)
}
}
launch {
viewModel.loading.collect { isLoading ->
showLoading(isLoading)
}
}
launch {
viewModel.error.collect { error ->
error?.let { showError(it) }
}
}
}
}
}
private fun performAction() {
lifecycleScope.launch {
try {
val result = withContext(Dispatchers.IO) {
performHeavyOperation()
}
handleResult(result)
} catch (e: Exception) {
showError("Operation failed: ${e.message}")
}
}
}
}
class UserFragment : Fragment() {
private val viewModel: UserViewModel by viewModels()
override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
super.onViewCreated(view, savedInstanceState)
observeViewModel()
}
private fun observeViewModel() {
viewLifecycleOwner.lifecycleScope.launch {
viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
viewModel.users.collect { users ->
updateUserList(users)
}
}
}
}
}
Advanced Flow Features
Advanced Flow features for complex use cases.
Flow Operators
class AdvancedFlowExample : AppCompatActivity() { // lifecycleScope needs a LifecycleOwner such as an Activity
fun advancedOperators() {
lifecycleScope.launch {
// Buffer operator
simpleFlow()
.buffer(10)
.collect { println("Buffered: $it") }
// Conflate operator
simpleFlow()
.conflate()
.collect { println("Conflated: $it") }
// FlowOn operator
simpleFlow()
.flowOn(Dispatchers.IO)
.collect { println("FlowOn: $it") }
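// Catch operator (handles upstream exceptions only)
simpleFlow()
.map { it.toString() } // map to String so catch can emit a String fallback
.catch { exception ->
emit("Error: ${exception.message}")
}
.collect { println("Caught: $it") }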
// OnStart and onCompletion
simpleFlow()
.onStart { println("Flow started") }
.onCompletion { println("Flow completed") }
.collect { println("Value: $it") }
}
}
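// Renamed so it does not clash with customFlow() below (overloads cannot differ only by return type)
fun collectCustomFlow() {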
lifecycleScope.launch {
customFlow()
.collect { println("Custom: $it") }
}
}
fun customFlow(): Flow<String> = flow {
emit("First")
delay(100)
emit("Second")
delay(100)
emit("Third")
}.map { it.uppercase() }
.filter { it.length > 3 }
.onEach { println("Processing: $it") }
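// timeout is a preview API; check that your kotlinx.coroutines version provides it.
// Requires: import kotlin.time.Duration.Companion.seconds
@OptIn(FlowPreview::class)
fun flowWithTimeout() {
lifecycleScope.launch {
simpleFlow()
.timeout(1.seconds) // takes a kotlin.time.Duration, not a Long
.catch { exception ->
if (exception is TimeoutCancellationException) {
println("Flow timed out")
} else {
throw exception // do not swallow unrelated failures
}
}
.collect { println("With timeout: $it") }
}
}
// Source flow used by the examples above
fun simpleFlow(): Flow<Int> = flow {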
for (i in 1..5) {
delay(200)
emit(i)
}
}
}
Channel Integration
class ChannelExample : AppCompatActivity() { // lifecycleScope needs a LifecycleOwner such as an Activity
private val channel = Channel<String>()
fun sendData() {
lifecycleScope.launch {
channel.send("Data 1")
channel.send("Data 2")
channel.send("Data 3")
channel.close()
}
}
fun receiveData() {
lifecycleScope.launch {
for (data in channel) {
println("Received: $data")
}
}
}
fun channelAsFlow() {
lifecycleScope.launch {
channel.receiveAsFlow()
.collect { data ->
println("Channel as flow: $data")
}
}
}
fun sharedFlowFromChannel() {
val sharedFlow = channel.receiveAsFlow()
.shareIn(
scope = lifecycleScope,
started = SharingStarted.WhileSubscribed(),
replay = 1
)
lifecycleScope.launch {
sharedFlow.collect { data ->
println("Shared flow from channel: $data")
}
}
}
}
Testing Coroutines and Flow
Testing coroutines and Flow is essential for reliable applications.
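One reason runTest is practical for this is that it runs on virtual time: calls to delay advance a simulated clock instead of waiting. The following sketch assumes JUnit 4 and the kotlinx-coroutines-test artifact are on the test classpath; the class name VirtualTimeTest is hypothetical.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.test.currentTime
import kotlinx.coroutines.test.runTest
import org.junit.Assert.assertEquals
import org.junit.Test

class VirtualTimeTest {
    @OptIn(ExperimentalCoroutinesApi::class) // currentTime is an experimental API
    @Test
    fun delayAdvancesVirtualTime() = runTest {
        delay(60_000) // skipped in real time, advances the virtual clock
        assertEquals(60_000L, currentTime)
    }
}
```

A test written this way finishes almost immediately even though the code under test "waits" for a minute, which keeps suites with timeouts and retries fast.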
Testing Coroutines
@RunWith(AndroidJUnit4::class)
class CoroutineTest {
@get:Rule
val mainDispatcherRule = MainDispatcherRule()
@Test
fun testCoroutineExecution() = runTest {
val result = fetchDataFromNetwork()
assertEquals("Expected data", result)
}
@Test
fun testCoroutineCancellation() = runTest {
val job = launch {
delay(1000)
fail("Should not reach here")
}
job.cancel()
job.join()
assertTrue(job.isCancelled)
}
@Test
fun testExceptionHandling() = runTest {
val result = try {
fetchDataWithException()
"Success"
} catch (e: Exception) {
"Error: ${e.message}"
}
assertEquals("Error: Network error", result)
}
suspend fun fetchDataFromNetwork(): String {
delay(100)
return "Expected data"
}
suspend fun fetchDataWithException(): String {
delay(100)
throw IOException("Network error")
}
}
// Custom test rule for main dispatcher
class MainDispatcherRule : TestWatcher() {
private val testDispatcher = StandardTestDispatcher()
override fun starting(description: Description) {
super.starting(description)
Dispatchers.setMain(testDispatcher)
}
override fun finished(description: Description) {
super.finished(description)
Dispatchers.resetMain()
}
}
Testing Flow
@RunWith(AndroidJUnit4::class)
class FlowTest {
@Test
fun testFlowEmission() = runTest {
val flow = flowOf(1, 2, 3, 4, 5)
val result = mutableListOf<Int>()
flow.collect { result.add(it) }
assertEquals(listOf(1, 2, 3, 4, 5), result)
}
@Test
fun testFlowTransformation() = runTest {
val flow = flowOf(1, 2, 3, 4, 5)
.map { it * 2 }
.filter { it > 5 }
val result = mutableListOf<Int>()
flow.collect { result.add(it) }
assertEquals(listOf(6, 8, 10), result)
}
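@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun testStateFlow() = runTest {
val stateFlow = MutableStateFlow(0)
val values = mutableListOf<Int>()
// Collect eagerly so each update is observed before the next one is set
val job = launch(UnconfinedTestDispatcher(testScheduler)) {
stateFlow.collect { values.add(it) }
}
stateFlow.value = 1
stateFlow.value = 2
stateFlow.value = 3
job.cancel()
assertEquals(listOf(0, 1, 2, 3), values)
}
@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun testSharedFlow() = runTest {
val sharedFlow = MutableSharedFlow<String>()
val values = mutableListOf<String>()
// The collector must be subscribed before emit, because there is no replay
val job = launch(UnconfinedTestDispatcher(testScheduler)) {
sharedFlow.collect { values.add(it) }
}
sharedFlow.emit("First")
sharedFlow.emit("Second")
sharedFlow.emit("Third")
job.cancel()
assertEquals(listOf("First", "Second", "Third"), values)
}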
}
Best Practices
Coroutine Best Practices
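- Use appropriate dispatchers: Dispatchers.Main for UI work, Dispatchers.IO for blocking I/O, Dispatchers.Default for CPU-bound work
- Handle exceptions properly: Catch expected failures close to the call, and rethrow CancellationException instead of swallowing it
- Use structured concurrency: Let parent coroutines manage child coroutines
- Avoid GlobalScope: Use lifecycleScope or viewModelScope so work is cancelled with its owner
- Cancel coroutines properly: Keep the Job of long-running work and cancel it when the result is no longer needed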
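As a small illustration of structured concurrency and cancellation, the sketch below cancels a parent coroutine and records that both children run their cleanup. It assumes only kotlinx-coroutines-core; cancelParent is a hypothetical helper name.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Cancels a parent with two long-running children and returns the cleanup log.
fun cancelParent(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val parent = launch {
        repeat(2) { index ->
            launch {
                try {
                    delay(10_000) // stands in for long-running work
                } finally {
                    log += "child $index cleaned up" // runs when the child is cancelled
                }
            }
        }
    }
    delay(100)             // give the children time to start
    parent.cancelAndJoin() // cancels the parent and, through it, both children
    log.sorted()
}

fun main() {
    println(cancelParent()) // prints [child 0 cleaned up, child 1 cleaned up]
}
```

Nothing here cancels the children explicitly. Cancelling the parent is enough, which is the behavior lifecycleScope and viewModelScope rely on when their owner is destroyed.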
Flow Best Practices
- Use StateFlow for state: Use StateFlow for UI state management
- Use SharedFlow for events: Use SharedFlow for one-time events
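- Handle backpressure: Use buffer, conflate, or collectLatest when the collector is slower than the producer
- Collect in lifecycle-aware scopes: Wrap collection in repeatOnLifecycle so it stops while the UI is in the background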
- Test flows thoroughly: Write comprehensive tests for flows
Performance
- Use appropriate operators: Choose efficient operators
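- Avoid unnecessary emissions: Use filter and distinctUntilChanged to drop values the collector does not need
- Use buffer when needed: buffer lets the producer run ahead of a slow collector instead of waiting for each value to be processed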
- Profile your code: Use Android Studio profiler
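The effect of buffer can be measured directly. In the sketch below, a producer that needs about 100 ms per value feeds a collector that needs about 300 ms per value; slowProducer and timeCollection are hypothetical names, and the exact timings will vary by machine.

```kotlin
import kotlin.system.measureTimeMillis
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Emits three values, taking about 100 ms to produce each one.
fun slowProducer(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

// Returns how long it takes a slow collector to consume the flow.
fun timeCollection(useBuffer: Boolean): Long = runBlocking {
    val source = if (useBuffer) slowProducer().buffer() else slowProducer()
    measureTimeMillis {
        source.collect { delay(300) } // simulate slow processing of each value
    }
}

fun main() {
    println("without buffer: ${timeCollection(useBuffer = false)} ms")
    println("with buffer: ${timeCollection(useBuffer = true)} ms")
}
```

Without buffer, production and processing alternate, so the total is roughly the sum of both delays for every value. With buffer, the producer keeps working while the collector is busy, so the run should finish a few hundred milliseconds sooner.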
Common Pitfalls
Avoiding Common Mistakes
- Don't use GlobalScope: Use lifecycle-aware scopes
- Don't ignore exceptions: Always handle exceptions
- Don't block the main thread: Use appropriate dispatchers
- Don't forget to cancel: Cancel coroutines when not needed
- Don't collect flows in wrong scope: Use lifecycle-aware collection
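On the "don't ignore exceptions" point, it helps to see how a failure in one task can be handled without taking down its siblings. This is a minimal sketch using supervisorScope; the function name loadIndependently and the messages are made up for illustration.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.supervisorScope

// Runs two independent tasks; one fails, the other still delivers its result.
fun loadIndependently(): List<String> = runBlocking {
    supervisorScope {
        val profile = async<String> { throw IllegalStateException("profile service down") }
        val posts = async { "3 posts" }
        // The failure is stored in the Deferred and rethrown by await()
        val profileResult = try {
            profile.await()
        } catch (e: IllegalStateException) {
            "profile unavailable: ${e.message}"
        }
        listOf(profileResult, posts.await())
    }
}

fun main() {
    println(loadIndependently()) // prints [profile unavailable: profile service down, 3 posts]
}
```

With coroutineScope instead of supervisorScope, the failing child would cancel the whole scope, including the posts task. Choosing between the two is a decision about whether the tasks are independent.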
Debugging Tips
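- Use coroutine debug mode: Run with the JVM option -Dkotlinx.coroutines.debug to include coroutine names in thread names
- Add logging: Name coroutines with CoroutineName and log when they start, finish, or are cancelled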
- Use Android Studio profiler: Profile coroutine performance
- Test thoroughly: Write comprehensive tests
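Naming coroutines makes logs much easier to read. The sketch below shows how a CoroutineName can be attached and read back from the coroutine context; currentCoroutineName and the "user-loader" label are hypothetical.

```kotlin
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking

// Starts a named coroutine and returns the name it sees in its own context.
fun currentCoroutineName(): String? = runBlocking {
    val named = async(CoroutineName("user-loader")) {
        coroutineContext[CoroutineName]?.name // read the name back for logging
    }
    named.await()
}

fun main() {
    println(currentCoroutineName()) // prints user-loader
}
```

When the kotlinx.coroutines debug mode is enabled, the same name also appears in the thread name, so it shows up in log output without extra code.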
Practice Exercises
Try these exercises to reinforce your coroutines and Flow knowledge:
Exercise 1: Weather App
// Create a weather app with:
// - Fetch weather data from API
// - Cache weather data locally
// - Real-time weather updates
// - Location-based weather
// - Weather alerts
// - Offline support
Exercise 2: Chat Application
// Build a chat app with:
// - Real-time messaging
// - Message status updates
// - Typing indicators
// - Message synchronization
// - Offline message queuing
// - Push notifications
Exercise 3: Data Sync App
// Create a data sync app with:
// - Background data synchronization
// - Conflict resolution
// - Progress tracking
// - Retry mechanisms
// - Offline queue management
// - Sync status monitoring
Next Steps
Now that you have a solid foundation in coroutines and Flow, explore these advanced topics:
- Channels: Communication between coroutines
- Actors: State management with coroutines
- Structured Concurrency: Advanced coroutine patterns
- Flow Testing: Advanced testing techniques
- Custom Flow Operators: Creating your own operators
- Performance Optimization: Optimizing coroutine performance
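As a starting point for custom operators, here is a minimal sketch of one written as an extension function on Flow. The operator everyNth is not part of kotlinx.coroutines; it is a made-up example that keeps every n-th value.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical custom operator: emits only every n-th value of the upstream flow.
fun <T> Flow<T>.everyNth(n: Int): Flow<T> {
    require(n > 0) { "n must be positive" }
    return flow {
        var seen = 0 // per-collection state, reset for each collector
        collect { value ->
            seen++
            if (seen % n == 0) emit(value)
        }
    }
}

fun everyThirdOfTen(): List<Int> = runBlocking {
    (1..10).asFlow().everyNth(3).toList()
}

fun main() {
    println(everyThirdOfTen()) // prints [3, 6, 9]
}
```

The pattern is the same for most simple operators: wrap the upstream in a new flow builder, collect it inside, and decide what to emit. Keeping state inside the builder means each collector gets its own counter.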
Summary
Kotlin Coroutines and Flow provide powerful tools for asynchronous programming in Android. Coroutines enable you to write asynchronous code that looks synchronous, while Flow offers a reactive streams API for handling data streams efficiently.
You've learned about coroutine basics, Flow operators, StateFlow and SharedFlow, integration with Android components, testing, and best practices. Remember to always consider performance, error handling, and lifecycle management when working with coroutines and Flow.
Practice regularly with different use cases, experiment with advanced features, and stay updated with the latest coroutine and Flow patterns. The more you work with these tools, the more you'll appreciate their power and flexibility for building responsive, efficient Android applications.