Kotlin Flow ve Saz Arkadaşları — Part 2

Murat YÜKSEKTEPE
3 min readMar 22, 2024

Kotlin Flow için kullanabileceğimiz metodları tanımaya devam ediyoruz. Bu makale şu alt başlıkları içeriyor;

Flow.buffer(), Flow.catch() ve callbackFlow (biraz da backpressure)

Flow.buffer()

Buffer’ı daha iyi anlamak için önce backpressure kavramına göz atmak gerekir. Backpressure kısaca, verilerin tüketilmesinden daha hızlı üretilmesi demektir.

Bir üretici her yarım saniyede bir yeni bir veri üretmiş ve yayına vermiş olsun. Bu verileri karşılayan tüketici ise her veri için bir saniye zaman harcasın.

Bu akışı collect ile dinlediğimiz zaman -bildiğimiz üzere- lambda içerisindeki işlemler bitmeden diğer veri için olan işlemlere geçiş olmayacaktır fakat bu sırada veri hızlı şekilde üretilmeye devam etmektedir. İşte backpressure tam olarak bunu ifade eder.

Bildiğimiz üzere normalde flow ardışık bir yapıdadır. Yani üretilmiş olan bir değer yayına girdikten sonra tüketim için hazırlanmış işlemlere maruz kalır ve tüketim işlemi bittikten sonra yeni değer akışa girerek aynı aşamalar gerçekleşir.

Yani kısaca yapmak istediğimiz şey bir sonraki işleme geçmeden önce var olan işlemin tamamlanmasını beklemek ise -yani elimizdeki iş bitmeden üreticinin yeni veri göndermesini engellemek istiyorsak- aşağıdaki örnekte olduğu gibi işlemler arasına bir tampon eklemeden devam edebiliriz.

Not: Kotlin Playground bazen yanlış çıktılar gösterdiği için olması gereken çıktıları kodların altına ekliyorum.

Flow için tamponsuz Collect işlemi
Üretici gönderdi: 1
Tüketici aldı: 1
(1 sn)
Üretici gönderdi: 2
Tüketici aldı: 2
(1 sn)
Üretici gönderdi: 3
Tüketici aldı: 3

Fakat bir kuyruklama yapısı kurmak ve tüketim işlemine başlamadan önce üreticinin işini bitirmesini beklemek istiyorsak aşağıdaki örnekte olduğu gibi üretim ve tüketim aşamalarını bir tampon ile ayırabiliriz.

Flow.buffer() — Üretim ve tüketim için delay kullanılmadı.
Üretici gönderdi: 1
Üretici gönderdi: 2
Üretici gönderdi: 3
Tüketici aldı: 1
Tüketici aldı: 2
Tüketici aldı: 3
  • Üretim hızlı, tüketim yavaş ise: üretici veriyi göndermeye devam ederken tüketici akışı sıraya almış ve elindeki işi bitirmeye çalışıyor olacaktır.
Üretici gönderdi: 1
Tüketici aldı: 1
Üretici gönderdi: 2
Üretici gönderdi: 3
Tüketici aldı: 2
Tüketici aldı: 3
  • Üretim yavaş, tüketim hızlı ise: tampon özelliği ortadan kalkacaktır.
Üretici gönderdi: 1
Tüketici aldı: 1
Üretici gönderdi: 2
Tüketici aldı: 2
Üretici gönderdi: 3
Tüketici aldı: 3

Kaynak ve detaylı okuma için:

Buffer: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/buffer.html

Backpressure: https://medium.com/google-developer-experts/backpressure-in-your-kotlin-flows-3eec980869c7

https://blog.stackademic.com/understanding-backpressure-and-buffer-in-kotlin-coroutines-flow-3f59e41c76f9 (Örneğinin güzel olması açısından özellikle bu makaleyi okumanızı öneririm)

callbackFlow()

Bazı durumlarda veri akışlarını callback’ler aracılığıyla, çoğunlukla bir servise register olarak dinleriz. Örneğin; LocationManager kullanarak anlık lokasyon değişikliklerinin callback aracılığıyla takip edilmesi gibi.

callbackFlow() metodu bu gibi durumlar için callback’ler ile dinlediğimiz ve sürekliliği olan veri akışlarını flow şeklinde kullanabilmemizi sağlar.

Örnek kod ilk bakışta birazcık karışık gelebilir ama vakit ayırıp incelediğini zaman akışı kısa sürede çözeceğinizi düşünüyorum.

callbackFlow — Hata mesajı içeren örnek
1
2
3
4
Exception in thread "main" java.util.concurrent.CancellationException: Hata! Liste içerisinden 4'ü çıkarıp yeniden deneyiniz.
at FileKt$main$2$callbackObject$1.onError (File.kt:26)
at FileKt$main$2.invokeSuspend (File.kt:39)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith (ContinuationImpl.kt:33)

Örneğimizde veri üretilirken bir Exception oluşması durumu özellikle simüle edilmiştir.

Flow.catch()

Yukarıdaki örnekte gösterdiğimiz Exception detayından sonra catch()’i incelememiz iyi olacaktır :)

Bir flow akışı içerisinde eğer bir Exception ile karşılaşırsak ve bunu doğru şekilde yönetemezsek uygulama akışımız istemediğimiz şekilde kesilecektir.

Eğer exception alınması durumunda uygulama akışımız yarıda kesilmeden bu durumu yönetmek istersek Flow.catch lambdasını kullanarak Exception’ı yakalayabilir ve uygun şekilde yönetebiliriz.

Flow.catch { … }

Not: Bir Exception ile karşılaşmamız durumunda catch { .. } lambdası çalışacak ve akışımız iptal olacaktır. Yukarıdaki örnekte 4 ve 5 değerlerini çıktı olarak göremeyiz.

--

--