Helpers - компьютеры, интернет, программирование

Проблемы с подключением Apache Ignite Kafka

Я пытаюсь выполнить потоковую обработку и CEP в потоке сообщений Kafka. Для этого я выбрал Apache Ignite, чтобы сначала реализовать прототип. Однако я не могу подключиться к очереди:

Используйте kafka_2.11-0.10.1.0 apache-ignite-fabric-1.8.0-bin

bin / zookeeper-server-start.sh config / zookeeper.properties bin / kafka-server-start.sh config / server.properties bin / kafka-topics.sh --create --zookeeper localhost: 2181 --replication-factor 1 - разделы 1 - тематический тест

Кафка работает исправно, тестировал на потребителе. Затем я начинаю зажигать, затем запускаю следующее в приложении командной строки весенней загрузки.

    KafkaStreamer<String, String, String> kafkaStreamer = new KafkaStreamer<>();

    Ignition.setClientMode(true);

    Ignite ignite = Ignition.start();

    Properties settings = new Properties();
    // Set a few key parameters
    settings.put("bootstrap.servers", "localhost:9092");
    settings.put("group.id", "test");
    settings.put("zookeeper.connect", "localhost:2181");
    settings.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    settings.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    settings.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    settings.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    // Create an instance of StreamsConfig from the Properties instance
    kafka.consumer.ConsumerConfig config = new ConsumerConfig(settings);

    IgniteCache<String, String> cache = ignite.getOrCreateCache("myCache");

    try (IgniteDataStreamer<String, String> stmr = ignite.dataStreamer("myCache")) {
        // allow overwriting cache data
        stmr.allowOverwrite(true);

        kafkaStreamer.setIgnite(ignite);
        kafkaStreamer.setStreamer(stmr);

        // set the topic
        kafkaStreamer.setTopic("test");

        // set the number of threads to process Kafka streams
        kafkaStreamer.setThreads(1);

        // set Kafka consumer configurations
        kafkaStreamer.setConsumerConfig(config);

        // set decoders
        StringDecoder keyDecoder = new StringDecoder(null);
        StringDecoder valueDecoder = new StringDecoder(null);

        kafkaStreamer.setKeyDecoder(keyDecoder);
        kafkaStreamer.setValueDecoder(valueDecoder);

        kafkaStreamer.start();
    } finally {
        kafkaStreamer.stop();
    }

Когда приложение запускается, я получаю

2017-02-23 10: 25: 23.409 WARN 1388 --- [main] kafka.utils.VerifiableProperties: свойство bootstrap.servers недействительно 2017-02-23 10: 25: 23.410 INFO 1388 --- [main] kafka .utils.VerifiableProperties: свойство group.id переопределяется для тестирования 2017-02-23 10: 25: 23.410 WARN 1388 --- [main] kafka.utils.VerifiableProperties: свойство key.deserializer недействительно 2017-02-23 10 : 25: 23.411 WARN 1388 --- [main] kafka.utils.VerifiableProperties: Свойство key.serializer недействительно 2017-02-23 10: 25: 23.411 WARN 1388 --- [main] kafka.utils.VerifiableProperties: Свойство value.deserializer недействителен 2017-02-23 10: 25: 23.411 WARN 1388 --- [main] kafka.utils.VerifiableProperties: Свойство value.serializer недействительно 2017-02-23 10: 25: 23.411 INFO 1388 - - [main] kafka.utils.VerifiableProperties: свойство zookeeper.connect переопределено на localhost: 2181

потом

2017-02-23 10: 25: 24.057 WARN 1388 --- [r-finder-thread] kafka.client.ClientUtils $: Получение метаданных темы с идентификатором корреляции 0 для тем [Set (test)] от брокера [BrokerEndPoint (0 , user.local, 9092)] не удалось

java.nio.channels.ClosedChannelException: null в kafka.network.BlockingChannel.send (BlockingChannel.scala: 110) ~ [kafka_2.11-0.10.0.1.jar: na] в kafka.producer.SyncProducer.liftedTree1 $ 1 (SyncProducer. scala: 80) ~ [kafka_2.11-0.10.0.1.jar: na] в kafka.producer.SyncProducer.kafka $ продюсер $ SyncProducer $$ doSend (SyncProducer.scala: 79) ~ [kafka_2.11-0.10.0.1. jar: na] в kafka.producer.SyncProducer.send (SyncProducer.scala: 124) ~ [kafka_2.11-0.10.0.1.jar: na] в kafka.client.ClientUtils $ .fetchTopicMetadata (ClientUtils.scala: 59) [ kafka_2.11-0.10.0.1.jar: na] в kafka.client.ClientUtils $ .fetchTopicMetadata (ClientUtils.scala: 94) [kafka_2.11-0.10.0.1.jar: na] в kafka.consumer.ConsumerFetcherManager $ LeaderFinderThread. doWork (ConsumerFetcherManager.scala: 66) [kafka_2.11-0.10.0.1.jar: na] в kafka.utils.ShutdownableThread.run (ShutdownableThread.scala: 63) [kafka_2.11-0.10.0.1.jar: na]

И чтение из очереди не работает. Есть у кого-нибудь идеи, как это исправить?

Изменить: если я прокомментирую содержимое блока finally, появится следующая ошибка

[2m2017-02-27 16: 42: 27.780 [0; 39m [31mERROR [0; 39m [35m29946 [0; 39m [2m --- [0; 39m] [2m [pool-3-thread-1] [0; 39m [36m [0; 39m [2m: [0; 39m] Сообщение игнорируется из-за ошибки [msg = MessageAndMetadata (test, 0, Message (magic = 1, attributes = 0, CreateTime = -1, crc = 2558126716, key = java.nio.HeapByteBuffer [pos = 0 lim = 1 cap = 79], payload = java.nio.HeapByteBuffer [pos = 0 lim = 74 cap = 74]), 15941704, kafka.serializer.StringDecoder @ 74a96647, kafka. serializer.StringDecoder @ 42849d34, -1, CreateTime)]

java.lang.IllegalStateException: стример данных закрыт. в org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.enterBusy (DataStreamerImpl.java:401) ~ [ignite-core-1.8.0.jar: 1.8.0] в org.apache.ignite.internal.processors. datastreamer.DataStreamerImpl.addDataInternal (DataStreamerImpl.java:613) ~ [ignite-core-1.8.0.jar: 1.8.0] в org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData (DataStreamer667Impl.java ) ~ [ignite-core-1.8.0.jar: 1.8.0] в org.apache.ignite.stream.kafka.KafkaStreamer $ 1.run (KafkaStreamer.java:180) ~ [ignite-kafka-1.8.0.jar : 1.8.0] в java.util.concurrent.Executors $ RunnableAdapter.call (Executors.java:511) [na: 1.8.0_111] в java.util.concurrent.FutureTask.run (FutureTask.java:266) [na : 1.8.0_111] на java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142) [na: 1.8.0_111] на java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor. : 1.8.0_111] в java.lang.Thread.run (Thread.java:745) [na: 1.8.0_111]

Спасибо!

23.02.2017

Ответы:


1

Я думаю, это происходит потому, что KafkaStreamer закрывается сразу после запуска (вызов kafkaStreamer.stop() в блоке finally). kafkaStreamer.start() не является синхронным, он просто раскручивает потоки для использования из Kafka и завершает работу.

23.02.2017
  • Спасибо за ответ, если я прокомментирую содержимое блока finally, я получаю сообщение об ошибке, которое я опубликовал выше (в редактировании) 27.02.2017
  • Это потому, что вы тоже закрываете IgniteDataStreamer. Избавьтесь от блока try-with-resources, и он сработает. 27.02.2017
  • Привет, я еще не запустил приложение (потому что я не уверен, как читать из кеша), но, по крайней мере, я больше не получаю ошибок. Я отмечу этот вопрос как ответ и открою новый для остальных. Еще раз спасибо, и, возможно, вы также сможете посмотреть stackoverflow.com/questions/42562766/ 02.03.2017
  • Новые материалы

    Интуитивное понимание тензоров в машинном обучении
    Тензор является важной концепцией во многих научных областях, таких как математика, физика, обработка сигналов и компьютерное зрение, и это лишь некоторые из них. В математике тензор — это..

    Использование машинного обучения для диагностики болезни Альцгеймера, часть 4
    Маркеры семантической согласованности для ранней диагностики болезни Альцгеймера (arXiv) Автор: Давиде Колла , Маттео Дельсанто , Марко Агосто , Бенедетто Витиелло , Даниэле Паоло Радичони..

    Почему объяснимость так важна прямо сейчас?
    По мере того, как системы искусственного интеллекта и инструменты на основе машинного обучения распространяются в нашей повседневной жизни, как практики, так и критики все чаще заявляют о..

    Анимированный математический анализ
    Использование Manim для создания математических анимированных визуализаций Визуализация данных помогает понять скрытые закономерности в данных, которые невозможно визуализировать..

    Создание простого слайдера изображений с помощью JavaScript
    Узнайте, как создать базовый слайдер изображений с помощью HTML, CSS и JavaScript. Введение В этом уроке мы создадим удобный слайдер изображений, используя JavaScript, HTML и CSS. Ползунок..

    Создание базы данных с помощью супергероя «Python»
    В этом посте мы узнаем, как создать «базу данных SQLite с помощью модуля python sqlite3, создав простую функцию входа и регистрации. Готовы ли вы к этому путешествию? Если да , давайте приступим..

    ИИ для чайников: руководство для начинающих по пониманию будущего технологий
    Вы чувствуете, что остались позади в мире ИИ? Не волнуйтесь, вы не одиноки! Со всей этой шумихой вокруг искусственного интеллекта может быть трудно понять, с чего начать. Но не позволяйте сленгу..