СоХабр закрыт.

С 13.05.2019 изменения постов больше не отслеживаются, и новые посты не сохраняются.

| сохранено

H Бесконечные потоки с помощью Observable и их применение в Android проектах в черновиках Из песочницы

Доброго времени суток, друзья. Хотел бы поделиться своим не большим велосипедом открытием в области бесконечных потоков с помощью Observable, применении их в android проектах, а так же немного рассказать теорию (если не уснете к концу статьи).

И так как же создать бесконечность? Возьмем и реализуем корекурсию с помощью нашего ленивца Observable. Почему же мы не можем применить рекурсию, ведь она в теории может быть тоже бесконечна? Потому что в теории это конечно — хорошо, но мы то программисты живем в реальном мире, а в нем мы вычисления никогда не завершим, и такая бесконечность нам не нужна.

Позвольте привести примеры c помощью «андройдовской» Java, т.к мы будем применять все это дело, после не большой модернизации, в реальном проекте:

1) Пример с рекурсией или как не нужно делать:

public Observable<BigInteger> getState(Context context) {
        BigInteger i = ZERO;
        return Observable.create(
                subscriber -> {
                        while (true) subscriber.onNext(i);
                                i = i.add(ONE); 
                }
        );
    }

2) Пример с отпиской или как нужно делать:

public Observable<Boolean> getNetworkState(Context context) {
        BigInteger i = ZERO;
        return Observable.create(
                subscriber -> {
                    Runnable r = () -> {
                        while (!subscriber.isUnsubscribed())
                            while (true) subscriber.onNext(i);
                            i = i.add(ONE); 
                    };
                    new Thread(r).start();
                }
        );
    }

А теперь подробнее — что же происходит в первом и втором случае? В первом случае, рекурсия блокирует поток, по скольку вычисления никогда не кончится, и вы сами, наверняка, догадываетесь к чему это приведет. Поэтому, создадим явную конкурентность. Во втором примере мы создаем отдельный поток и будем делать события в нем.

И самое интересное то, что так как subscribe() всего лишь навсего создает новый поток, мы можем этот поток завершить всего лишь отписавшись от события. Очень удобно и без всяких interrup'ов.

Так как мы будем работать в отдельном потоке, то мы без потери производительности можем в реальном времени отслеживать различные состояния. И потом — так же легко взаимодействовать с UI через Handler.

Как же нам теперь все это применить в реальном проекте? Очень просто. Сейчас я покажу на актуальном, на мой взгляд примере — обнаружение сети.

Для начала — создадим класс, с нашим публичным Observable, на который мы можем подписаться, и приватным методом, который проверяет состояние сети. Вот код с документацией:

public class NetworkState {
    /**
     * @param context контекст в котором должен работать метод (<i>в основном
     это будет {@link android.app.Activity}</i>)
     * @return {@link Observable<Boolean>} Мгновенно возвращает состояние сети:
     * <br>
     *      <b>true</b> - сеть есть
     * </br>
     *
     * <br>
     *      <b>false</b> - сети нету
     * </br>
     */
    public Observable<Boolean> getNetworkState(Context context) {
        return Observable.create(
                subscriber -> {
                    Runnable r = () -> {
                        while (!subscriber.isUnsubscribed())
                            subscriber.onNext(hasConnection(context));
                    };
                    new Thread(r).start();
                }
        );
    }

    /**
     * Проверка на наличие сети
     * @param context Контекст
     * @return <b>true</b> - сеть есть,  <b>false</b> - сети нет
      */
    private static boolean hasConnection(final Context context) {
        ConnectivityManager connectivityManager = (ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE);

        NetworkInfo wifiInfo = connectivityManager.getNetworkInfo(ConnectivityManager.TYPE_WIFI);
        NetworkInfo mobileInfo = connectivityManager.getNetworkInfo(ConnectivityManager.TYPE_MOBILE);

        return wifiInfo != null && wifiInfo.isConnected() || mobileInfo != null && mobileInfo.isConnected();
    }
}

Теперь, собственно, применение (Не забудьте где нибудь проинициализировать вьюшки):

/**
* Собственно тут все очевидно, дизейблим некую кнопку и показываем сообщение в TextView, о том, что нету сети 
**/
private void rxNetworkCheck(){
        new NetworkManager().getNetworkState(this).subscribe(x ->
                handlerNetwork.post(() -> {
                    if(x){
                        buttonNext.setVisibility(View.VISIBLE);
                        textViewNoConnection.setVisibility(View.INVISIBLE);
                    }
                    else{
                        buttonNext.setVisibility(View.INVISIBLE);
                        textViewNoConnection.setVisibility(View.VISIBLE);
                    }
                })
        );
    }

Подведем итоги: бесконечные потоки — вполне реальны, они применимы в реальных проектах, они очень удобны и отзывчивы. При тестировании данной концепции в своем проекте я не заметил ни каких «проседаний» в производительности или других артефактов. Конечно же этот способ актуален не только для обнаружении сети, тут можно не ограничивать себя в фантазии.

Вот, собственно, и все, что я хотел сказать, надеюсь своим постом принес кому-то пользу либо проявил интерес к данной теме.

Ну и конечно же книга которая помогла мне и продолжает помогать в изучении реактивного подхода к разработки, это — «Реактивное программирование с применением RxJava» от Томаш Нуркевича и Бена Кристенсена.
–7
~1900

комментарии (35)

0
ulman95 ,  
Зря конечно слово «велосипед» зачеркнули, к примерам кода отлично подходит. В первоиздании
0
ulman95 ,  
… в первоиздании запуск потока внутри create() не являлся эталонным решением.
0
AlexeyVD ,  
Во-первых, в примерах 1) и 2) у вас приведен какой-то кривой нерабочий код с бесконечными циклами.

Во-вторых, раз уж статья называется не «Бесконечные потоки с помощью Runnable и их применение в Android проектах», а идет речь о RX, то можно было бы сказать пару слов о backpressure и Flowable. Т.к. исполюзую ваш подход, разработчики рано или поздно словят MissingBackpressureException.
–2
Implozia ,  
И первый и второй код рабочий, я проверял, второй прекрасно живет в проекте, конечно можно было бы и приостанавливать потоки на какое-то время или сделать более гибко с проверками, но в моем проекте была необходимость чекать сеть постоянно, а в пример я поставил именно этот кусок, как основу, от чего можно оттолкнуться, статья только о том что можно делать, к примеру — так, у меня не было цели создать кучу кейсов под все случаи жизни
+1
AlexeyVD ,  
Вот тут бесконечный цикл. i = i.add(ONE) никогда не вызовется:
                subscriber -> {
                        while (true) subscriber.onNext(i);
                        i = i.add(ONE); 
                }

Тут тоже самое. Второй while зацыклен, !subscriber.isUnsubscribed() будет проверено только 1 раз при входе в первый цикл, i = i.add(ONE) никогда не вызовется:
                subscriber -> {
                    Runnable r = () -> {
                        while (!subscriber.isUnsubscribed())
                            while (true) subscriber.onNext(i);
                        i = i.add(ONE); 
                    };
                    new Thread(r).start();
                }

Ну и для периодических запросов можно использовать Observer.interval(), а переключать потоки выполнения через .observeOn()
–3
Implozia ,  
Первый пример и написан как не правильный и как не нужно делать. Чувствую мне все таки придется сделать тестовую приложуху и все разжевать более детально. Опять же моя статья не претендует на панацею, моя основная мысль показать, обратить внимание, однако от коментов типа — «не рабочий код» у меня не хило так бомбануло, и, все таки я объясню все более подробнее и свою мысль с пруфами, пожалуйста, подождите немного)
0
virtustilus ,  
Во втором примере, как и в первом зацикливается навсегда и остановить его невозможно.
Потому что:
while (true) subscriber.onNext(i); — никогда не остановится, кроме случая возникновения исключения.
0
Implozia ,   * (был изменён)
Он остановится, если от этого Observable отписаться
0
mayorovp ,  
Каким образом?
0
Implozia ,   * (был изменён)
unsubscribe()
0
mayorovp ,  
И каким образом этот вызов прервет цикл?
0
Implozia ,  
Там же условие цикла — while(!subscriber.isUnsubscribed()), вроде логично что он таким образом прекращает свою работу, можете сами проверить
0
mayorovp ,   * (был изменён)

Где вы в цикле while (true) subscriber.onNext(i); видите проверку !subscriber.isUnsubscribed()? Я вижу true.

0
mayorovp ,  

На всякий случай: вот та самая ошибка, за которую вам третий день ставят минусы:



Вот так хорошо бесконечный цикл видно или надо линию жирнее рисовать?

0
Implozia ,   * (был изменён)
Во первых — вложенный цикл прервется тоже. Во вторых это даже не важно, потому что Observable — ленивый тип и если подписчиков на него не будет, то ничего не произойдет. Observable работает только при подписке
0
mayorovp ,  
Каким образом прервется вложенный цикл?
–1
Implozia ,  
Давайте я вам все подробнее на пруфах покажу, как только домой приду? Потому что так, думаю, вопросы не кончатся. Но пока что просто скажу — таким образом, что некому будет эти события производить при отписке
0
virtustilus ,  
Возможно это еще дополнительно связано с плохим тоном написания кода (хотя некоторые все-таки за минимализм).
В общем Ваш код можно переписать вот так:

   while (!subscriber.isUnsubscribed()) {
             while (true) {
                      subscriber.onNext(i);
             }
   }
   
   i = i.add(ONE); 


Здесь нужно понимать, что проверка subscriber.isUnsubscribed() выполнится один раз в момент запуска этого кода. И если подписчик подписан (что должно быть), то следующим шагом запустится второй цикл, который будет выполняться, пока у него в условии true, а там true, то есть выхода из цикла не будет никогда.
0
anegin ,  

Условие во внешнем цикле будет проверено только один раз. внутренний цикл будет выполняться бесконечно.
Чтобы работало так, как вы задумали, нужно, как минимум, убрать внутренний while:


while (!subscriber.isUnsubscribed())
    subscriber.onNext(i);

К тому же не помешало бы добавить Thread.sleep() внутрь цикла, чтобы дать процессору возможность отвлечься на другие задачи.
В любом случае, это не отменяет того факта, что данный подход изначально bad style

+1
Ztare ,  
А поток, который в таком режиме как в примере работает, не загрузит ядро процессора на 100% (опрос состояния + постоянное обращение к UI)? RxJava добавит какие-то свои задержки\ожидания?
–5
Implozia ,  
Нет не загрузит, если наберутся лайки к твоему комменту, я сделаю тестовый проект на гите, с этим примером, и напишу конкретную статью о нем со всеми графиками и пояснениями)
+3
a15199732 ,  
КГ/АМ Не совсем удачная статья.
1. Observable.create() deprecated
2. MissingBackpressureException неизбежен
3. Чтобы выполнить Observable в отдельном потоке, есть SubscribeOn()
4. Практически любая задача превращения одиночных событий в поток лучше всего решается с помощью Subject
0
Goshik ,  
Observable.create() deprecated

Уже нет, начиная с Rx 2.0. Пруф:
Each reactive base type Flowable, Observable, Single, Maybe and Completable feature a safe create operator that does the right thing regarding backpressure (for Flowable) and cancellation (all)
–4
Implozia ,   * (был изменён)
1) Это применимо для RxJava2, а я писал на первой
2) Избежен
3) Можно, конечно, кто Вам не дает?)
4) Блин, я написал об идее, а не о том как Вы будете ее реализовывать)
+2
Tagakov ,  
Это статья с очень вредными советами, никогда так не делайте! Если вы новичок и пытаетесь освоить Rx — закройте эту статью и поищите другую.
+1
anegin ,  
Очень неудачный пример того, что обычно решается стандартными средствами (BroadcastReceiver). Еже(милли)секундный апдейт UI там, где он должен изменятся только по факту пропадания/появления коннекта — это вообще нечто.
0
artemgapchenko ,  
Вначале:
Пример с отпиской или как нужно делать
return Observable.create(
                subscriber -> {
                    Runnable r = () -> {
                        while (!subscriber.isUnsubscribed())
                            while (true) subscriber.onNext(i);
                            i = i.add(ONE); 
                    };
                    new Thread(r).start();
                }
        );


И в конце:
Ну и конечно же книга которая помогла мне и продолжает помогать в изучении реактивного подхода к разработки, это — «Реактивное программирование с применением RxJava» от Томаш Нуркевича и Бена Кристенсена.

Блин, ну как вы так читали-то, там же объясняется, что 1) Создавать observables через create() — дурная затея, так как почти все (и вы в том числе) забывают про обработку backpressure; 2) Запускать треды внутри Observable.create() — дурно пахнущий код, для работы с потоками subscribeOn()/observeOn() есть.
–1
Implozia ,  
subscrubeOn() На сколько я знаю очень удобен для использования пула потоков, а в моем примере всего один маленький тредик, в подобных примерах и авторы книги применяют такой подход
0
gildor ,  

Чем же subscrubeOn() не удобен при использовании одного потока? Это стандартный механизм и использование голого треда тут ничем не оправдано

0
mayorovp ,  
Конкретно в данном случае он неудобен тем что занимает на неопределенное время один поток из пула. Стандартные пулы потоков попросту не для такого сценария использования делались.
0
gildor ,  

Так можно создать свой Scheduler из одного потока, который ничего из пула красть не будет, как и io Schedulers.io(), который не имеет ограничения на число потоков и просто создаст новый в случае необходимости.
Ну я с вами соглашусь, что RxJava точно не для такого делалась, как пример в статье

+1
mayorovp ,  
Из одного потока пул создавать нельзя, потому что подписчиков может оказаться два — и тогда потребуется два потока.

Да, я почему-то думал что `subscribeOn` принимает `ExecutorService` а не `rx.Scheduler`. Конечно, `Schedulers.io()` использовать можно. Хотя мне для такого странного кода больше нравится `Schedulers.newThread()` :-)
+1
artemgapchenko ,  
Всего один маленький тредик, которой можно было бы и не создавать, если взять его из уже созданного и правильным образом сконфигурированного пула потоков.
–3
Implozia ,   * (был изменён)
Ребят, я всех услышал, вот здесь я максимально подробно написал, что я пытался донести до вас этой статьей: habrahabr.ru/post/338372
Возможно мы друг друга не поняли, если и тут я не прав, напишите, приму к сведению в дальнейшем, и буду планировать следующую статью, если, конечно будет что написать, максимально подробно. Прошу прощения, если я в ней не прав, но пригорело у меня знатно
0
gildor ,  

Тут уже упомянался Observable.create() и то что это дурной тон и контр аргумент, что он не задепрекейчен больше и является рекомендованы.
Хотел просто внести ясность, что есть deprecated Observable.create(), его не deprecated версия называется Observable.unsafeCreate().
Рекомендованый билдер так же называется Observable.create(), но использует совершенно другую сигнатуру и является просто переименованой версией того, что раньше называлось Observable.fromEmitter() и в данный момент deprecated в rxjava 1.x