Helpers - компьютеры, интернет, программирование

Java - ExecutorService имеет максимальный размер

Есть ли способ просмотреть огромную базу данных и параллельно применить несколько заданий для скамейки записей? Я пробовал с ExecutorService, но нам нужно выключить(), чтобы узнать размер пула...

Итак, мое лучшее решение:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestCode
{
private static List<String> getIds(int dbOffset, int nbOfArticlesPerRequest) 
{
    return Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19", "20", "21", "22", "23", "24", "25", "26", "27", "28", "29");
}

public static void main(String args[]) throws Exception
{
    int dbOffset = 0;
    int nbOfArticlesPerRequest = 100;
    int MYTHREADS = 10;
    int loopIndex = 0;
    boolean bContinue=true;
    Runnable worker;



    while(bContinue) // in this loop we'll constantly fill the pool list
    {
        loopIndex++;
        ExecutorService executor = Executors.newFixedThreadPool(MYTHREADS); // NOT IDEAL, BUT EXECUTORSERVICE CANNOT BE REUSED ONCE SHUTDOWN...

        List<String> ids = getIds(dbOffset, nbOfArticlesPerRequest ); // getIds(offset, rows_number)
        for(String id: ids) {
            worker = new MyRunnable(id);
            executor.execute(worker);
        }

        executor.shutdown();
        while (!executor.isTerminated()) {
            System.out.println("Pool size is now " + ((ThreadPoolExecutor) executor).getActiveCount()+
                    " - queue size: "+ ((ThreadPoolExecutor) executor).getQueue().size()
            );
            TimeUnit.MILLISECONDS.sleep(500);
        }

        if(loopIndex>=3) {
            System.out.println("\nEnd the loop #"+loopIndex+" ===> STOOOP!\n");
            bContinue = false;
        }
        dbOffset+=nbOfArticlesPerRequest;
    }
}



public static class MyRunnable implements Runnable {

    private final String id;

    MyRunnable(String id) {
        this.id = id;
    }

        @Override
        public void run()
        {
            System.out.println("Thread '"+id+"' started");
            try {
                TimeUnit.MILLISECONDS.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Thread '"+id+"' stopped");
        }
    }
}

Это работает нормально, но недостатком является то, что на каждом конце цикла мне нужно ждать завершения последних потоков.

например: когда работают только 3 потока...

Я сделал следующее, чтобы решить эту проблему, но это «безопасно»/правильно?

Кстати: есть ли способ узнать, сколько задач/потоков находится в очереди?

    int dbOffset = 0;
    int nbOfArticlesPerRequest = 5; //100;
    int MYTHREADS = 2;
    int loopIndex = 0;

    ExecutorService executor = Executors.newFixedThreadPool(MYTHREADS); // **HERE IT WOULD BE A GLOBAL VARIABLE**
       while(bContinue) // in this loop we'll constantly fill the pool list
        {
            loopIndex++;

            List<String> ids = getIds(dbOffset, nbOfArticlesPerRequest ); // getIds(offset, rows_number)
             for(String id: ids) {
                    worker = new MyRunnable(id);
                    executor.execute(worker);
             }

            while (!executor.isTerminated() && ((ThreadPoolExecutor) executor).getActiveCount() >= MYTHREADS) {
                System.out.println("Pool size is now " + ((ThreadPoolExecutor) executor).getActiveCount()+
                        " - queue size: "+ ((ThreadPoolExecutor) executor).getQueue().size()
                );
                TimeUnit.MILLISECONDS.sleep(500);
            }

            if(loopIndex>=3) {
                System.out.println("\nEnd the loop #"+loopIndex+" ===> STOOOP!\n");
                bContinue = false;
            }
            dbOffset+=nbOfArticlesPerRequest;
        }

    executor.shutdown();
    // Wait until all threads are finish
    while (!executor.isTerminated()) {
        System.out.println("Pool size is now " + ((ThreadPoolExecutor) executor).getActiveCount()+
                " - queue size: "+ ((ThreadPoolExecutor) executor).getQueue().size()
        );
        TimeUnit.MILLISECONDS.sleep(500);
    }

ИЗМЕНИТЬ:

Я пытаюсь запустить 1 или 10 миллионов задач, поэтому (я полагаю) я не могу поставить их все в очередь... Вот почему я использую глобальный исполнитель, чтобы иметь возможность всегда иметь несколько потоков в очереди (для что я не могу отключить исполнителя, иначе его больше нельзя использовать).

Последняя версия кода:

    int dbOffset = 0;
    int nbOfArticlesPerRequest = 5; //100;
    int MYTHREADS = 2;
    int loopIndex = 0;

    ThreadPoolExecutor executorPool = new ThreadPoolExecutor(MYCORES, MYCORES, 0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); // **HERE IT WOULD BE A GLOBAL VARIABLE**
       while(bContinue) // in this loop we'll constantly fill the pool list
        {
            loopIndex++;

            List<String> ids = getIds(dbOffset, nbOfArticlesPerRequest ); // getIds(offset, rows_number)
             for(String id: ids) {
                    worker = new MyRunnable(id);
                    executorPool.execute(worker);
             }

            while (executorPool.getActiveCount() >= MYTHREADS  || executorPool.getQueue().size()> Math.max(1, MYTHREADS -2)) 
            {
                System.out.println("Pool size is now " + executorPool.getActiveCount()+
                                        " - queue size: "+ executorPool.getQueue().size()
                );

                if(executorPool.getQueue().size() <= Math.max(1, MYCORES-2)) {
                    System.out.println("Less than "+Math.max(1, MYCORES-2)+" threads in queue ---> fill the queue");
                    break;
                }

                TimeUnit.MILLISECONDS.sleep(2000);
            }

            if(loopIndex>=3) {
                System.out.println("\nEnd the loop #"+loopIndex+" ===> STOOOP!\n");
                bContinue = false;
            }
            dbOffset+=nbOfArticlesPerRequest;
        }

    executorPool.shutdown();
    // Wait until all threads are finish
    while (!executorPool.isTerminated()) {
        System.out.println("Pool size is now " + executorPool.getActiveCount()+
                " - queue size: "+ executorPool.getQueue().size()
        );
        TimeUnit.MILLISECONDS.sleep(500);
    }

заранее спасибо

29.06.2017


Ответы:


1

Обновить

Теперь мне ясно, что ваша основная проблема в том, что вы не можете отправить 10 миллионов задач одновременно.

Не бойтесь, вы можете отправить их все в исполнитель. Фактическое количество задач, выполняемых параллельно, ограничено размером базового пула потоков. То есть, если у вас размер пула 2, в данный момент выполняются только две задачи, остальные сидят в очереди и ждут свободного потока.

По умолчанию Executors.newFixedThreadPool() создает Executor с очередью размером Integer.MAX_VALUE, поэтому там поместятся ваши миллионы задач.


Вы можете использовать метод ExecutorService.submit(), который возвращает Future. Затем вы можете проверить состояние ваших будущих задач (например, с помощью методов isDone(), isCancelled()).

Executor обычно является чем-то, что вы не хотите отключать явно, и существует на протяжении всего жизненного цикла вашего приложения. При таком подходе вам не нужно выключаться, чтобы узнать, сколько задач ожидает выполнения.

List<Future<?>> tasks = new ArrayList<>();
for (String id : ids) {
    Future<?> task = executorService.submit(() -> {
        // do work
    });
    tasks.add(task);
}

long pending = tasks.stream().filter(future -> !future.isDone()).count();
System.out.println(pending + " task are still pending");

Кроме того, обратите внимание, что задачи и темы не являются взаимозаменяемыми терминами. В вашем случае исполнитель имеет фиксированное количество потоков. Вы можете отправить больше задач, чем это, но остальные будут находиться в очереди исполнителя, пока не появится свободный поток для выполнения задачи.

29.06.2017
  • Может быть хорошей идеей... Тогда мне просто нужно добавить цикл ожидания, чтобы назначить больше задач, когда выполняется меньше X задач... 11.07.2017
  • тот же вопрос, что и для @Pavan, какая польза от вашего решения по сравнению с моим (см. Последний код после EDIT)? 11.07.2017
  • Ну, теперь я понял вашу точку зрения и обновил ответ. Я думаю, вы прикладываете значительные усилия для имитации того, что Исполнители уже предоставляют — очереди. 11.07.2017
  • Спасибо, Дэвид, но, отправив 10 миллионов задач, не будет ли объект Executor огромным (т.е. и займет много оперативной памяти)? 12.07.2017
  • Что бы ты не пошел и не попробовал сам? Я только что быстро выполнил 10 миллионов задач, выполняя System.out, объем потребляемой памяти составил 800 МБ... 13.07.2017
  • Конечно, да, и ваше обновление показало мне, что очередь способна справиться с таким большим количеством задач! Спасибо! 17.07.2017

  • 2

    ExecuterService позволяет вам вызывать список задач, которые могут выполняться параллельно, и возвращать результат, когда он доступен.

    В вашем коде вы используете

    worker = new MyRunnable(id);
    executor.execute(worker);
    

    Вместо Runnable в этом случае лучше использовать Callable, тогда вы можете отправить список вызываемых объектов для выполнения в одном API вместо цикла for.

    List<Callable> workers = new ArrayList<>();
    workers.add(new MyCallable(id)) // this is just for example
    workers.add(new MyCallable(id))
    workers.add(new MyCallable(id))
    
    List<Future<Boolean>> futures = executor.invokeAll(workers); // this will execute all worker tasks parallely and return you future object list using which you can see whether worker thread is completed or not and also the what is the return value.
    

    Обратите внимание, что метод get объекта Future блокирует вызов

    11.07.2017
  • invokeAll также блокируется, поэтому первоначальная проблема (необходимость ожидания последнего потока в каждом цикле) не решена. :) 11.07.2017
  • @Bast - Насколько я понимаю, invokeAll не блокирует вызов. документы .oracle.com/javase/7/docs/api/java/util/concurrent/ 11.07.2017
  • Вы правы, только блокирует future.get()... Вероятно, я мог бы использовать ваше решение, предложенное @DavidSiro... 11.07.2017
  • Я, пожалуйста, попробуйте и оставьте свой отзыв, так как это может помочь и другим. Спасибо 11.07.2017
  • какая польза от вашего решения по сравнению с моим (см. последний код после EDIT)? 11.07.2017
  • Я чувствую, что ваши предположения неверны. Когда вы создаете executorservice с некоторым числом пулов чтения, он будет использовать эти потоки для выполнения ваших задач. И когда вы говорите о завершении работы executorservice, это не означает, что вы закрываете текущие исполняемые потоки. Просто служба-исполнитель больше не может брать задачи. 12.07.2017
  • Давайте продолжим обсуждение в чате. 12.07.2017

  • 3

    Вам не нужно знать размер пула потоков, чтобы проверить выполнение задач в ExecutorService. Вы можете удалить свой код после отправки задачи.

    Вариант 1:

    1. #P3# <блочная цитата> #P4# #P5#
      ExecutorService executor = Executors.newWorkStealingPool();
      
    2. Используйте invokeAll

    Вариант 2: (полезно, если вы заранее знаете количество задач)

    Используйте CountDownLatch и инициализируйте счетчик к количеству задач, которые должны быть отправлены.

    Дополнительные ссылки:

    ждите, пока все потоки закончат свою работу в java< /а>

    Как правильно отключить java ExecutorService

    11.07.2017
  • да, но поскольку это цикл while, я хотел динамически добавлять новые потоки, чтобы всегда иметь некоторые в очереди... на самом деле использование getActiveCount() более правильно (код обновлен) - теперь я фактически даже переключился на ThreadPoolExecutor в моем локальном код 11.07.2017
  • Обратите внимание, что во второй части моего кода (т.е. в решении) ExecutorService является глобальным, поэтому его нельзя отключить, иначе его больше нельзя использовать. 11.07.2017
  • Вне цикла while вы можете сохранить код выключения, используя API-интерфейсы shutdown, sbutdownNow, awaitTermination в последовательности, указанной в сообщении выше. 11.07.2017
  • Да, в моем коде функция shutdown() уже вне цикла... проблема в том, что мне нужно запустить 1 или 10 миллионов задач, поэтому (я полагаю) я не могу поставить их все в очередь ... И я подумал, что использование глобального исполнителя может быть решением, чтобы всегда иметь несколько потоков в очереди... Я отредактирую свой пост и добавлю последний код, который я запускаю. 11.07.2017
  • используйте invokeAll() и замените ThreadPoolExecutor на newWorkStealingPool 12.07.2017
  • Новые материалы

    Интуитивное понимание тензоров в машинном обучении
    Тензор является важной концепцией во многих научных областях, таких как математика, физика, обработка сигналов и компьютерное зрение, и это лишь некоторые из них. В математике тензор — это..

    Использование машинного обучения для диагностики болезни Альцгеймера, часть 4
    Маркеры семантической согласованности для ранней диагностики болезни Альцгеймера (arXiv) Автор: Давиде Колла , Маттео Дельсанто , Марко Агосто , Бенедетто Витиелло , Даниэле Паоло Радичони..

    Почему объяснимость так важна прямо сейчас?
    По мере того, как системы искусственного интеллекта и инструменты на основе машинного обучения распространяются в нашей повседневной жизни, как практики, так и критики все чаще заявляют о..

    Анимированный математический анализ
    Использование Manim для создания математических анимированных визуализаций Визуализация данных помогает понять скрытые закономерности в данных, которые невозможно визуализировать..

    Создание простого слайдера изображений с помощью JavaScript
    Узнайте, как создать базовый слайдер изображений с помощью HTML, CSS и JavaScript. Введение В этом уроке мы создадим удобный слайдер изображений, используя JavaScript, HTML и CSS. Ползунок..

    Создание базы данных с помощью супергероя «Python»
    В этом посте мы узнаем, как создать «базу данных SQLite с помощью модуля python sqlite3, создав простую функцию входа и регистрации. Готовы ли вы к этому путешествию? Если да , давайте приступим..

    ИИ для чайников: руководство для начинающих по пониманию будущего технологий
    Вы чувствуете, что остались позади в мире ИИ? Не волнуйтесь, вы не одиноки! Со всей этой шумихой вокруг искусственного интеллекта может быть трудно понять, с чего начать. Но не позволяйте сленгу..