Helpers - компьютеры, интернет, программирование

Тупик с использованием Aggregator + Redis

Этот пост связан с этим

Взаимоблокировка интеграции Spring с использованием Aggregator + MessageStoreReaper + Redis?

но это сообщение слишком длинное для публикации. Я продолжаю исходный пост

Я обновился до последней сборки Java 7 1.7.0_60-b19, но проблема все еще существует.

Я сделал еще один дамп потока и обнаружил ту же проблему: все DefaultMessageListenerContainers (счетчик 20) заблокированы taskScheduler (entityScheduler-3) в вызове блокировки AbstractCorrelationMessageHandler.

Вот конфигурация планировщика и агрегатора:

<task:scheduled-tasks scheduler="entityScheduler">
    <task:scheduled ref="entityReaperBean" method="run" fixed-rate="${entity.aggregator.timeout}" />
</task:scheduled-tasks>

<bean id="entityReaperBean" class="org.springframework.integration.store.MessageGroupStoreReaper">
    <property name="messageGroupStore" ref="entityAggregatorRedisMessageStore" />
    <property name="timeout" value="${entity.aggregator.reaper.timeout}" />
</bean>

<bean id="entityAggregatorRedisMessageStore"
    class="org.springframework.integration.redis.store.RedisMessageStore">
    <constructor-arg ref="entityRedisConnectionFactory" />
</bean>

<int:aggregator id="entityAggregator" send-partial-result-on-expiry="true"
        release-strategy-expression="size() >= ${transactions-receiver.batch_size}" correlation-strategy-expression="payload.entityCode"
        expire-groups-upon-completion="true" message-store="entityAggregatorRedisMessageStore" />

Here's the thread dump section:

"entityScheduler-3" - Thread t@188
 java.lang.Thread.State: WAITING
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for <545079d6> (a    java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1085)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Locked ownable synchronizers:
 - locked <66d9180c> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)

"DefaultMessageListenerContainer-5" - Thread t@48
java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Native Method)
- waiting to lock <66d9180c> (a java.util.concurrent.locks.ReentrantLock$NonfairSync) owned by "entityScheduler-3" t@188
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:894)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1221)
at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:340)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:223)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
at sun.reflect.GeneratedMethodAccessor81.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.handleMessage(SimpleMessageHandlerMetrics.java:106)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.invoke(SimpleMessageHandlerMetrics.java:86)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
at com.sun.proxy.$Proxy16.handleMessage(Unknown Source)
at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:148)
at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:330)
at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:169)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:228)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:212)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:177)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:171)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:149)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
at sun.reflect.GeneratedMethodAccessor81.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.handleMessage(SimpleMessageHandlerMetrics.java:106)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.invoke(SimpleMessageHandlerMetrics.java:86)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
at com.sun.proxy.$Proxy17.handleMessage(Unknown Source)
at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:148)
at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:330)
at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:169)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:228)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:212)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:177)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:171)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:149)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
at sun.reflect.GeneratedMethodAccessor81.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.handleMessage(SimpleMessageHandlerMetrics.java:106)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.invoke(SimpleMessageHandlerMetrics.java:86)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
at com.sun.proxy.$Proxy15.handleMessage(Unknown Source)
at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:148)
at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:330)
at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:169)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:228)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:212)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:177)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:171)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:149)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
at sun.reflect.GeneratedMethodAccessor81.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.handleMessage(SimpleMessageHandlerMetrics.java:106)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.invoke(SimpleMessageHandlerMetrics.java:86)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
at com.sun.proxy.$Proxy15.handleMessage(Unknown Source)
at org.springframework.integration.handler.MessageHandlerChain.handleMessageInternal(MessageHandlerChain.java:131)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
at sun.reflect.GeneratedMethodAccessor81.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.handleMessage(SimpleMessageHandlerMetrics.java:106)
at org.springframework.integration.monitor.SimpleMessageHandlerMetrics.invoke(SimpleMessageHandlerMetrics.java:86)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
at com.sun.proxy.$Proxy18.handleMessage(Unknown Source)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:102)
at org.springframework.integration.jms.SubscribableJmsChannel$DispatchingMessageListener.onMessage(SubscribableJmsChannel.java:148)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:562)
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:500)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:468)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:325)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:263)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1103)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1095)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:992)
at java.lang.Thread.run(Thread.java:745)

 Locked ownable synchronizers:
  - None

Возможно ли, что по какой-то причине метод handleMessageInternal класса AbstractCorrelationMessageHandler не разблокируется? Я использую хранилище сообщений Redis и вижу, что MsgStoreReaper также использует блокировку. Может ли это быть проблемой?

большое спасибо за вашу помощь здесь! С уважением Гусман

ОБНОВЛЕНИЕ:

Я нашел эти сообщения в журнале:

 2014-07-23 22:47:26,967 ERROR TaskUtils$LoggingErrorHandler:95 - Unexpected error occurred in scheduled task.
 2014-07-23 22:47:26,968 ERROR RedisMessageStore:122 - Exception in expiry callback
 2014-07-23 22:47:26,971 ERROR TaskUtils$LoggingErrorHandler:95 - Unexpected error occurred in scheduled task.

но без исключений и трассировки стека. Ошибка TaskUtils вам о чем-то говорит?


  • Забыл сказать, я использую SI v3.0.1 24.07.2014
  • Привет, ребята! Я читал документы SIv4.x и обнаружил, что был добавлен RedisLockRegistry. Насколько я вижу, этот замок может решить эту проблему. Reaper и Aggregator должны использовать этот RedisLockRegistry, чтобы избежать блокировок, я прав? 24.07.2014
  • Это странно; Я не вижу никакой возможности, чтобы замок оставался запертым; RedisLockRegistry не поможет - это при использовании нескольких экземпляров приложений и общего группового хранилища. 24.07.2014
  • Спасибо, что еще раз указали на это! Это действительно проблема в нашем коде: jira.spring.io/browse/INT-3483 24.07.2014

Ответы:


1

На самом деле, я вижу одно место...

        finally {
            if (removeGroup) {
                this.remove(group);
            }
            lock.unlock();
        }

... если хранилище сообщений выдает исключение во время удаления, мы бы пропустили разблокировку - вы видите что-нибудь в журнале?

24.07.2014
  • Я ничего не вижу, но пытаюсь обновить до SI-4.1-snapshot. Еще одно сомнение, а где SI-4.1 xsd? Не могли бы вы прислать мне URL? Благодарность! 28.07.2014
  • Мы перенесли исправление в 4.0.x и 3.0.x, поэтому оно есть в 3.0.5.BUILD-SNAPSHOT. Мы не публикуем схему, пока она не будет выпущена; вы можете найти его в банке или на github. 28.07.2014
  • excelente Гэри, большое спасибо! попробую из местной баночки 28.07.2014
  • Гэри, один вопрос, разве Жнец не создает взаимоблокировку, поскольку другие потоки ждут, пока они сработают? 28.07.2014
  • Только если хранилище сообщений выдает исключение при удалении группы. 28.07.2014
  • Привет, Гэри, я не вижу никаких исключений для конкретного приложения. Могу ли я включить определенный журнал SI, чтобы получить дополнительную информацию? может Жнец? 31.07.2014
  • Нет; если это удаление вызовет исключение, вы увидите пару журналов на уровне ERROR (я только что протестировал его с фиктивным исключением). Вы все еще видите тупиковые ситуации — даже со снимком? - мы не можем найти нигде в коде (кроме этого), где мог бы произойти тупик. 31.07.2014
  • У нас больше не было этой проблемы, но слишком рано подтверждать, что она сработала. 31.07.2014
  • Добавил новую информацию в исходный пост. Любая идея о том, почему исключение не печатается? 11.08.2014
  • Второй журнал должен включать исключение — github.com/spring-projects/spring-integration/blob/3.0.x/ 11.08.2014
  • спасибо Гэри за ответ! Странно, что не печатается исключение. Я думаю, что исправление устранило первоначальную ошибку, но теперь я пытаюсь понять, почему возникает исключение в обратном вызове с истечением срока действия. Продолжить поиск 12.08.2014
  • Новые материалы

    Интуитивное понимание тензоров в машинном обучении
    Тензор является важной концепцией во многих научных областях, таких как математика, физика, обработка сигналов и компьютерное зрение, и это лишь некоторые из них. В математике тензор — это..

    Использование машинного обучения для диагностики болезни Альцгеймера, часть 4
    Маркеры семантической согласованности для ранней диагностики болезни Альцгеймера (arXiv) Автор: Давиде Колла , Маттео Дельсанто , Марко Агосто , Бенедетто Витиелло , Даниэле Паоло Радичони..

    Почему объяснимость так важна прямо сейчас?
    По мере того, как системы искусственного интеллекта и инструменты на основе машинного обучения распространяются в нашей повседневной жизни, как практики, так и критики все чаще заявляют о..

    Анимированный математический анализ
    Использование Manim для создания математических анимированных визуализаций Визуализация данных помогает понять скрытые закономерности в данных, которые невозможно визуализировать..

    Создание простого слайдера изображений с помощью JavaScript
    Узнайте, как создать базовый слайдер изображений с помощью HTML, CSS и JavaScript. Введение В этом уроке мы создадим удобный слайдер изображений, используя JavaScript, HTML и CSS. Ползунок..

    Создание базы данных с помощью супергероя «Python»
    В этом посте мы узнаем, как создать «базу данных SQLite с помощью модуля python sqlite3, создав простую функцию входа и регистрации. Готовы ли вы к этому путешествию? Если да , давайте приступим..

    ИИ для чайников: руководство для начинающих по пониманию будущего технологий
    Вы чувствуете, что остались позади в мире ИИ? Не волнуйтесь, вы не одиноки! Со всей этой шумихой вокруг искусственного интеллекта может быть трудно понять, с чего начать. Но не позволяйте сленгу..