Effective Java Madde 48: Streamleri Paralel Yaparken Dikkatli Olun

Yaygın olarak kullanılan programlama dilleri arasında Java, paralel programlamayı kolaylaştıran araçlar sunma konusunda her zaman en önde olmuştur. 1996’da Java ilk ortaya çıktığında wait/notify mekanizması ile threadleri destekliyordu. Java 5 java.util.concurrent paketiyle beraber paralel koleksiyonlar ve executor yapısını dile eklerken, Java 7 ile fork-join mekanizmasına kavuştuk. Java 8 ise tek bir parallel metot çağrısı sayesinde paralel işletim imkanı sunan streamleri dile ekledi. Java’da paralel işletilen programlar yazmak giderek kolaylaşıyor gibi görünse de, bunu doğru ve yüksek performans alarak yapmak hiç de kolay değildir. Thread güvenliği ve canlılık (liveness) ihlalleri paralel programlamanın doğasında olan sorunlardır ve paralel streamler de bunun bir istisnası değildir.

Madde 45’de yazdığımız bu programı ele alalım:

// İlk 20 Mersenne asal sayısını üreten stream tabanlı program
public static void main(String[] args) {
    primes().map(p -> TWO.pow(p.intValueExact()).subtract(ONE))
        .filter(mersenne -> mersenne.isProbablePrime(50))
        .limit(20)
        .forEach(System.out::println);
}
static Stream<BigInteger> primes() {
    return Stream.iterate(TWO, BigInteger::nextProbablePrime);
}

Bu program benim bilgisayarımda Mersenne asal sayılarını hemen yazdırmaya başlıyor ve 12.5 saniyede sonlanıyor. Peki bunu hızlandırmak niyetiyle stream hattına parallel() çağrısını eklersem ne olur? Paralel işletimi etkinleştirdiğim için program gerçekten de hızlanır mı? Maalesef bunu yaptığımda program hiçbir şey yazdırmıyor, işlemci kullanımı %90’a fırlıyor ve orada kalıyor. Belki uzun süre beklerseniz program sonlanabilir ama ben yarım saat sonra pes edip programı durdurmak zorunda kaldım.

Peki neden böyle oldu? Basitçe söylemek gerekirse, stream mekanizmasının bu işlemi paralel olarak nasıl yapabileceği konusunda hiçbir fikri yok. Stream üretmek için Stream.iterate kullanıyorsanız veya ara işlemlerden bir tanesi limit ise, stream hatlarını paralel yapmak en iyi şartlar altında dahi size performans artışı sağlamaz. Bizim örneğimizdeki stream hattı bu problemlerin ikisine de sahiptir. Paralelleştirme algoritması limit ara işlemini ele alabilmek için fazladan birkaç eleman hesaplamakta bir sakınca görmez. Daha sonra fazladan hesaplanan bu elemanlar atılacaktır. Ancak bu örnekte Mersenne asal sayıları için fazladan bir eleman hesaplamak, kabaca ondan önceki elemanların tamamını hesaplamak için harcanan zaman kadar sürer. Bu sebeple de paralel işletim algoritması çöker. Buradaki ders çok açıktır: Stream hatlarını rastgele paralelleştirmeyin!

Paralel streamlerin yüksek performansla çalışabilmesi için bunların ArrayList, HashMap, HashSet, ConcurrentHashMap nesneleri; diziler, int aralıkları (IntStream.range) veya long aralıkları (LongStream.range) üzerinden yaratılması gerekir. Bütün bu veri yapılarının ortak noktası istenen küçüklükte parçalara kolayca bölünebilmeleridir. Bu da paralel işletimi kolaylaştıran bir durumdur. Stream kütüphanesi bunu yapmak için Stream ve Iterator‘da bulunan spliterator metodunu kullanır.

Bu veri yapılarının ikinci önemli özelliği ise sıralı bir şekilde işlendikleri zaman referans yerelliği (locality of reference) sunmalarıdır. Başka bir deyişle ard arda gelen elemanların referansları bellekte beraber tutulmaktadır. Ancak referansların beraber olması bunların işaret ettiği nesnelerin de bellekte birbirlerine yakın olacağı anlamına gelmez, bu yerelliği azaltan bir faktördür. Referans yerelliği toplu yapılan işlemlerin paralel işlenmesinde çok önemli bir faktördür. Bu olmadığında threadler verinin bellekten işlemciye aktarılması için beklemek zorunda kalırlar. Referans yerelliğini en iyi sağlayan veri yapıları ise temel türlerdeki dizilerdir, çünkü bunlar verinin kendisini bellekte peş peşe saklarlar.

Bir stream hattının sonlandırıcı işlemi de paralel işletimin verimini etkiler. Hesaplamanın zaman alan kısmı ara işlemler değil de sonlandırıcı işlemde yapılıyorsa ve bu işlemin doğası gereği peş peşe yapılması gerekiyorsa paralel işletim pek verimli olmaz. Paralel işletime en uygun sonlandırıcı işlemler indirgeme (reduction) işlemleridir. İndirgeme işlemleri bütün stream elemanlarının reduce, min, max, count veya sum gibi metotlar kullanılarak birleştirilmesi sonucu tek sonuç üretirler. anyMatch, allMatch veya noneMatch gibi sonlandırıcı işlemler de paralel işletimde verimlidirler. Ancak Stream.collect metoduyla kullanılan toplayıcı işlemler pek verimli olmaz çünkü stream elemanlarının bir koleksiyonda toplanmasının getirdiği ek yük fazladır.

Stream hesaplamalarının paralelleştirilmesi kötü performansa sebep olabileceği gibi, programın yanlış sonuçlar üretmesine ve tutarsız davranmasına da sebep olabilir. Bu tür hataların kaynağı stream hatlarında kullanılan fonksiyon nesnelerinin Stream kütüphanesinin tanımladığı bağlayıcı kurallara uymamasıdır. Örneğin, reduce metoduna geçilen toplayıcı ve birleştirici fonksiyonların belirli matematiksel kuralları sağlaması ve durum taşımaması (stateless) gerekir. Bu kurallar ihlal edildiğinde (Madde 46) stream hattı sıralı işletimde düzgün çalışsa bile paralel işletimde büyük ihtimalle çökecektir.

Streamlerin verimli bir biçimde paralel işletilmesi için burada anlatılan bütün kurallara uysanız bile (doğru veri yapısı ve sonlandırıcı işlem seçimi, fonksiyon nesnelerinin paralel işletime uygun olması gibi) paralel işletimden beklediğiniz performans artışını almanız zordur. Bunun sebebi paralel işletimin kendisinin de bir ek yük getirmesidir. Eğer paralel hesaplamadan elde edilen kazanç, paralel işletimin getirdiği ek yükü fazlasıyla karşılayabiliyorsa o zaman bir performans kazanımı mümkün olur.

Bir stream hattını paralel yaparken amacın performans iyileştirmesi olduğunu unutmayın. Bu yüzden de iyileştirme yaparken öncesi ve sonrasındaki performans değerlerini ve üretilen sonuçları karşılaştırın (Madde 67). Bu testlerin gerçekçi bir sistem üzerinde yapılması önemlidir. Bütün paralel stream hatları tek bir fork-join havuzunu kullandıklarından bir tanesinde oluşabilecek hata başka stream hatlarında problemlere sebep olabilir.

Milyonlarca satırlık bir uygulama üzerinde çalışan ve stream kütüphanesini sıklıkla kullanan bir tanıdığım, paralel streamleri sadece birkaç yerde verimli olarak kullanabildiğini söylüyor. Tabii ki bu stream hesaplamalarını hiçbir koşulda paralel yapmayın anlamına gelmez! Doğru şartlar altında bir stream hattına parallel çağrısını ekleyerek işlemci çekirdeği sayısıyla orantılı olarak ciddi bir hız artışı elde edebilirsiniz. Makina öğrenmesi ve büyük çaptaki verilerin işlenmesi gibi alanlarda paralel işletimden ciddi performans kazanımları sağlanmaktadır.

Paralel işletimin verimli olduğu bir stream örneğine bakalım. Aşağıdaki primeCount(n) fonksiyonu, n değerine eşit veya daha küçük olan asal sayıların sayısını vermektedir:

// Paralel streamlerin faydalı olabileceği bir hesaplama
static long primeCount(long n) {
    return LongStream.rangeClosed(2, n)
        .mapToObj(BigInteger::valueOf)
        .filter(i -> i.isProbablePrime(50))
        .count();
}

Benim bilgisayarımda primeCount(108) değerini hesaplamak için 31 saniye geçmesi gerekti. Bu stream hattına sadece parallel() ekleyince bu süre 9.2 saniyeye düştü, yani dört çekirdekli bir işlemcide 3.7 kat hız artışı elde ettik:

// Asal sayıların sayılması - paralel versiyon
static long primeCount(long n) {
    return LongStream.rangeClosed(2, n)
       .parallel()
       .mapToObj(BigInteger::valueOf)
       .filter(i -> i.isProbablePrime(50))
       .count();
}

Eğer rastgele üretilmiş sayılardan oluşan bir stream üzerinde paralel hesaplama yapmak istiyorsanız ThreadLocalRandom yerine SplittableRandom kullanın. SplittableRandom tam olarak bu amaçla yazılmıştır ve paralel işletime çok uygundur. ThreadLocalRandom ise tek bir thread ile çalışmaya müsaittir. Paralel işletimde de çalışacaktır ancak SplittableRandom kadar hız artışı sağlamayacaktır.

Özetle, bir stream hattının doğru sonuçları üreteceğinden ve hız artışı sağlayacağından emin değilseniz paralel yapmaya kalkışmayın. Yanlış durumda yapılan paralel işletimin programın çökmesi veya performansın yerlerde sürünmesi gibi etkileri olabileceğini unutmayın. Eğer bir stream hattını paralel yaparak kazanç sağlayabileceğinizi düşünüyorsanız, gerçekçi bir ortamda mutlaka performansı ve üretilen sonuçları test edin. Sadece ve sadece bu testleri geçtiği taktirde bir stream hattını paralel yapmak yararınıza olacaktır.

Share

Effective Java Madde 47: Dönüş Türü Olarak Stream Yerine Collection Tercih Edin

Bir dizi eleman döndüren metotlarla sıkça karşılaşırız. Java 8’den önce bu tür metotlar için dönüş türü olarak ya Collection, Set, List gibi koleksiyon türleri, ya Iterable ya da dizi türleri kullanılırdı. Çoğu zaman da bunlardan birisini seçmek zor olmazdı. Metodun amacı dönüş değerinin for-each döngüsünde kullanılmasını mümkün kılmaksa veya döndürülen nesnelerin Collection arayüzündeki bazı metotları geçersiz kılması mümkün olmuyorsa, Iterable dönüş türü tercih edilirdi. Döndürülen elemanlar temel türlerde ise veya çok ciddi performans gereksinimleri varsa dizi kullanılırdı. Java 8’le streamler dile eklendikten sonra bu tür metotlarda dönüş türünü belirlemek zorlaştı.

Bir dizi eleman döndüren metotlar için dönüş türü olarak stream kullanmanın en doğru seçenek olduğunu söyleyenleri duyabilirsiniz. Ancak Madde 45’de anlattığımız gibi, iyi kod yazabilmek için stream ile yinelemeyi (iteration) beraber kullanmalıyız. Bir API sadece stream döndürecek şekilde tasarlanırsa, bunu for-each döngüsünde kullanmak isteyen bir istemci hayal kırıklığına uğrayacaktır. Stream türünün for-each döngülerinde kullanılmasını engelleyen tek şey Stream arayüzünün iterator metoduyla bu işlevselliği sağlamasına rağmen Iterable arayüzünü kalıtmamasıdır.

Maalesef bu problemin basit bir çözümü de yoktur. İlk bakışta, Stream içindeki iterator için bir metot referansı geçersek problem ortadan kalkacakmış gibi görünüyor:

// Java'da tür çıkarsama mekanizmasındaki kısıtlar nedeniyle derlenmez
for (ProcessHandle ph : ProcessHandle.allProcesses()::iterator) { 
    // döngü kodu
}

Bu kodu derlemeye çalışınca aşağıdaki gibi bir hata ile karşılaşırız:

Test.java:6: error: method reference not expected here
for (ProcessHandle ph : ProcessHandle.allProcesses()::iterator) {
                        ^

Bu hatayı düzeltmek için bir metot referansını parametreli Iterable türüne dönüştürebiliriz:

// çalışan ama çirkin bir çözüm 
for (ProcessHandle ph : (Iterable<ProcessHandle>)
                         ProcessHandle.allProcesses()::iterator)

Bu kod çalışır ama pratikte kullanmak için biraz karmaşık ve anlaşılması güç. Daha mantıklı bir seçenek olarak adaptör kullanabiliriz. JDK böyle bir adaptör sunmasa da yazması zor değildir. Dikkat ederseniz bu yöntemde tür dönüşümü yapmak zorunda değiliz çünkü tür çıkarsama mekanizması burada doğru çalışacaktır:

// Stream<E>'den Iterable<E>'ye dönüşüm yapan adaptör
public static <E> Iterable<E> iterableOf(Stream<E> stream) {
    return stream::iterator;
}

Bu adaptörü kullanarak stream döndüren metotları for-each döngülerinde kullanabiliriz:

for(ProcessHandle p : iterableOf(ProcessHandle.allProcesses())) { 
    // döngü kodu 
}

Madde 45’de Anagrams programının stream versiyonunda Files.lines, yinelemeli versiyonunda ise scanner kullanmıştık. Files.lines aslında scanner kullanmaktan daha mantıklıdır, yinelemeli versiyonda da bunu kullanmak daha doğru olurdu. Ancak yinelemeli versiyonda elemanları tek tek taramak gerektiği için stream döndüren Files.lines pek kullanışlı değildir. Stream döndüren metotlar yazmak istemcileri bu gibi tavizler vermeye zorlayabilir.

Benzer şekilde, bir grup elemanı stream kullanarak işlemek isteyen bir istemci, Iterable döndüren bir metotla karşılaşınca hayal kırıklığına uğrayacaktır. Bu problem için de yine bir adaptör kullanabiliriz:

// Iterable<E>'yi Stream<E>'ye dönüştüren adaptör
public static <E> Stream<E> streamOf(Iterable<E> iterable) {
    return StreamSupport.stream(iterable.spliterator(), false);
}

Eğer döndürdüğünüz elemanların bir stream kullanarak işleneceğinden eminseniz tabii ki stream döndürmekte bir sıkıntı yoktur. Aynı şekilde, for-each döngüsünde kullanılacağını bildiğiniz elemanları da Iterable türünde döndürmekte sakınca yoktur. Ancak dışarıya açık (public) bir API yazıyorsanız ve döndürülen elemanların ne şekilde işleneceğinden emin değilseniz, bu iki kullanımı da desteklemek yararlı olacaktır.

Collection arayüzü Iterable arayüzünü kalıtır ve stream metodu vardır, dolayısıyla hem yinelemeli hem de stream kullanımını destekler. Bu yüzden bir grup eleman döndüren dışa açık metotlar için dönüş türünün Collection veya bunun bir alt türü olması en doğrusudur. Diziler de yineleme ve stream kullanımlarını Arrays.asList ve Stream.of metotları sayesinde desteklerler. Döndürdüğünüz elemanlar belleğe kolayca sığacak kadar küçükse ArrayList veya HashSet gibi standart bir koleksiyon döndürmek mantıklı olacaktır. Ancak sırf koleksiyon türü döndürebilmek için büyük miktarlardaki veriyi belleğe sıkıştırmaya çalışmayın.

Bu gibi durumlarda kendi koleksiyon türünüzü yazmayı düşünebilirsiniz. Diyelim ki verilen bir kümenin kuvvet kümesini döndürmek istiyoruz. Kuvvet kümesi bir kümenin bütün alt kümelerini içerir. Örneğin {a, b, c} kümesinin kuvvet kümesi {{}, {a}, {b}, {c}, {a, b}, {a, c}, {b, c}, {a, b, c}} elemanlarından oluşur. Eğer kümenin n tane elemanı varsa, kuvvet kümesinde 2n eleman bulunur. Bu sebeple kuvvet kümesini standart bir koleksiyonda saklamayı düşünmeyin! AbstractList yardımıyla kendi koleksiyonumuzu yazarak bunun üstesinden gelebiliriz.

Bunu gerçekleştirebilmek için bit vektörü kullanabiliriz. Yukarıdaki gibi bir {a, b, c} kümesini örnek alırsak küme üç elemanlı olduğu için kuvvet kümesinin 23 = 8 tane elemanı olur. Yani kuvvet kümesinin bütün elemanlarını üç tane bitle ifade edebiliriz. Üç elemanlı bir bit vektöründe sıfırıncı indis “a”, birinci indis “b”, ikinci indis ise “c” elemanını temsil ederse 000 değeri boş kümeyi, 001 değeri {a} kümesini, 010 değeri {b} kümesini, 101 değeri {a, c} kümesini temsil edecektir. Başka bir deyişle bit vektörünün alabileceği sekiz değerin her biri kuvvet kümesinin bir elemanını temsil edecektir. Bir int değişkeni en fazla 231 – 1 değerini alabildiği için tek bir int ile 30 elemanlı bir kümenin kuvvet kümelerini üretebiliriz. Şimdi koda bakalım:

// Girdi olarak verilen bir kümenin kuvvet kümesini temsil eden
// bir Collection döndürür 
public class PowerSet {
    public static final <E> Collection<Set<E>> of(Set<E> s) {
        List<E> src = new ArrayList<>(s);
        if (src.size() > 30) {
            throw new IllegalArgumentException("Set too big " + s); 
        }

        return new AbstractList<Set<E>>() {
            @Override 
            public int size() {
                return 1 << src.size(); // 2 üzeri src.size()
            }
            @Override 
            public boolean contains(Object o) {
                return o instanceof Set &amp;&amp; src.containsAll((Set)o);
            }
            @Override 
            public Set<E> get(int index) {
                Set<E> result = new HashSet<>();
                for (int i = 0; index != 0; i++, index >>= 1) {
                    if ((index &amp; 1) == 1) {
                        result.add(src.get(i));
                    }
                }
                return result;
            }
        }; 
    }
}

Dikkat ederseniz PowerSet.of metodu 30’dan fazla elemanlı bir küme verildiğinde aykırı durum fırlatmaktadır. Bunun sebebi size metodunun int türünde değer döndürmesidir. Stream veya Iterable yerine Collection dönüş türü kullanmanın bir dezavantajı budur.

AbstractCollection üzerinden bir Collection gerçekleştirimi yazmak istersek Iterable arayüzünün gerektirdiği bir metot haricinde contains ve size metotlarını geçersiz kılmamız gerekir. Çoğu zaman bu metotlar kolayca geçersiz kılınabilir ancak bu mümkün olmuyorsa Stream veya Iterable döndürmek kabul edilebilir. Hatta iki ayrı metot kullanarak her ikisini de döndürebilirsiniz.

Bazen de dönüş türünü gerçekleştirim kolaylılığına göre seçebilirsiniz. Diyelim ki verilen bir listenin bütün alt listelerini (elemanların ard arda geliş sırasını bozmadan) döndüren bir metot yazmak istiyoruz. Bu alt listeleri üretip standart bir koleksiyonda tutmak için sadece üç satır kod yeterli olacaktır, ancak burada da bellek kullanımı çok fazla olacaktır. Kuvvet kümesi kadar kötü olmasa da kabul edilecek bir durum değildir. JDK bizlere iskelet bir Iterator gerçekleştirimi de sunmadığı için kuvvet kümesinde yaptığımız gibi yeni bir koleksiyon yazmak da uğraştırıcı olacaktır.

Bunu stream kullanarak yapmak ise ufak bir ipucu sayesinde nispeten kolaydır. Listenin ilk elemanını içeren alt listelere önek (prefix), son elemanını içeren listelere de sonek (suffix) adını verelim. Örneğin (a, b, c) listesi için (a), (a, b), and (a, b, c) alt listeleri önek, (a, b, c), (b, c), ve (c) alt listeleri ise sonek olacaktır. Bütün önek alt listelerinin soneklerini hesaplayıp birleştirirsek listenin bütün alt listelerini bulmuş oluruz. Bunu tersinden, yani sonek listelerinin öneklerini hesaplayarak da yapabiliriz. Tabi buna bir de boş listeyi eklemek gerekir.

NOT: Burada biraz kafanız karışmış olabilir o yüzden biraz daha detay vermek istedim. Yapılmak istenen şey bir listenin elemanlarının sırasını bozmadan bütün alt listelerini bulmak. Yani (a, b, c) listesi için alt listeler (), (a), (b), (c), (a, b), (b, c) ve (a, b, c) olacaktır, dikkat ederseniz eleman sırasını bozduğu için (a, c) alt liste olarak kabul edilmiyor. Verilen ipucu da şunu söylüyor: önek alt listleri olan (a), (a, b), and (a, b, c) listelerini alıp bunların her biri için sonekleri üretirsek bütün alt listelere erişmiş oluruz. (a) alt listesi için sonek yine (a) olacaktır. (a, b) için (b) ve (a, b) gelecektir. (a, b, c) içinse (c), (b, c) ve (a, b, c) üretilecektir. Bütün bu sonekleri birleştirip üzerinde bir de boş listeyi () eklerseniz (a, b, c) listesinin bütün alt listelerini bulmuş olursunuz.

Şimdi bunun kodu nasıl yazılıyor buna bakalım:

public class SubLists
{
    // verilen listenin bütün alt listelerini stream olarak döndürür
    public static <E> Stream<List<E>> of(List<E> list) {
        return Stream.concat(Stream.of( Collections.emptyList()),
                         prefixes(list).flatMap(SubLists::suffixes));
    }
    
    private static <E> Stream<List<E>> prefixes(List<E> list) {
        return IntStream.rangeClosed(1, list.size())
               .mapToObj(end -> list.subList(0, end));
    }

    private static <E> Stream<List<E>> suffixes(List<E> list) {
        return IntStream.range(0, list.size())
               .mapToObj(start -> list.subList(start, list.size()));
    }
}

Dikkat ederseniz Stream.concat metodu boş liste değerini de döndürülen streame eklemektedir. Ayrıca flatMap metodu (Madde 45) bütün öneklerin soneklerini içeren tek bir stream döndürmek için kullanılmaktadır. Son olarak IntStream.range ve IntStream.rangeClosed metotları önek ve sonek alt listelerini üretmek için kullanılmaktadır. Bu metotlar standart for döngülerinde int türünde indislerle yapılan tarama işlevini görmektedir. Dolayısıyla aslında yazdığımız stream tabanlı alt liste üretme kodu aşağıda iç içe for döngüleri ile yazılmış versiyonla kısmen benzerdir:

for (int start = 0; start < src.size(); start++) {
    for (int end = start + 1; end <= src.size(); end++) {
        System.out.println(src.subList(start, end));
    }
}

Bu for döngüsünü direk olarak bir streame dönüştürmek de mümkündür. Sonuçta önceki stream kodundan kısa ama belki biraz daha az anlaşılır bir kod ortaya çıkacaktır:

// verilen listenin bütün alt listelerini stream olarak döndürür
public static <E> Stream<List<E>> of(List<E> list) {
    return IntStream.range(0, list.size())
        .mapToObj(start ->
           IntStream.rangeClosed(start + 1, list.size())
              .mapToObj(end -> list.subList(start, end)))
        .flatMap(x -> x);
}

Önceki for döngüsü kodu gibi bu kod da boş listeyi dönüş değerine eklemiyor. Bunu çözmek için yine Stream.concat kullanabiliriz.

Her iki stream gerçekleştirimi de geçerlidir ve kullanılmasında bir sakınca yoktur. Ancak kullanıcılar dönüş değerini Iterable türüne dönüştürmek için adaptör kullanmak zorunda kalabilirler. Bu sadece kodu kirletmekle kalmaz aynı zamanda daha yavaş çalışmasına da sebep olur.

Özetle, bir grup eleman döndüren metotlar yazarken bazı kullanıcıların bunları stream kullanarak, bazılarının da tarama yaparak yinelemeli biçimde işlemek isteyebileceğini unutmayın. Metodun nasıl kullanılacağından emin değilseniz bu iki kullanımı da desteklemeye çalışın. Bunun için bir koleksiyon döndürebiliyorsanız döndürün. Elemanlar zaten bir koleksiyon içindeyse veya sayıları yeterince azsa ArrayList gibi standart bir koleksiyon döndürün, değilse kuvvet kümesi örneğinde yaptığımız gibi kendiniz bir koleksiyon türü tanımlayın. Collection döndürmek pek makul değilse Stream veye Iterable döndürebilirsiniz. Eğer sonraki Java versiyonlarında Stream arayüzü Iterable arayüzünü kalıtırsa stream döndürmek bir problem yaratmayacaktır çünkü bu durumda hem stream hem de yinelemeli işletimi desteklemiş olacaktır.

Share