Read-only archive; use https://github.com/JacORB/JacORB/issues for new issues

Bug 669

Summary: Notification memory leaks
Product: JacORB
Reporter: liujg <liujg79>
Component: Notification Service
Assignee: Alphonse Bendt <pho.lst>
Status: ASSIGNED ---    
Severity: normal    
Priority: P2    
Version: 2.1   
Hardware: PC   
OS: Windows 2000   

Description liujg 2006-03-08 02:09:49 CET
I wrote a small program that uses the notification service. First I run the
SimpleServer (which sends notifications), then I block the PushConsumerClient
(which receives notifications) by suspending it in debug mode. In this state the
PushConsumerClient is still alive but does not receive notifications.
I traced the memory usage: the memory used keeps increasing until the
notification service raises an OutOfMemoryError. The exception is as follows:

[08-03-2006 08:43:58:694] org.jacorb.notificat:ERROR: handleTaskError [PushToConsumerTask#1562]
java.lang.OutOfMemoryError: Java heap space
Exception in thread "PerProxyDeliverThread#0" java.lang.RuntimeException
        at org.jacorb.notification.engine.AbstractDeliverTask.handleTaskError(Unknown Source)
        at org.jacorb.notification.engine.AbstractTask.run(Unknown Source)
        at EDU.oswego.cs.dl.util.concurrent.PooledExecutor$Worker.run(PooledExecutor.java)
        at java.lang.Thread.run(Unknown Source)


I think this is strange, because the parameter
"jacorb.notification.max_events_per_consumer" is used to limit the queue size;
if the queue is full, the notifications should be discarded.
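
The expected behaviour can be illustrated with a minimal sketch. This is not JacORB code: the class name BoundedQueueSketch and the capacity of 3 are hypothetical stand-ins, and the standard java.util.concurrent.ArrayBlockingQueue is used only to show what "discard when the queue is full" means.

```java
import java.util.concurrent.ArrayBlockingQueue;

public class BoundedQueueSketch {

    /** Offers `total` events to a queue of the given capacity and returns how many were discarded. */
    static int countDiscarded(int capacity, int total) {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        int discarded = 0;
        for (int i = 0; i < total; i++) {
            // offer() returns false when the queue is full; it neither blocks nor grows.
            if (!queue.offer("event-" + i)) {
                discarded++;
            }
        }
        return discarded;
    }

    public static void main(String[] args) {
        // 3 is a hypothetical stand-in for max_events_per_consumer.
        System.out.println("discarded=" + countDiscarded(3, 10)); // prints "discarded=7"
    }
}
```

With a bound like this, memory use stays constant no matter how long the consumer remains blocked.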


The code I used is as follows:


package demo.notification.office;

import java.text.DateFormat;
import java.util.Date;

import org.omg.CORBA.Any;
import org.omg.CORBA.ORB;
import org.omg.CosNotification.EventHeader;
import org.omg.CosNotification.EventType;
import org.omg.CosNotification.FixedEventHeader;
import org.omg.CosNotification.Property;
import org.omg.CosNotification.StructuredEvent;
import org.omg.CosNotifyChannelAdmin.AdminLimitExceeded;
import org.omg.CosNotifyChannelAdmin.ClientType;
import org.omg.CosNotifyChannelAdmin.EventChannel;
import org.omg.CosNotifyChannelAdmin.StructuredProxyPushConsumer;
import org.omg.CosNotifyChannelAdmin.StructuredProxyPushConsumerHelper;
import org.omg.CosNotifyChannelAdmin.SupplierAdmin;
import org.omg.CosNotifyComm.StructuredPushSupplierHelper;
import org.omg.CosNotifyComm.StructuredPushSupplierOperations;
import org.omg.CosNotifyComm.StructuredPushSupplierPOATie;
import org.omg.PortableServer.POA;
import demo.notification.office.PrinterPackage.AlreadyPrinted;
import demo.notification.office.PrinterPackage.OffLine;
import demo.notification.office.PrinterPackage.UnknownJobID;

//import cn.com.xinwei.nms.common.util.StopWatch;
class NotificationSender
    extends PrinterPOA
    implements StructuredPushSupplierOperations
{
  private EventChannel channel;
  private SupplierAdmin supplierAdmin;
  private StructuredProxyPushConsumer pushConsumer;
  private ORB orb;
  private POA poa;
  DateFormat format=DateFormat.getDateTimeInstance();

  private int eventId;
  //private StopWatch watch=new StopWatch();
  public int getEventId()
  {
    return eventId++;
  }

  /** Inner class PrintThread ( member class)
      simulates the actual "printing" in a separate thread
   */

  class PrintThread
      extends Thread
  {
    public PrintThread()
    {
      start();
    }

    public void run()
    {

      while (true)
      {
        int eventID = getEventId();

        // create a structured event
        StructuredEvent printedEvent = new StructuredEvent();

        // set the event type and name
        EventType type = new EventType("Demo", "notification servce test");
        FixedEventHeader fixed = new FixedEventHeader(type, "" + eventID);

        // complete header data
        Property variable[] = new Property[0];

        printedEvent.header = new EventHeader(fixed, variable);

        // set filterable event body data
        printedEvent.filterable_data = new Property[3];

        Any jobAny = orb.create_any();
        jobAny.insert_long(eventID);
        printedEvent.filterable_data[0] = new Property("job_id", jobAny);

        Any userAny = orb.create_any();
        userAny.insert_string(eventID + "");
        printedEvent.filterable_data[1] = new Property("user_id", userAny);

        Any urgentAny = orb.create_any();
        urgentAny.insert_boolean(false);
        printedEvent.filterable_data[2] = new Property("urgent", urgentAny);

        // no further event data
        printedEvent.remainder_of_body = orb.create_any();

        try
        {
         // watch.start();
          pushConsumer.push_structured_event(printedEvent);

          if(eventID%1000==0)
          {
            Date d = new Date();
            System.out.println(format.format(d)+":eventID=" + eventID);
          }
        }
        catch (org.omg.CosEventComm.Disconnected d)
        {
          d.printStackTrace();
        }

        try
        {
          Thread.sleep(10);         
        }
        catch (Exception e)
        {
          e.printStackTrace();
        }
      }
    }
  }

  public NotificationSender(EventChannel e, ORB orb, POA poa)
  {
    // set the ORB, POA and event channel
    this.orb = orb;
    this.poa = poa;
    channel = e;
  }

  public void connect()
  {
    StructuredPushSupplierPOATie thisTie = new StructuredPushSupplierPOATie(this);

    // get admin interface and proxy consumer
    supplierAdmin = channel.default_supplier_admin();

    ClientType ctype = ClientType.STRUCTURED_EVENT;
    org.omg.CORBA.IntHolder proxyIdHolder = new org.omg.CORBA.IntHolder();

    try
    {
      pushConsumer =
          StructuredProxyPushConsumerHelper.narrow(
          supplierAdmin.obtain_notification_push_consumer(ctype, proxyIdHolder));
    }
    catch (AdminLimitExceeded ex)
    {
      System.err.println(
          "Could not get consumer proxy, maximum number of proxies exceeded!");
      System.exit(1);
    }

    // connect the push supplier
    try
    {
      pushConsumer.connect_structured_push_supplier(
          StructuredPushSupplierHelper.narrow(poa.servant_to_reference(thisTie)));
    }
    catch (Exception e)
    {
      e.printStackTrace();
    }
    // initialize "queue" and start printer thread
    new PrintThread();
  }

  /**
   * Enter a job in the printer queue
   */

  public synchronized int print(String text, String uid) throws OffLine
  {
    throw new UnsupportedOperationException();
  }

  /**
   * Remove a job in the printer queue
   */

  public void cancel(int id, String uid) throws UnknownJobID, AlreadyPrinted
  {
    throw new UnsupportedOperationException();
  }

  /**
   * Sets the printer online/offline
   */

  public void setOffLine(boolean flag)
  {
    throw new UnsupportedOperationException();
  }

  /**
   * Potentially release resources,
   * from CosNotifyComm.NotifySubscribe
   */

  public void disconnect_structured_push_supplier()
  {
    throw new UnsupportedOperationException();
  }

  /**
   * from CosNotifyComm.NotifySubscribe
   */

  public void subscription_change(EventType added[], EventType removed[])
  {
    // react somehow;
  }

}


package demo.notification.office;

import org.omg.CosNaming.NamingContextExt;
import org.omg.CosNotifyChannelAdmin.EventChannel;
import org.omg.PortableServer.POA;
import org.omg.PortableServer.POAHelper;



/**
 * A server that only sends notifications.
 */
public class SimpleServer
{


 static public void main(String argv[])
 {
   org.omg.CORBA.ORB orb = org.omg.CORBA.ORB.init(argv, null);
   try
   {
     // initialize POA, get naming and event service references
     POA poa = POAHelper.narrow(orb.resolve_initial_references("RootPOA"));
     poa.the_POAManager().activate();

     EventChannel channel = NoticationServiceUtil.createEventChannel(orb);
     NamingContextExt nc = NoticationServiceUtil.getNamingContextExt(orb);
     nc.rebind(nc.to_name("office_event.channel"), channel);

     // create a Printer object, implicitly activate it and advertise its presence
     NotificationSender printer = new NotificationSender(channel, orb, poa);
     printer.connect();
     System.out.println("Printer created and connected");
     // wait for requests
     orb.run();
   }
   catch (Exception ex)
   {
     ex.printStackTrace();
   }
 }

}

package demo.notification.office;

import java.text.DateFormat;
import java.util.Date;

import org.omg.CORBA.IntHolder;
import org.omg.CosEventComm.Disconnected;
import org.omg.CosNaming.NamingContextExt;
import org.omg.CosNaming.NamingContextExtHelper;
import org.omg.CosNotification.EventType;
import org.omg.CosNotification.FixedEventHeader;
import org.omg.CosNotification.Property;
import org.omg.CosNotification.StructuredEvent;
import org.omg.CosNotifyChannelAdmin.ClientType;
import org.omg.CosNotifyChannelAdmin.ConsumerAdmin;
import org.omg.CosNotifyChannelAdmin.EventChannel;
import org.omg.CosNotifyChannelAdmin.EventChannelHelper;
import org.omg.CosNotifyChannelAdmin.InterFilterGroupOperator;
import org.omg.CosNotifyChannelAdmin.StructuredProxyPushSupplier;
import org.omg.CosNotifyChannelAdmin.StructuredProxyPushSupplierHelper;
import org.omg.CosNotifyComm.InvalidEventType;
import org.omg.CosNotifyComm.StructuredPushConsumer;
import org.omg.CosNotifyComm.StructuredPushConsumerPOA;
import org.omg.CosNotifyFilter.Filter;
import org.omg.CosNotifyFilter.FilterFactory;
import org.omg.PortableServer.POA;
import org.omg.PortableServer.POAHelper;

public class PushConsumerClient
    extends StructuredPushConsumerPOA

{
  private static String channelName = "office_event.channel";
  private long count = 0;
  DateFormat format = DateFormat.getDateTimeInstance();
  public PushConsumerClient()
  {
  }

  public void push_structured_event(StructuredEvent structuredEvent) throws
      Disconnected
  {
    count++;
    if (count % 100 == 0)
    {
      Date d = new Date();
      System.out.println(format.format(d));
      printEvent(structuredEvent);
    }
  }

  public void disconnect_structured_push_consumer()
  {
    // proxyPushSupplier.disconnect_structured_push_supplier();
    // orb.shutdown(true);
  }

  public void offer_change(EventType[] eventTypeArray,
                           EventType[] eventTypeArray1) throws InvalidEventType
  {

  }

  public static void main(String[] args)
  {

    EventChannel channel = null;
    ConsumerAdmin consumerAdmin;
    StructuredProxyPushSupplier proxyPushSupplier = null;

    //initORB
    org.omg.CORBA.ORB orb = org.omg.CORBA.ORB.init(args, null);
    POA poa = null;

    try
    {
      //initPOA
      poa =
          POAHelper.narrow(orb.resolve_initial_references("RootPOA"));

      //
      NamingContextExt nc =
          NamingContextExtHelper.narrow(orb.resolve_initial_references(
          "NameService"));

      if (nc != null)
        System.out.println("get naming ContextEXT");
        //
      channel =
          EventChannelHelper.narrow(nc.resolve_str(channelName));

      poa.the_POAManager().activate();

      // create and implicitly activate the client
      StructuredPushConsumer structuredPushConsumer =
          (StructuredPushConsumer)new PushConsumerClient()._this(orb);

      //
      // get the admin interface and the supplier proxy
      IntHolder adminID = new IntHolder();

      consumerAdmin = channel.new_for_consumers(InterFilterGroupOperator.AND_OP,
                                                adminID);

      EventType added[] = new EventType[1];
      EventType removed[] = new EventType[0];
      added[0] = new EventType("TEL", "STAT");
      try
      {
        consumerAdmin.subscription_change(added, removed);
      }
      catch (Exception e)
      {
        System.err.println("ERROR: " + e);
        e.printStackTrace(System.err);
      }

      //consumerAdmin.subscription_change(new EventType[0],);

      proxyPushSupplier =
          StructuredProxyPushSupplierHelper.narrow(
          consumerAdmin.obtain_notification_push_supplier(
          ClientType.STRUCTURED_EVENT,
          new org.omg.CORBA.IntHolder()));

      proxyPushSupplier.subscription_change(added, removed);
      // 2. connect ourselves to the event channel
      proxyPushSupplier.connect_structured_push_consumer(structuredPushConsumer);

      // get the default filter factory
      FilterFactory filterFactory = null;
      Filter filter = null;
      filterFactory = channel.default_filter_factory();
      if (filterFactory == null)
      {
        System.err.println("No default filter Factory!");
      }
      else
      {
        filter = filterFactory.create_filter("EXTENDED_TCL");
        EventType[] eventTypes = new EventType[]
            {
            new EventType("TEL", "EMS"),
            new EventType("TEL", "STAT"),
        };
      }

    }
    catch (Exception e)
    {
      e.printStackTrace();
      System.exit(1);
    }

    orb.run();

  }

  //-----------------------private methods----------------------------
  private void printEvent(StructuredEvent event)
  {

    System.out.println("got structured event.");
    FixedEventHeader fixed_header = event.header.fixed_header;
    System.out.println("=======================================");
    System.out.println("\t" + fixed_header.event_type.domain_name + "." +
                       fixed_header.event_type.type_name + "#" +
                       fixed_header.event_name);

    Property properties[] = event.filterable_data;
    System.out.println("\t" + properties[0].name +
                       " : " + properties[0].value.extract_long());
    System.out.println("\t" + properties[1].name +
                       " : " + properties[1].value.extract_string());
    System.out.println("=======================================");
  }

}


Comment 1 Alphonse Bendt 2006-03-08 08:27:12 CET
Did you try JacORB 2.2.2 and/or the latest CVS snapshot?
Comment 2 liujg 2006-03-09 03:16:46 CET
I read the source code of class StructuredProxyPushSupplierImpl:

public void deliverMessage( Message event ) throws Disconnected
{
    if (logger_.isDebugEnabled())
    {
        logger_.debug( "deliverEvent connected="
                       + isConnected()
                       + " active="
                       + active_
                       + " enabled="
                       + enabled_ );
    }

    if ( isConnected() )
    {
        if ( active_ && enabled_ )
        {
            pushConsumer_.push_structured_event( event.toStructuredEvent() );
            event.dispose();
        }
        else
        {
            // not enabled
            enqueue( event );
        }
    }
    else
    {
        logger_.debug( "Not connected" );
    }
}


If and only if "active_ && enabled_", the following events will be queued. But when
the consumer is blocked, "pushConsumer_.push_structured_event(
event.toStructuredEvent() );" still executes. In this case, how is the
delivered event processed? If it is cached, can that lead to increasing memory
usage?
Comment 3 Alphonse Bendt 2006-03-09 23:19:55 CET
> If and only if "active_ && enabled_", the following events will be queued.

No: if active_ && enabled_, events are pushed to the consumer.

> But when
> the consumer is blocked, "pushConsumer_.push_structured_event(
> event.toStructuredEvent() );" still executes.

I am not sure what you mean by "blocked".

Please check out the latest CVS, as the code has changed quite a bit.
Comment 4 liujg 2006-03-17 02:02:08 CET
 Mr. Alphonse Bendt:
    Thank you for your advice. I tried the latest JacORB (2.2.3), but the problem is
not resolved yet. I sent you an email the day before yesterday; have you
received it? I tested the notification service as follows:


(1) I start a provider in a DOS window. The provider sends 1000 notifications per
second.
(2) I start a consumer in another DOS window. It prints the received
notifications. I click the left mouse button in the DOS window to make it
pause (this is what I mean by "blocked"); in this state, the consumer stops
receiving notifications.

After a long time, the memory of the notification service is used up and an
OutOfMemoryError is raised.
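
One way such growth could arise is sketched below. This is an assumption for illustration only: nothing in this report establishes what happens inside JacORB. The sketch uses java.util.concurrent.ThreadPoolExecutor as a stand-in for the PooledExecutor seen in the stack trace, and the class name BlockedConsumerSketch and the latch standing in for the paused consumer are hypothetical. It shows a single delivery thread stuck in a push call that does not return, while further push tasks pile up in an unbounded work queue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BlockedConsumerSketch {

    /** Submits `events` push tasks to one delivery thread whose push never returns; returns the backlog. */
    static int backlogWhileBlocked(int events) {
        // Stands in for the paused consumer: every push waits on this latch.
        CountDownLatch consumerResumes = new CountDownLatch(1);
        // Unbounded work queue: nothing limits how many tasks can wait here.
        LinkedBlockingQueue<Runnable> pending = new LinkedBlockingQueue<>();
        ThreadPoolExecutor deliveryThread =
            new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, pending);
        try {
            for (int i = 0; i < events; i++) {
                deliveryThread.execute(() -> {
                    try {
                        consumerResumes.await(); // the push call does not return
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // The first task occupies the only thread; all others are waiting in the queue.
            return pending.size();
        } finally {
            consumerResumes.countDown(); // let the blocked push finish
            deliveryThread.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("backlog=" + backlogWhileBlocked(1000)); // prints "backlog=999"
    }
}
```

In this sketch the backlog grows by one task per event for as long as the consumer stays paused, so a per-consumer event limit would only help if it were applied before tasks reach such a queue.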



I think that when the consumer stops receiving notifications (but is still
connected to the channel), the notification service should discard the
notifications or cache only some of them. But it seems that the notification
service caches notifications until all memory is used up.



Comment 5 liujg 2006-03-21 02:12:21 CET
If the provider sends notifications faster than the consumer can receive them,
how does the notification service handle the notifications: cache or discard?
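
For reference, the OMG Notification Service specification describes both: events are queued up to a per-consumer limit (the MaxEventsPerConsumer QoS property), and a DiscardPolicy QoS property selects which event is dropped once that limit is reached. The sketch below illustrates two of those policies, FifoOrder (the first event received is the first discarded) and LifoOrder (the last event received is the first discarded). It is a plain-Java illustration under those definitions, not JacORB's implementation; the class DiscardPolicySketch and the enum Discard are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

public class DiscardPolicySketch {

    /** Hypothetical stand-in for two of the specification's discard policies. */
    enum Discard { FIFO, LIFO }

    /** Enqueues events 0..total-1 into a queue of the given capacity and returns what remains queued. */
    static List<Integer> enqueueAll(int capacity, int total, Discard policy) {
        ArrayDeque<Integer> queue = new ArrayDeque<>();
        for (int event = 0; event < total; event++) {
            if (queue.size() == capacity) {
                if (policy == Discard.FIFO) {
                    queue.pollFirst(); // drop the oldest queued event to make room
                } else {
                    continue;          // drop the event that has just arrived
                }
            }
            queue.addLast(event);
        }
        return new ArrayList<>(queue);
    }

    public static void main(String[] args) {
        System.out.println(enqueueAll(3, 6, Discard.FIFO)); // prints "[3, 4, 5]"
        System.out.println(enqueueAll(3, 6, Discard.LIFO)); // prints "[0, 1, 2]"
    }
}
```

Either way the queue never holds more than `capacity` events; the policies differ only in whether a slow consumer eventually sees the newest or the oldest events.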