We noticed linear memory growth when sending a high volume of structured events
through the notification service (using a push supplier and one fast,
non-blocking push consumer). After extensive debugging, I noticed the following:
Both StructuredProxyPushConsumerImpl and StructuredProxyPushSupplierImpl create
a new AbstractMessage$MessageHandle, incrementing the reference count on the
message.
However, ONLY StructuredProxyPushSupplierImpl calls dispose on its
AbstractMessage$MessageHandle (thus calling removeReference and decrementing
the reference count). Therefore, the reference count on the
StructuredEventMessage remains at 1, and the message is never disposed.
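To illustrate the imbalance described above, here is a minimal, self-contained sketch. It is not JacORB code: CountedMessage, Handle and simulate are hypothetical stand-ins for the message, its MessageHandle and the two proxies, and the sketch assumes only what is stated above (each handle adds one reference, dispose removes one, and the message is released when the count reaches 0).

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RefCountLeakSketch {
    /** Hypothetical stand-in for a reference-counted message (not the JacORB class). */
    static final class CountedMessage {
        private final AtomicInteger refs = new AtomicInteger();
        private volatile boolean disposed;

        /** Creating a handle adds one reference. */
        Handle newHandle() {
            refs.incrementAndGet();
            return new Handle(this);
        }

        /** Called by Handle.dispose(); the message is released at count 0. */
        private void removeReference() {
            if (refs.decrementAndGet() == 0) {
                disposed = true;
            }
        }

        int referenceCount() { return refs.get(); }
        boolean isDisposed() { return disposed; }
    }

    /** Hypothetical stand-in for a message handle. */
    static final class Handle {
        private CountedMessage target;

        Handle(CountedMessage target) { this.target = target; }

        /** Removes this handle's reference exactly once. */
        void dispose() {
            if (target != null) {
                target.removeReference();
                target = null;
            }
        }
    }

    /** Both sides take a handle; the consumer side disposes only if asked to. */
    static CountedMessage simulate(boolean consumerDisposes) {
        CountedMessage message = new CountedMessage();
        Handle consumerSide = message.newHandle();
        Handle supplierSide = message.newHandle();
        supplierSide.dispose();          // the supplier side always disposes
        if (consumerDisposes) {
            consumerSide.dispose();      // the call that appears to be missing
        }
        return message;
    }

    public static void main(String[] args) {
        CountedMessage leaked = simulate(false);
        System.out.println("without consumer dispose: refs=" + leaked.referenceCount()
                + " disposed=" + leaked.isDisposed());
        CountedMessage released = simulate(true);
        System.out.println("with consumer dispose: refs=" + released.referenceCount()
                + " disposed=" + released.isDisposed());
    }
}
```

In this sketch the first case ends with a count of 1 and an undisposed message, which matches what we see in the debugger.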
Stack trace of dispose by StructuredProxyPushSupplierImpl:
Thread [PushTaskExecutor#1-0] (Suspended (breakpoint at line 408 in
StructuredEventMessage(AbstractMessage).removeReference() line: 408
[local variables unavailable]
AbstractMessage$MessageHandle.dispose() line: 266
StructuredProxyPushSupplierImpl.pushEvent() line: 97
() line: 146
AbstractProxyPushSupplier.access$000(AbstractProxyPushSupplier) line: 65
AbstractProxyPushSupplier$1.doPush() line: 95
DefaultPushTaskExecutor$Worker.run() line: 62
I can't find a way to get the proxy consumer to dispose of that MessageHandle.
The closest call I found to do it would be the dispose method of
AbstractMessageTask, which is called several times (such as from
FilterProxyConsumerTask and FilterSupplierAdminTask), but each time it's called
its AbstractMessage$MessageHandle reference is null, so it never disposes of
anything.
I can provide the code we are using to set up the supplier and consumer, but
it's pretty much a copy of your Printer demo.
Here's the configuration we are using:
#limits number of consumer proxies (i.e. suppliers)
#limits number of supplier proxies (i.e. consumers)
#number of threads for delivery to push consumers
We are putting timeouts into each of the StructuredEvents (30 minutes), and we
are setting the StartTimeSupported QOS to False on the EventChannel. That's
about it for custom configuration. The Push Consumer is not applying any
filtering to the messages, and is receiving all of them.
Update: We were able to solve the problem by adding a message.dispose() call to
the end of the org.jacorb.notification.engine.DefaultTaskFactory.enqueueMessage
method.
This seemed like the best place since we could be sure that all consumers would
have incremented the reference count once this method was finished executing
(no danger of decrementing too early and causing the super.dispose() call that
occurs when ref count is 0).
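The ordering argument can be sketched as follows. This is not the JacORB implementation or the actual patch: Counted, DeliveryTask and enqueue are hypothetical names, and the sketch assumes the enqueuing code holds one reference of its own that it can release once every delivery task has taken its reference.

```java
import java.util.ArrayList;
import java.util.List;

public class EnqueueDisposeSketch {
    /** Hypothetical reference-counted message (not the JacORB class). */
    static final class Counted {
        private int refs;
        private boolean disposed;

        synchronized void addReference() { refs++; }

        synchronized void removeReference() {
            if (refs <= 0) {
                throw new IllegalStateException("released more often than referenced");
            }
            if (--refs == 0) {
                disposed = true;   // stands in for the real cleanup at count 0
            }
        }

        synchronized int refs() { return refs; }
        synchronized boolean isDisposed() { return disposed; }
    }

    /** Hypothetical delivery task that owns one reference until it has run. */
    static final class DeliveryTask {
        private final Counted message;

        DeliveryTask(Counted message) {
            this.message = message;
            message.addReference();
        }

        void run() {
            // ... deliver to the consumer, then give the reference back
            message.removeReference();
        }
    }

    /** Creates one task per consumer, then releases the caller's reference last. */
    static List<DeliveryTask> enqueue(Counted message, int consumers) {
        List<DeliveryTask> tasks = new ArrayList<>();
        try {
            for (int i = 0; i < consumers; i++) {
                tasks.add(new DeliveryTask(message));
            }
        } finally {
            // Safe here: every task already holds its own reference,
            // so the count cannot reach 0 before delivery has happened.
            message.removeReference();
        }
        return tasks;
    }

    public static void main(String[] args) {
        Counted message = new Counted();
        message.addReference();                       // reference held by the enqueuing code
        List<DeliveryTask> tasks = enqueue(message, 2);
        System.out.println("after enqueue: refs=" + message.refs()
                + " disposed=" + message.isDisposed());
        tasks.forEach(DeliveryTask::run);
        System.out.println("after delivery: refs=" + message.refs()
                + " disposed=" + message.isDisposed());
    }
}
```

Releasing the caller's reference before the loop instead would drop the count to 0 immediately, which is the early-dispose danger mentioned above.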
Extensive testing with an eye on heap memory size in JConsole confirmed that
this did, indeed, allow for a full garbage collection to return heap down to
normal size with message consumption occurring at a steady rate.
This patch causes org.jacorb.test.notification.InterFilterGroupOperatorTest to fail, so I'm afraid I can't merge it in.